proyectos ejemplos python multiprocessing pickle

python - ejemplos - Multiprocesamiento: ¿cómo usar Pool.map en una función definida en una clase?



django (13)

Cuando ejecuto algo como:

from multiprocessing import Pool p = Pool(5) def f(x): return x*x p.map(f, [1,2,3])

funciona bien. Sin embargo, poniendo esto como una función de una clase:

class calculate(object): def run(self): def f(x): return x*x p = Pool() return p.map(f, [1,2,3]) cl = calculate() print cl.run()

Me da el siguiente error:

Exception in thread Thread-1: Traceback (most recent call last): File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner self.run() File "/sw/lib/python2.6/threading.py", line 484, in run self.__target(*self.__args, **self.__kwargs) File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks put(task) PicklingError: Can''t pickle <type ''function''>: attribute lookup __builtin__.function failed

He visto una publicación de Alex Martelli sobre el mismo tipo de problema, pero no fue lo suficientemente explícito.


Actualmente, no hay solución a su problema, hasta donde yo sé: la función que le da a map() debe ser accesible a través de una importación de su módulo. Esta es la razón por la cual el código de Robert funciona: la función f() se puede obtener importando el siguiente código:

def f(x): return x*x class Calculate(object): def run(self): p = Pool() return p.map(f, [1,2,3]) if __name__ == ''__main__'': cl = Calculate() print cl.run()

De hecho, agregué una sección "principal" porque sigue las recomendaciones para la plataforma de Windows ("Asegúrese de que el módulo principal pueda ser importado de forma segura por un nuevo intérprete de Python sin causar efectos secundarios no deseados").

También agregué una letra mayúscula delante de Calculate , para seguir a PEP 8 . :)


Aquí está mi solución, que creo que es un poco menos hackosa que la mayoría de los otros aquí. Es similar a la respuesta de nightowl.

someclasses = [MyClass(), MyClass(), MyClass()] def method_caller(some_object, some_method=''the method''): return getattr(some_object, some_method)() othermethod = partial(method_caller, some_method=''othermethod'') with Pool(6) as pool: result = pool.map(othermethod, someclasses)


El multiprocesamiento y el decapado están rotos y limitados a menos que salte fuera de la biblioteca estándar.

Si usa una bifurcación de multiprocessing llamada pathos.multiprocesssing , puede usar directamente clases y métodos de clase en las funciones de map de multiprocesamiento. Esto se debe a que se usa dill lugar de pickle o cPickle , y dill puede serializar casi cualquier cosa en python.

pathos.multiprocessing también proporciona una función de mapa asíncrona ... y puede map funciones con múltiples argumentos (por ejemplo, map(math.pow, [1,2,3], [4,5,6]) )

Ver las discusiones: ¿Qué pueden hacer el multiprocesamiento y el eneldo juntos?

y: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

Incluso maneja el código que escribió inicialmente, sin modificaciones, y del intérprete. ¿Por qué hacer algo más que sea más frágil y específico para un solo caso?

>>> from pathos.multiprocessing import ProcessingPool as Pool >>> class calculate(object): ... def run(self): ... def f(x): ... return x*x ... p = Pool() ... return p.map(f, [1,2,3]) ... >>> cl = calculate() >>> print cl.run() [1, 4, 9]

Obtenga el código aquí: https://github.com/uqfoundation/pathos

Y, solo para presumir un poco más de lo que puede hacer:

>>> from pathos.multiprocessing import ProcessingPool as Pool >>> >>> p = Pool(4) >>> >>> def add(x,y): ... return x+y ... >>> x = [0,1,2,3] >>> y = [4,5,6,7] >>> >>> p.map(add, x, y) [4, 6, 8, 10] >>> >>> class Test(object): ... def plus(self, x, y): ... return x+y ... >>> t = Test() >>> >>> p.map(Test.plus, [t]*4, x, y) [4, 6, 8, 10] >>> >>> res = p.amap(t.plus, x, y) >>> res.get() [4, 6, 8, 10]


La solución por mrule es correcta pero tiene un error: si el niño envía una gran cantidad de datos, puede llenar el búfer de la tubería, bloqueando en el pipe.send() del niño, mientras el padre espera que el niño salga pipe.join() . La solución es leer los datos del niño antes de join() al niño. Además, el niño debe cerrar el extremo del tubo de los padres para evitar un punto muerto. El código a continuación arregla eso. También tenga en cuenta que este parmap crea un proceso por elemento en X Una solución más avanzada es usar multiprocessing.cpu_count() para dividir X en una cantidad de fragmentos, y luego combinar los resultados antes de regresar. Dejo eso como un ejercicio para el lector para no estropear la concisión de la buena respuesta por mrule. ;)

from multiprocessing import Process, Pipe from itertools import izip def spawn(f): def fun(ppipe, cpipe,x): ppipe.close() cpipe.send(f(x)) cpipe.close() return fun def parmap(f,X): pipe=[Pipe() for x in X] proc=[Process(target=spawn(f),args=(p,c,x)) for x,(p,c) in izip(X,pipe)] [p.start() for p in proc] ret = [p.recv() for (p,c) in pipe] [p.join() for p in proc] return ret if __name__ == ''__main__'': print parmap(lambda x:x**x,range(1,5))


