set_start_method - starmap python multiprocessing
¿Por qué puedo pasar un método de instancia a multiprocessing.Process, pero no a multiprocessing.Pool? (3)
Aquí hay una alternativa que uso a veces, y funciona en Python2.x:
Puede crear una especie de "alias" de nivel superior para los métodos de instancia, que acepte un objeto cuyos métodos de instancia desee ejecutar en un grupo y haga que llame a los métodos de instancia por usted:
import functools
import multiprocessing
def _instance_method_alias(obj, arg):
"""
Alias for instance method that allows the method to be called in a
multiprocessing pool
"""
obj.instance_method(arg)
return
class MyClass(object):
"""
Our custom class whose instance methods we want to be able to use in a
multiprocessing pool
"""
def __init__(self):
self.my_string = "From MyClass: {}"
def instance_method(self, arg):
"""
Some arbitrary instance method
"""
print(self.my_string.format(arg))
return
# create an object of MyClass
obj = MyClass()
# use functools.partial to create a new method that always has the
# MyClass object passed as its first argument
_bound_instance_method_alias = functools.partial(_instance_method_alias, obj)
# create our list of things we will use the pool to map
l = [1,2,3]
# create the pool of workers
pool = multiprocessing.Pool()
# call pool.map, passing it the newly created function
pool.map(_bound_instance_method_alias, l)
# cleanup
pool.close()
pool.join()
Este código produce esta salida:
Desde MyClass: 1
Desde MyClass: 2
Desde MyClass: 3
Una limitación es que no puede usar esto para métodos que modifican el objeto. Cada proceso obtiene una copia del objeto sobre el que está llamando a los métodos, por lo que los cambios no se propagarán al proceso principal. Sin embargo, si no necesita modificar el objeto de los métodos que está llamando, esta puede ser una solución simple.
Estoy tratando de escribir una aplicación que aplique una función simultáneamente con un
multiprocessing.Pool
.
Me gustaría que esta función sea un método de instancia (para poder definirla de manera diferente en diferentes subclases).
Esto no parece ser posible;
Como he aprendido en otra parte, los
métodos
aparentemente
vinculados no se pueden encurtir
.
Entonces, ¿por qué comenzar un
multiprocessing.Process
con un método enlazado como objetivo funciona?
El siguiente código:
import multiprocessing
def test1():
print "Hello, world 1"
def increment(x):
return x + 1
class testClass():
def process(self):
process1 = multiprocessing.Process(target=test1)
process1.start()
process1.join()
process2 = multiprocessing.Process(target=self.test2)
process2.start()
process2.join()
def pool(self):
pool = multiprocessing.Pool(1)
for answer in pool.imap(increment, range(10)):
print answer
print
for answer in pool.imap(self.square, range(10)):
print answer
def test2(self):
print "Hello, world 2"
def square(self, x):
return x * x
def main():
c = testClass()
c.process()
c.pool()
if __name__ == "__main__":
main()
Produce esta salida:
Hello, world 1
Hello, world 2
1
2
3
4
5
6
7
8
9
10
Exception in thread Thread-2:
Traceback (most recent call last):
File "C:/Python27/Lib/threading.py", line 551, in __bootstrap_inner
self.run()
File "C:/Python27/Lib/threading.py", line 504, in run
self.__target(*self.__args, **self.__kwargs)
File "C:/Python27/Lib/multiprocessing/pool.py", line 319, in _handle_tasks
put(task)
PicklingError: Can''t pickle <type ''instancemethod''>: attribute lookup __builtin__.instancemethod failed
¿Por qué los procesos pueden manejar métodos enlazados, pero no agrupaciones?
Aquí hay una manera más fácil de trabajar en Python 2, simplemente ajuste el método de instancia original. Funciona bien en MacOSX y Linux, no funciona en Windows, probó Python 2.7
from multiprocessing import Pool
class Person(object):
def __init__(self):
self.name = ''Weizhong Tu''
def calc(self, x):
print self.name
return x ** 5
def func(x, p=Person()):
return p.calc(x)
pool = Pool()
print pool.map(func, range(10))
El módulo
pickle
normalmente no puede elegir métodos de instancia:
>>> import pickle
>>> class A(object):
... def z(self): print "hi"
...
>>> a = A()
>>> pickle.dumps(a.z)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
Pickler(file, protocol).dump(obj)
File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/usr/local/lib/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
raise TypeError, "can''t pickle %s objects" % base.__name__
TypeError: can''t pickle instancemethod objects
Sin embargo, el módulo de
multiprocessing
tiene un
Pickler
personalizado que agrega un código para habilitar esta función
:
#
# Try making some callable types picklable
#
from pickle import Pickler
class ForkingPickler(Pickler):
dispatch = Pickler.dispatch.copy()
@classmethod
def register(cls, type, reduce):
def dispatcher(self, obj):
rv = reduce(obj)
self.save_reduce(obj=obj, *rv)
cls.dispatch[type] = dispatcher
def _reduce_method(m):
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
Puede replicar esto usando el módulo
copy_reg
para verlo funcionar por usted mismo:
>>> import copy_reg
>>> def _reduce_method(m):
... if m.im_self is None:
... return getattr, (m.im_class, m.im_func.func_name)
... else:
... return getattr, (m.im_self, m.im_func.func_name)
...
>>> copy_reg.pickle(type(a.z), _reduce_method)
>>> pickle.dumps(a.z)
"c__builtin__/ngetattr/np0/n(ccopy_reg/n_reconstructor/np1/n(c__main__/nA/np2/nc__builtin__/nobject/np3/nNtp4/nRp5/nS''z''/np6/ntp7/nRp8/n."
Cuando usa
Process.start
para generar un nuevo proceso en Windows, selecciona
todos los parámetros que pasó al proceso secundario utilizando este
ForkingPickler
personalizado
:
#
# Windows
#
else:
# snip...
from pickle import load, HIGHEST_PROTOCOL
def dump(obj, file, protocol=None):
ForkingPickler(file, protocol).dump(obj)
#
# We define a Popen class similar to the one from subprocess, but
# whose constructor takes a process object as its argument.
#
class Popen(object):
''''''
Start a subprocess to run the code of a process object
''''''
_tls = thread._local()
def __init__(self, process_obj):
# create pipe for communication with child
rfd, wfd = os.pipe()
# get handle for read end of the pipe and make it inheritable
...
# start process
...
# set attributes of self
...
# send information to child
prep_data = get_preparation_data(process_obj._name)
to_child = os.fdopen(wfd, ''wb'')
Popen._tls.process_handle = int(hp)
try:
dump(prep_data, to_child, HIGHEST_PROTOCOL)
dump(process_obj, to_child, HIGHEST_PROTOCOL)
finally:
del Popen._tls.process_handle
to_child.close()
Tenga en cuenta la sección "enviar información al niño".
Está utilizando la función de
dump
, que utiliza
ForkingPickler
para
ForkingPickler
los datos, lo que significa que su método de instancia se puede encurtir.
Ahora, cuando usa métodos en
multiprocessing.Pool
para enviar un método a un proceso secundario, está usando un
multiprocessing.Pipe
Pipe para encurtir los datos.
En Python 2.7,
multiprocessing.Pipe
se implementa en C
y llama a
pickle_dumps
directamente
, por lo que no aprovecha el
ForkingPickler
.
Eso significa que el método de encurtido no funciona.
Sin embargo, si usa
copy_reg
para registrar el tipo de
copy_reg
de
instancemethod
, en lugar de un
Pickler
personalizado,
todos los
intentos de pickling se verán afectados.
Entonces puede usar eso para habilitar métodos de instancia de decapado, incluso a través de
Pool
:
import multiprocessing
import copy_reg
import types
def _reduce_method(m):
if m.im_self is None:
return getattr, (m.im_class, m.im_func.func_name)
else:
return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _reduce_method)
def test1():
print("Hello, world 1")
def increment(x):
return x + 1
class testClass():
def process(self):
process1 = multiprocessing.Process(target=test1)
process1.start()
process1.join()
process2 = multiprocessing.Process(target=self.test2)
process2.start()
process2.join()
def pool(self):
pool = multiprocessing.Pool(1)
for answer in pool.imap(increment, range(10)):
print(answer)
print
for answer in pool.imap(self.square, range(10)):
print(answer)
def test2(self):
print("Hello, world 2")
def square(self, x):
return x * x
def main():
c = testClass()
c.process()
c.pool()
if __name__ == "__main__":
main()
Salida:
Hello, world 1
Hello, world 2
GOT (0, 0, (True, 1))
GOT (0, 1, (True, 2))
GOT (0, 2, (True, 3))
GOT (0, 3, (True, 4))
GOT (0, 4, (True, 5))
1GOT (0, 5, (True, 6))
GOT (0, 6, (True, 7))
2
GOT (0, 7, (True, 8))
3
GOT (0, 8, (True, 9))
GOT (0, 9, (True, 10))
4
5
6
7
8
9
10
GOT (1, 0, (True, 0))
0
GOT (1, 1, (True, 1))
1
GOT (1, 2, (True, 4))
4
GOT (1, 3, (True, 9))
9
GOT (1, 4, (True, 16))
16
GOT (1, 5, (True, 25))
25
GOT (1, 6, (True, 36))
36
GOT (1, 7, (True, 49))
49
GOT (1, 8, (True, 64))
64
GOT (1, 9, (True, 81))
81
GOT None
También tenga en cuenta que en Python 3.x,
pickle
puede seleccionar tipos de métodos de instancia de forma nativa, por lo que nada de esto importa más.
:)