starmap set_start_method parallelize how from python multithreading multiprocessing pickle pool

set_start_method - python pool starmap



No se puede saltear<type ''instancemethod''> cuando se usa multiprocesamiento Pool.map() (8)

Estoy tratando de usar la función Pool.map() multiprocessing para dividir el trabajo de forma simultánea. Cuando uso el siguiente código, funciona bien:

import multiprocessing def f(x): return x*x def go(): pool = multiprocessing.Pool(processes=4) print pool.map(f, range(10)) if __name__== ''__main__'' : go()

Sin embargo, cuando lo uso en un enfoque más orientado a objetos, no funciona. El mensaje de error que da es:

PicklingError: Can''t pickle <type ''instancemethod''>: attribute lookup __builtin__.instancemethod failed

Esto ocurre cuando el siguiente es mi programa principal:

import someClass if __name__== ''__main__'' : sc = someClass.someClass() sc.go()

y la siguiente es mi clase someClass :

import multiprocessing class someClass(object): def __init__(self): pass def f(self, x): return x*x def go(self): pool = multiprocessing.Pool(processes=4) print pool.map(self.f, range(10))

¿Alguien sabe cuál podría ser el problema, o una forma fácil de evitarlo?


Algunas limitaciones a la solución de Steven Bethard:

Cuando registra su método de clase como una función, sorprendentemente se llama al destructor de su clase cada vez que finaliza el procesamiento de su método. Entonces, si tiene 1 instancia de su clase que llama n veces su método, los miembros pueden desaparecer entre 2 ejecuciones y puede obtener un mensaje malloc: *** error for object 0x...: pointer being freed was not allocated (por ejemplo, abierto archivo de miembro) o pure virtual method called, terminate called without an active exception (lo que significa que la vida útil de un objeto miembro que utilicé fue más corta de lo que pensaba). Lo tengo cuando se trata de n mayor que el tamaño del grupo. Aquí hay un pequeño ejemplo:

from multiprocessing import Pool, cpu_count from multiprocessing.pool import ApplyResult # --------- see Stenven''s solution above ------------- from copy_reg import pickle from types import MethodType def _pickle_method(method): func_name = method.im_func.__name__ obj = method.im_self cls = method.im_class return _unpickle_method, (func_name, obj, cls) def _unpickle_method(func_name, obj, cls): for cls in cls.mro(): try: func = cls.__dict__[func_name] except KeyError: pass else: break return func.__get__(obj, cls) class Myclass(object): def __init__(self, nobj, workers=cpu_count()): print "Constructor ..." # multi-processing pool = Pool(processes=workers) async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ] pool.close() # waiting for all results map(ApplyResult.wait, async_results) lst_results=[r.get() for r in async_results] print lst_results def __del__(self): print "... Destructor" def process_obj(self, index): print "object %d" % index return "results" pickle(MethodType, _pickle_method, _unpickle_method) Myclass(nobj=8, workers=3) # problem !!! the destructor is called nobj times (instead of once)

Salida:

Constructor ... object 0 object 1 object 2 ... Destructor object 3 ... Destructor object 4 ... Destructor object 5 ... Destructor object 6 ... Destructor object 7 ... Destructor ... Destructor ... Destructor [''results'', ''results'', ''results'', ''results'', ''results'', ''results'', ''results'', ''results''] ... Destructor

El método __call__ no es tan equivalente, porque [Ninguno, ...] se leen de los resultados:

from multiprocessing import Pool, cpu_count from multiprocessing.pool import ApplyResult class Myclass(object): def __init__(self, nobj, workers=cpu_count()): print "Constructor ..." # multiprocessing pool = Pool(processes=workers) async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ] pool.close() # waiting for all results map(ApplyResult.wait, async_results) lst_results=[r.get() for r in async_results] print lst_results def __call__(self, i): self.process_obj(i) def __del__(self): print "... Destructor" def process_obj(self, i): print "obj %d" % i return "result" Myclass(nobj=8, workers=3) # problem !!! the destructor is called nobj times (instead of once), # **and** results are empty !

Entonces ninguno de los dos métodos es satisfactorio ...


El problema es que el multiprocesamiento debe desenterrar las cosas para unirlas entre los procesos, y los métodos vinculados no son seleccionables. La solución (ya sea que la considere "fácil" o no) ;-) es agregar la infraestructura a su programa para permitir que dichos métodos sean escaneados, registrándolo con el método de biblioteca estándar copy_reg .

Por ejemplo, la contribución de Steven Bethard a este hilo (hacia el final del hilo) muestra un enfoque perfectamente viable para permitir el método de decapado / desengrase a través de copy_reg .


En este caso simple, donde someClass.f no hereda ningún dato de la clase y no adjunta nada a la clase, una posible solución sería separar out f , para que pueda ser escabechado:

import multiprocessing def f(x): return x*x class someClass(object): def __init__(self): pass def go(self): pool = multiprocessing.Pool(processes=4) print pool.map(f, range(10))


Hay otro atajo que puede usar, aunque puede ser ineficiente dependiendo de las instancias de su clase.