Las funciones definidas en las clases (incluso dentro de las funciones dentro de las clases) realmente no se salvan. Sin embargo, esto funciona:

def f(x): return x*x class calculate(object): def run(self): p = Pool() return p.map(f, [1,2,3]) cl = calculate() print cl.run()


Modifiqué el método de Klaus se porque si bien funcionaba para mí con listas pequeñas, se bloqueaba cuando el número de elementos era ~ 1000 o más. En lugar de presionar los trabajos de uno en uno con la condición None detención, cargo la cola de entrada todo de una vez y simplemente dejo que los procesos la mastiquen hasta que esté vacía.

from multiprocessing import cpu_count, Queue, Process def apply_func(f, q_in, q_out): while not q_in.empty(): i, x = q_in.get() q_out.put((i, f(x))) # map a function using a pool of processes def parmap(f, X, nprocs = cpu_count()): q_in, q_out = Queue(), Queue() proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)] sent = [q_in.put((i, x)) for i, x in enumerate(X)] [p.start() for p in proc] res = [q_out.get() for _ in sent] [p.join() for p in proc] return [x for i,x in sorted(res)]

Editar: desafortunadamente ahora me estoy encontrando con este error en mi sistema: el límite máximo de la cola de multiprocesamiento es 32767 , con suerte las soluciones allí ayudarán.


No estoy seguro de si este enfoque se ha tomado, pero un trabajo que estoy usando es:

from multiprocessing import Pool t = None def run(n): return t.f(n) class Test(object): def __init__(self, number): self.number = number def f(self, x): print x * self.number def pool(self): pool = Pool(2) pool.map(run, range(10)) if __name__ == ''__main__'': t = Test(9) t.pool() pool = Pool(2) pool.map(run, range(10))

La salida debería ser:

0 9 18 27 36 45 54 63 72 81 0 9 18 27 36 45 54 63 72 81


No pude usar los códigos publicados hasta ahora porque los códigos que usan "multiprocesamiento.Pool" no funcionan con expresiones lambda y los códigos que no usan "multiprocesamiento.Pool" generan tantos procesos como elementos de trabajo.

Adapte el código porque genera una cantidad predefinida de trabajadores y solo itera a través de la lista de entrada si existe un trabajador inactivo. También habilité el modo "daemon" para los trabajadores st ctrl-c funciona como se esperaba.

import multiprocessing def fun(f, q_in, q_out): while True: i, x = q_in.get() if i is None: break q_out.put((i, f(x))) def parmap(f, X, nprocs=multiprocessing.cpu_count()): q_in = multiprocessing.Queue(1) q_out = multiprocessing.Queue() proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out)) for _ in range(nprocs)] for p in proc: p.daemon = True p.start() sent = [q_in.put((i, x)) for i, x in enumerate(X)] [q_in.put((None, None)) for _ in range(nprocs)] res = [q_out.get() for _ in range(len(sent))] [p.join() for p in proc] return [x for i, x in sorted(res)] if __name__ == ''__main__'': print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))


Sé que esto fue preguntado hace más de 6 años, pero solo quería agregar mi solución, ya que algunas de las sugerencias anteriores parecen terriblemente complicadas, pero mi solución fue realmente muy simple.

Todo lo que tuve que hacer fue ajustar la llamada pool.map () a una función auxiliar. Pasar el objeto de clase junto con args para el método como una tupla, que se parecía un poco a esto.

def run_in_parallel(args): return args[0].method(args[1]) myclass = MyClass() method_args = [1,2,3,4,5,6] args_map = [ (myclass, arg) for arg in method_args ] pool = Pool() pool.map(run_in_parallel, args_map)


También he luchado con esto. Tenía funciones como miembros de datos de una clase, como un ejemplo simplificado:

from multiprocessing import Pool import itertools pool = Pool() class Example(object): def __init__(self, my_add): self.f = my_add def add_lists(self, list1, list2): # Needed to do something like this (the following line won''t work) return pool.map(self.f,list1,list2)

Necesitaba usar la función self.f en una llamada Pool.map () desde dentro de la misma clase y self.f no tomaba una tupla como argumento. Como esta función estaba incrustada en una clase, no me resultó claro cómo escribir el tipo de envoltorio que sugerían otras respuestas.

Resolví este problema usando un contenedor diferente que toma una tupla / lista, donde el primer elemento es la función, y los elementos restantes son los argumentos para esa función, llamada eval_func_tuple (f_args). Usando esto, la línea problemática puede ser reemplazada por return pool.map (eval_func_tuple, itertools.izip (itertools.repeat (self.f), list1, list2)). Aquí está el código completo:

Archivo: util.py

def add(a, b): return a+b def eval_func_tuple(f_args): """Takes a tuple of a function and args, evaluates and returns result""" return f_args[0](*f_args[1:])

Archivo: main.py

