threading starmap set_start_method multiple ejemplos python python-2.7 multiprocessing pickle

set_start_method - starmap python multiprocessing



¿Por qué puedo pasar un método de instancia a multiprocessing.Process, pero no a multiprocessing.Pool? (3)

Aquí hay una alternativa que uso a veces, y funciona en Python2.x:

Puede crear una especie de "alias" de nivel superior para los métodos de instancia, que acepte un objeto cuyos métodos de instancia desee ejecutar en un grupo y haga que llame a los métodos de instancia por usted:

import functools import multiprocessing def _instance_method_alias(obj, arg): """ Alias for instance method that allows the method to be called in a multiprocessing pool """ obj.instance_method(arg) return class MyClass(object): """ Our custom class whose instance methods we want to be able to use in a multiprocessing pool """ def __init__(self): self.my_string = "From MyClass: {}" def instance_method(self, arg): """ Some arbitrary instance method """ print(self.my_string.format(arg)) return # create an object of MyClass obj = MyClass() # use functools.partial to create a new method that always has the # MyClass object passed as its first argument _bound_instance_method_alias = functools.partial(_instance_method_alias, obj) # create our list of things we will use the pool to map l = [1,2,3] # create the pool of workers pool = multiprocessing.Pool() # call pool.map, passing it the newly created function pool.map(_bound_instance_method_alias, l) # cleanup pool.close() pool.join()

Este código produce esta salida:

Desde MyClass: 1
Desde MyClass: 2
Desde MyClass: 3

Una limitación es que no puede usar esto para métodos que modifican el objeto. Cada proceso obtiene una copia del objeto sobre el que está llamando a los métodos, por lo que los cambios no se propagarán al proceso principal. Sin embargo, si no necesita modificar el objeto de los métodos que está llamando, esta puede ser una solución simple.

Estoy tratando de escribir una aplicación que aplique una función simultáneamente con un multiprocessing.Pool . Me gustaría que esta función sea un método de instancia (para poder definirla de manera diferente en diferentes subclases). Esto no parece ser posible; Como he aprendido en otra parte, los métodos aparentemente vinculados no se pueden encurtir . Entonces, ¿por qué comenzar un multiprocessing.Process con un método enlazado como objetivo funciona? El siguiente código:

import multiprocessing def test1(): print "Hello, world 1" def increment(x): return x + 1 class testClass(): def process(self): process1 = multiprocessing.Process(target=test1) process1.start() process1.join() process2 = multiprocessing.Process(target=self.test2) process2.start() process2.join() def pool(self): pool = multiprocessing.Pool(1) for answer in pool.imap(increment, range(10)): print answer print for answer in pool.imap(self.square, range(10)): print answer def test2(self): print "Hello, world 2" def square(self, x): return x * x def main(): c = testClass() c.process() c.pool() if __name__ == "__main__": main()

Produce esta salida:

Hello, world 1 Hello, world 2 1 2 3 4 5 6 7 8 9 10 Exception in thread Thread-2: Traceback (most recent call last): File "C:/Python27/Lib/threading.py", line 551, in __bootstrap_inner self.run() File "C:/Python27/Lib/threading.py", line 504, in run self.__target(*self.__args, **self.__kwargs) File "C:/Python27/Lib/multiprocessing/pool.py", line 319, in _handle_tasks put(task) PicklingError: Can''t pickle <type ''instancemethod''>: attribute lookup __builtin__.instancemethod failed

¿Por qué los procesos pueden manejar métodos enlazados, pero no agrupaciones?


Aquí hay una manera más fácil de trabajar en Python 2, simplemente ajuste el método de instancia original. Funciona bien en MacOSX y Linux, no funciona en Windows, probó Python 2.7

from multiprocessing import Pool class Person(object): def __init__(self): self.name = ''Weizhong Tu'' def calc(self, x): print self.name return x ** 5 def func(x, p=Person()): return p.calc(x) pool = Pool() print pool.map(func, range(10))


El módulo pickle normalmente no puede elegir métodos de instancia:

>>> import pickle >>> class A(object): ... def z(self): print "hi" ... >>> a = A() >>> pickle.dumps(a.z) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps Pickler(file, protocol).dump(obj) File "/usr/local/lib/python2.7/pickle.py", line 224, in dump self.save(obj) File "/usr/local/lib/python2.7/pickle.py", line 306, in save rv = reduce(self.proto) File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex raise TypeError, "can''t pickle %s objects" % base.__name__ TypeError: can''t pickle instancemethod objects

Sin embargo, el módulo de multiprocessing tiene un Pickler personalizado que agrega un código para habilitar esta función :