Como todos han dicho, el problema es que el código de multiprocessing tiene que resumir las cosas que envía a los subprocesos que ha comenzado, y el recolector no hace métodos de instancia.

Sin embargo, en lugar de enviar el método de instancia, puede enviar la instancia de clase real, más el nombre de la función a llamar, a una función ordinaria que luego usa getattr para llamar al método de instancia, creando así el método vinculado en el grupo subproceso. Esto es similar a definir un método __call__ excepto que puede llamar a más de una función miembro.

Robando el código de @ EricH. De su respuesta y anotándolo un poco (lo volví a escribir, de ahí todos los cambios de nombre y tal, por alguna razón esto pareció más fácil que cortar y pegar :-)) para ilustrar toda la magia:

import multiprocessing import os def call_it(instance, name, args=(), kwargs=None): "indirect caller for instance methods and multiprocessing" if kwargs is None: kwargs = {} return getattr(instance, name)(*args, **kwargs) class Klass(object): def __init__(self, nobj, workers=multiprocessing.cpu_count()): print "Constructor (in pid=%d)..." % os.getpid() self.count = 1 pool = multiprocessing.Pool(processes = workers) async_results = [pool.apply_async(call_it, args = (self, ''process_obj'', (i,))) for i in range(nobj)] pool.close() map(multiprocessing.pool.ApplyResult.wait, async_results) lst_results = [r.get() for r in async_results] print lst_results def __del__(self): self.count -= 1 print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count) def process_obj(self, index): print "object %d" % index return "results" Klass(nobj=8, workers=3)

El resultado muestra que, de hecho, el constructor se llama una vez (en el pid original) y el destructor se llama 9 veces (una para cada copia hecha = 2 o 3 veces por pool-worker-process según sea necesario, más una vez en el original proceso). Esto a menudo está bien, como en este caso, ya que el recolector predeterminado hace una copia de la instancia completa y (semi) secretamente vuelve a llenarla, en este caso, al hacer:

obj = object.__new__(Klass) obj.__dict__.update({''count'':1})

Es por eso que aunque el destructor se llama ocho veces en los tres procesos de trabajo, cuenta de 1 a 0 cada vez, pero por supuesto todavía puede meterse en problemas de esta manera. Si es necesario, puede proporcionar su propio __setstate__ :

def __setstate__(self, adict): self.count = adict[''count'']

en este caso, por ejemplo.


También podría definir un __call__() dentro de su someClass() , que llama someClass.go() y luego pasar una instancia de someClass() al grupo. Este objeto es seleccionable y funciona bien (para mí) ...


También podría definir un __call__() dentro de su someClass() , que llama someClass.go() y luego pasar una instancia de someClass() al grupo. Este objeto es seleccionable y funciona bien (para mí) ...

class someClass(object): def __init__(self): pass def f(self, x): return x*x def go(self): p = Pool(4) sc = p.map(self, range(4)) print sc def __call__(self, x): return self.f(x) sc = someClass() sc.go()


Todas estas soluciones son feas porque el multiprocesamiento y el decapado están rotos y limitados a menos que salte fuera de la biblioteca estándar.

Si usa una bifurcación de multiprocessing llamada pathos.multiprocesssing , puede usar directamente clases y métodos de clase en las funciones de map de multiprocesamiento. Esto se debe a que se usa dill lugar de pickle o cPickle , y dill puede serializar casi cualquier cosa en python.

pathos.multiprocessing también proporciona una función de mapa asíncrona ... y puede map funciones con múltiples argumentos (por ejemplo, map(math.pow, [1,2,3], [4,5,6]) )

Ver: ¿Qué pueden hacer el multiprocesamiento y el eneldo juntos?

y: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp >>> p = pp.ProcessPool(4) >>> >>> def add(x,y): ... return x+y ... >>> x = [0,1,2,3] >>> y = [4,5,6,7] >>> >>> p.map(add, x, y) [4, 6, 8, 10] >>> >>> class Test(object): ... def plus(self, x, y): ... return x+y ... >>> t = Test() >>> >>> p.map(Test.plus, [t]*4, x, y) [4, 6, 8, 10] >>> >>> p.map(t.plus, x, y) [4, 6, 8, 10]

Y para ser explícito, puede hacer exactamente lo que quería hacer desde el principio, y puede hacerlo desde el intérprete, si así lo desea.

>>> import pathos.pools as pp >>> class someClass(object): ... def __init__(self): ... pass ... def f(self, x): ... return x*x ... def go(self): ... pool = pp.ProcessPool(4) ... print pool.map(self.f, range(10)) ... >>> sc = someClass() >>> sc.go() [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] >>>

Obtenga el código aquí: https://github.com/uqfoundation/pathos


Una solución potencialmente trivial para esto es cambiar a usar multiprocessing.dummy . Esta es una implementación basada en subprocesos de la interfaz de multiprocesamiento que no parece tener este problema en Python 2.7. No tengo mucha experiencia aquí, pero este rápido cambio de importación me permitió llamar a apply_async en un método de clase.

Algunos buenos recursos en multiprocessing.dummy :

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/