from multiprocessing import Pool import itertools import util pool = Pool() class Example(object): def __init__(self, my_add): self.f = my_add def add_lists(self, list1, list2): # The following line will now work return pool.map(util.eval_func_tuple, itertools.izip(itertools.repeat(self.f), list1, list2)) if __name__ == ''__main__'': myExample = Example(util.add) list1 = [1, 2, 3] list2 = [10, 20, 30] print myExample.add_lists(list1, list2)

Ejecutar main.py dará [11, 22, 33]. Siéntase libre de mejorar esto, por ejemplo eval_func_tuple también podría ser modificado para tomar argumentos de palabra clave.

En otra nota, en otras respuestas, la función "parmap" puede hacerse más eficiente para el caso de más procesos que la cantidad de CPU disponibles. Estoy copiando una versión editada a continuación. Esta es mi primera publicación y no estaba seguro de si debería editar directamente la respuesta original. También cambié el nombre de algunas variables.

from multiprocessing import Process, Pipe from itertools import izip def spawn(f): def fun(pipe,x): pipe.send(f(x)) pipe.close() return fun def parmap(f,X): pipe=[Pipe() for x in X] processes=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)] numProcesses = len(processes) processNum = 0 outputList = [] while processNum < numProcesses: endProcessNum = min(processNum+multiprocessing.cpu_count(), numProcesses) for proc in processes[processNum:endProcessNum]: proc.start() for proc in processes[processNum:endProcessNum]: proc.join() for proc,c in pipe[processNum:endProcessNum]: outputList.append(proc.recv()) processNum = endProcessNum return outputList if __name__ == ''__main__'': print parmap(lambda x:x**x,range(1,5))


También me molestaron las restricciones sobre qué tipo de funciones podría aceptar pool.map. Escribí lo siguiente para eludir esto. Parece que funciona, incluso para el uso recursivo de parmap.

from multiprocessing import Process, Pipe from itertools import izip def spawn(f): def fun(pipe,x): pipe.send(f(x)) pipe.close() return fun def parmap(f,X): pipe=[Pipe() for x in X] proc=[Process(target=spawn(f),args=(c,x)) for x,(p,c) in izip(X,pipe)] [p.start() for p in proc] [p.join() for p in proc] return [p.recv() for (p,c) in pipe] if __name__ == ''__main__'': print parmap(lambda x:x**x,range(1,5))


Tomé la respuesta de klaus se''s y aganders3 e hice un módulo documentado que es más legible y contiene en un solo archivo. Simplemente puede agregarlo a su proyecto. ¡Incluso tiene una barra de progreso opcional!

""" The ``processes`` module provides some convenience functions for using parallel processes in python. Adapted from http://.com/a/16071616/287297 Example usage: print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True) Comments: "It spawns a predefined amount of workers and only iterates through the input list if there exists an idle worker. I also enabled the "daemon" mode for the workers so that KeyboardInterupt works as expected." Pitfalls: all the stdouts are sent back to the parent stdout, intertwined. Alternatively, use this fork of multiprocessing: https://github.com/uqfoundation/multiprocess """ # Modules # import multiprocessing from tqdm import tqdm ################################################################################ def apply_function(func_to_apply, queue_in, queue_out): while not queue_in.empty(): num, obj = queue_in.get() queue_out.put((num, func_to_apply(obj))) ################################################################################ def prll_map(func_to_apply, items, cpus=None, verbose=False): # Number of processes to use # if cpus is None: cpus = min(multiprocessing.cpu_count(), 32) # Create queues # q_in = multiprocessing.Queue() q_out = multiprocessing.Queue() # Process list # new_proc = lambda t,a: multiprocessing.Process(target=t, args=a) processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)] # Put all the items (objects) in the queue # sent = [q_in.put((i, x)) for i, x in enumerate(items)] # Start them all # for proc in processes: proc.daemon = True proc.start() # Display progress bar or not # if verbose: results = [q_out.get() for x in tqdm(range(len(sent)))] else: results = [q_out.get() for x in range(len(sent))] # Wait for them to finish # for proc in processes: proc.join() # Return results # return [x for i, x in sorted(results)] ################################################################################ def test(): def slow_square(x): import time time.sleep(2) return x**2 objs = range(20) squares = prll_map(slow_square, objs, 4, verbose=True) print "Result: %s" % squares

EDITAR : Sugerencia de @ alexander-mcfarlane añadida y una función de prueba


class Calculate(object): # Your instance method to be executed def f(self, x, y): return x*y if __name__ == ''__main__'': inp_list = [1,2,3] y = 2 cal_obj = Calculate() pool = Pool(2) results = pool.map(lambda x: cal_obj.f(x, y), inp_list)

Existe la posibilidad de que desee aplicar esta función para cada instancia diferente de la clase. Entonces aquí está la solución para eso también

class Calculate(object): # Your instance method to be executed def __init__(self, x): self.x = x def f(self, y): return self.x*y if __name__ == ''__main__'': inp_list = [Calculate(i) for i in range(3)] y = 2 pool = Pool(2) results = pool.map(lambda x: x.f(y), inp_list)