# # Try making some callable types picklable # from pickle import Pickler class ForkingPickler(Pickler): dispatch = Pickler.dispatch.copy() @classmethod def register(cls, type, reduce): def dispatcher(self, obj): rv = reduce(obj) self.save_reduce(obj=obj, *rv) cls.dispatch[type] = dispatcher def _reduce_method(m): if m.im_self is None: return getattr, (m.im_class, m.im_func.func_name) else: return getattr, (m.im_self, m.im_func.func_name) ForkingPickler.register(type(ForkingPickler.save), _reduce_method)

Puede replicar esto usando el módulo copy_reg para verlo funcionar por usted mismo:

>>> import copy_reg >>> def _reduce_method(m): ... if m.im_self is None: ... return getattr, (m.im_class, m.im_func.func_name) ... else: ... return getattr, (m.im_self, m.im_func.func_name) ... >>> copy_reg.pickle(type(a.z), _reduce_method) >>> pickle.dumps(a.z) "c__builtin__/ngetattr/np0/n(ccopy_reg/n_reconstructor/np1/n(c__main__/nA/np2/nc__builtin__/nobject/np3/nNtp4/nRp5/nS''z''/np6/ntp7/nRp8/n."

Cuando usa Process.start para generar un nuevo proceso en Windows, selecciona todos los parámetros que pasó al proceso secundario utilizando este ForkingPickler personalizado :

# # Windows # else: # snip... from pickle import load, HIGHEST_PROTOCOL def dump(obj, file, protocol=None): ForkingPickler(file, protocol).dump(obj) # # We define a Popen class similar to the one from subprocess, but # whose constructor takes a process object as its argument. # class Popen(object): '''''' Start a subprocess to run the code of a process object '''''' _tls = thread._local() def __init__(self, process_obj): # create pipe for communication with child rfd, wfd = os.pipe() # get handle for read end of the pipe and make it inheritable ... # start process ... # set attributes of self ... # send information to child prep_data = get_preparation_data(process_obj._name) to_child = os.fdopen(wfd, ''wb'') Popen._tls.process_handle = int(hp) try: dump(prep_data, to_child, HIGHEST_PROTOCOL) dump(process_obj, to_child, HIGHEST_PROTOCOL) finally: del Popen._tls.process_handle to_child.close()

Tenga en cuenta la sección "enviar información al niño". Está utilizando la función de dump , que utiliza ForkingPickler para ForkingPickler los datos, lo que significa que su método de instancia se puede encurtir.

Ahora, cuando usa métodos en multiprocessing.Pool para enviar un método a un proceso secundario, está usando un multiprocessing.Pipe Pipe para encurtir los datos. En Python 2.7, multiprocessing.Pipe se implementa en C y llama a pickle_dumps directamente , por lo que no aprovecha el ForkingPickler . Eso significa que el método de encurtido no funciona.

Sin embargo, si usa copy_reg para registrar el tipo de copy_reg de instancemethod , en lugar de un Pickler personalizado, todos los intentos de pickling se verán afectados. Entonces puede usar eso para habilitar métodos de instancia de decapado, incluso a través de Pool :

import multiprocessing import copy_reg import types def _reduce_method(m): if m.im_self is None: return getattr, (m.im_class, m.im_func.func_name) else: return getattr, (m.im_self, m.im_func.func_name) copy_reg.pickle(types.MethodType, _reduce_method) def test1(): print("Hello, world 1") def increment(x): return x + 1 class testClass(): def process(self): process1 = multiprocessing.Process(target=test1) process1.start() process1.join() process2 = multiprocessing.Process(target=self.test2) process2.start() process2.join() def pool(self): pool = multiprocessing.Pool(1) for answer in pool.imap(increment, range(10)): print(answer) print for answer in pool.imap(self.square, range(10)): print(answer) def test2(self): print("Hello, world 2") def square(self, x): return x * x def main(): c = testClass() c.process() c.pool() if __name__ == "__main__": main()

Salida:

Hello, world 1 Hello, world 2 GOT (0, 0, (True, 1)) GOT (0, 1, (True, 2)) GOT (0, 2, (True, 3)) GOT (0, 3, (True, 4)) GOT (0, 4, (True, 5)) 1GOT (0, 5, (True, 6)) GOT (0, 6, (True, 7)) 2 GOT (0, 7, (True, 8)) 3 GOT (0, 8, (True, 9)) GOT (0, 9, (True, 10)) 4 5 6 7 8 9 10 GOT (1, 0, (True, 0)) 0 GOT (1, 1, (True, 1)) 1 GOT (1, 2, (True, 4)) 4 GOT (1, 3, (True, 9)) 9 GOT (1, 4, (True, 16)) 16 GOT (1, 5, (True, 25)) 25 GOT (1, 6, (True, 36)) 36 GOT (1, 7, (True, 49)) 49 GOT (1, 8, (True, 64)) 64 GOT (1, 9, (True, 81)) 81 GOT None

También tenga en cuenta que en Python 3.x, pickle puede seleccionar tipos de métodos de instancia de forma nativa, por lo que nada de esto importa más. :)