


What can multiprocessing and dill do together?

I would like to use the multiprocessing library in Python. Sadly, multiprocessing uses pickle, which doesn't support functions with closures, lambdas, or functions in __main__. All three of these are important to me.

    In [1]: import pickle

    In [2]: pickle.dumps(lambda x: x)
    PicklingError: Can't pickle <function <lambda> at 0x23c0e60>: it's not found as __main__.<lambda>
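For readers on Python 3, the same limitation can be probed with a small helper. This is a minimal sketch, not part of the original session: `is_picklable` and the `candidates` dictionary are hypothetical names introduced here, and `quad_factory` is borrowed from the answer's later transcript. The exact exception type raised for a closure varies between Python 3 versions, so the helper catches several.

```python
import functools
import pickle


def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


def quad_factory(a=1, b=1, c=0):
    # quad closes over a, b and c, so pickle cannot find it by name
    def quad(x):
        return a * x**2 + b * x + c
    return quad


# Hypothetical probe set: two callables pickle rejects, two it accepts.
candidates = {
    "lambda": lambda x: x,
    "closure": quad_factory(2, 0, 1),
    "builtin len": len,
    "partial(pow, 2)": functools.partial(pow, 2),
}
report = {name: is_picklable(obj) for name, obj in candidates.items()}
for name, ok in report.items():
    print(f"{name}: {'picklable' if ok else 'not picklable'}")
```

pickle stores functions by reference (module plus qualified name), which is why a lambda or a nested function fails while a `functools.partial` over an importable callable goes through.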

Fortunately there is dill, a more robust pickle. Apparently dill performs some magic on import to make pickle work.

    In [3]: import dill

    In [4]: pickle.dumps(lambda x: x)
    Out[4]: "cdill.dill\n_load_type\np0\n(S'FunctionType'\np1 ...
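Independently of what importing dill does to pickle, dill can also be called directly through `dill.dumps` and `dill.loads`. The following is a minimal Python 3 sketch of a round trip for a lambda and a closure. It assumes the third-party dill package is installed (the import is guarded so the snippet does nothing without it); `roundtrip` is a hypothetical helper name, and `quad_factory` is taken from the answer's later transcript.

```python
try:
    import dill  # third-party package: pip install dill
except ImportError:
    dill = None  # assumption: skip the demonstration when dill is absent


def quad_factory(a=1, b=1, c=0):
    def quad(x):
        return a * x**2 + b * x + c
    return quad


def roundtrip(func):
    """Serialize func with dill and rebuild it from the resulting bytes."""
    payload = dill.dumps(func)
    return dill.loads(payload)


if dill is not None:
    square = roundtrip(lambda x: x**2)
    square_plus_one = roundtrip(quad_factory(2, 0, 1))
    print(square(7), square_plus_one(3))
```

Unlike pickle, dill serializes such functions by value rather than by name, so the rebuilt callables do not depend on a lookup in `__main__`.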

This is very encouraging, particularly because I don't have access to the multiprocessing source code. Sadly, I still can't get this very basic example to work.

    import multiprocessing as mp
    import dill

    p = mp.Pool(4)
    print p.map(lambda x: x**2, range(10))

Why is this? What am I missing? What exactly are the limitations of the multiprocessing + dill combination?

Temporary edit for J.F. Sebastian

    mrockli@mrockli-notebook:~/workspace/toolz$ python testmp.py
    Exception in thread Thread-2:
    Traceback (most recent call last):
      File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 808, in __bootstrap_inner
        self.run()
      File "/home/mrockli/Software/anaconda/lib/python2.7/threading.py", line 761, in run
        self.__target(*self.__args, **self.__kwargs)
      File "/home/mrockli/Software/anaconda/lib/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
        put(task)
    PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

    ^C
    ...lots of junk...

    [DEBUG/MainProcess] cleaning up worker 3
    [DEBUG/MainProcess] cleaning up worker 2
    [DEBUG/MainProcess] cleaning up worker 1
    [DEBUG/MainProcess] cleaning up worker 0
    [DEBUG/MainProcess] added worker
    [DEBUG/MainProcess] added worker
    [INFO/PoolWorker-5] child process calling self.run()
    [INFO/PoolWorker-6] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [INFO/PoolWorker-7] child process calling self.run()
    [DEBUG/MainProcess] added worker
    [INFO/PoolWorker-8] child process calling self.run()


multiprocessing makes some bad choices about pickling. Don't get me wrong, it makes some good choices that enable it to pickle certain types so they can be used in a pool's map function. However, since we have dill, which can do the pickling, multiprocessing's own pickling becomes a bit limiting. Actually, if multiprocessing were to use pickle instead of cPickle, and also drop some of its own pickling overrides, then dill could take over and provide a much fuller serialization for multiprocessing.
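The answer above describes Python 2.7. As a point of comparison, in Python 3 the standard library routes objects sent between processes through `multiprocessing.reduction.ForkingPickler`, a `pickle.Pickler` subclass that carries multiprocessing's extra reducers. The sketch below is only an illustration under that assumption: `survives_forking_pickler` is a hypothetical helper, and it shows that a lambda is still rejected at this layer while an importable builtin passes.

```python
import pickle
from multiprocessing.reduction import ForkingPickler


def survives_forking_pickler(obj):
    """Try to serialize obj the way multiprocessing would.

    Returns (True, rebuilt_object) on success and (False, None) on failure.
    """
    try:
        payload = ForkingPickler.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False, None
    return True, ForkingPickler.loads(payload)


# ForkingPickler builds on the standard Pickler, so it inherits its limits.
print(issubclass(ForkingPickler, pickle.Pickler))
print(survives_forking_pickler(len))
print(survives_forking_pickler(lambda x: x))
```

Because the pickler class is fixed inside multiprocessing, importing dill in user code does not by itself change what a pool can send to its workers.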

Until multiprocessing makes that change, there is a fork of multiprocessing called pathos (the release version is a bit stale, unfortunately) that removes the above limitations. Pathos also adds some nice features that multiprocessing doesn't have, like multi-args in the map function. Pathos is due for a release after some mild updating, mostly conversion to Python 3.x.

    Python 2.7.5 (default, Sep 30 2013, 20:15:49)
    [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
    Type "help", "copyright", "credits" or "license" for more information.
    >>> import dill
    >>> from pathos.multiprocessing import ProcessingPool
    >>> pool = ProcessingPool(nodes=4)
    >>> result = pool.map(lambda x: x**2, range(10))
    >>> result
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
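To make the "multi-args in the map function" feature concrete: the built-in `map` accepts several iterables and calls the function with one item from each. A rough Python 3 standard-library counterpart for pools is `starmap` over `zip(...)`. The sketch below is an illustration, not pathos code: `multi_map` and `scaled_sum` are hypothetical names, and `multiprocessing.pool.ThreadPool` is swapped in for a process pool so that nothing has to be pickled. `multiprocessing.Pool` offers the same `starmap` method but requires a picklable function.

```python
from multiprocessing.pool import ThreadPool


def multi_map(pool, func, *iterables):
    """Apply func to items drawn in lockstep from several iterables."""
    return pool.starmap(func, zip(*iterables))


def scaled_sum(x, y, scale):
    return scale * (x + y)


xs, ys, scales = [1, 2, 3], [10, 20, 30], [2, 2, 2]

# Reference result from the built-in, multi-iterable map.
serial = list(map(scaled_sum, xs, ys, scales))

# Same call shape through a pool (threads here, to avoid pickling).
with ThreadPool(processes=2) as pool:
    pooled = multi_map(pool, scaled_sum, xs, ys, scales)

print(serial, pooled)
```

Both calls produce the same list, which is the same equivalence that `test2` in the transcript asserts between `map` and `pool.map`.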

And just to show off a little of what pathos.multiprocessing can do...

    >>> def busy_add(x,y, delay=0.01):
    ...     for n in range(x):
    ...         x += n
    ...     for n in range(y):
    ...         y -= n
    ...     import time
    ...     time.sleep(delay)
    ...     return x + y
    ...
    >>> def busy_squared(x):
    ...     import time, random
    ...     time.sleep(2*random.random())
    ...     return x*x
    ...
    >>> def squared(x):
    ...     return x*x
    ...
    >>> def quad_factory(a=1, b=1, c=0):
    ...     def quad(x):
    ...         return a*x**2 + b*x + c
    ...     return quad
    ...
    >>> square_plus_one = quad_factory(2,0,1)
    >>>
    >>> def test1(pool):
    ...     print pool
    ...     print "x: %s\n" % str(x)
    ...     print pool.map.__name__
    ...     start = time.time()
    ...     res = pool.map(squared, x)
    ...     print "time to results:", time.time() - start
    ...     print "y: %s\n" % str(res)
    ...     print pool.imap.__name__
    ...     start = time.time()
    ...     res = pool.imap(squared, x)
    ...     print "time to queue:", time.time() - start
    ...     start = time.time()
    ...     res = list(res)
    ...     print "time to results:", time.time() - start
    ...     print "y: %s\n" % str(res)
    ...     print pool.amap.__name__
    ...     start = time.time()
    ...     res = pool.amap(squared, x)
    ...     print "time to queue:", time.time() - start
    ...     start = time.time()
    ...     res = res.get()
    ...     print "time to results:", time.time() - start
    ...     print "y: %s\n" % str(res)
    ...
    >>> def test2(pool, items=4, delay=0):
    ...     _x = range(-items/2,items/2,2)
    ...     _y = range(len(_x))
    ...     _d = [delay]*len(_x)
    ...     print map
    ...     res1 = map(busy_squared, _x)
    ...     res2 = map(busy_add, _x, _y, _d)
    ...     print pool.map
    ...     _res1 = pool.map(busy_squared, _x)
    ...     _res2 = pool.map(busy_add, _x, _y, _d)
    ...     assert _res1 == res1
    ...     assert _res2 == res2
    ...     print pool.imap
    ...     _res1 = pool.imap(busy_squared, _x)
    ...     _res2 = pool.imap(busy_add, _x, _y, _d)
    ...     assert list(_res1) == res1
    ...     assert list(_res2) == res2
    ...     print pool.amap
    ...     _res1 = pool.amap(busy_squared, _x)
    ...     _res2 = pool.amap(busy_add, _x, _y, _d)
    ...     assert _res1.get() == res1
    ...     assert _res2.get() == res2
    ...     print ""
    ...
    >>> def test3(pool): # test against a function that should fail in pickle
    ...     print pool
    ...     print "x: %s\n" % str(x)
    ...     print pool.map.__name__
    ...     start = time.time()
    ...     res = pool.map(square_plus_one, x)
    ...     print "time to results:", time.time() - start
    ...     print "y: %s\n" % str(res)
    ...
    >>> def test4(pool, maxtries, delay):
    ...     print pool
    ...     m = pool.amap(busy_add, x, x)
    ...     tries = 0
    ...     while not m.ready():
    ...         time.sleep(delay)
    ...         tries += 1
    ...         print "TRY: %s" % tries
    ...         if tries >= maxtries:
    ...             print "TIMEOUT"
    ...             break
    ...     print m.get()
    ...
    >>> import time
    >>> x = range(18)
    >>> delay = 0.01
    >>> items = 20
    >>> maxtries = 20
    >>> from pathos.multiprocessing import ProcessingPool as Pool
    >>> pool = Pool(nodes=4)
    >>> test1(pool)
    <pool ProcessingPool(ncpus=4)>
    x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]

    map
    time to results: 0.0553691387177
    y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

    imap
    time to queue: 7.91549682617e-05
    time to results: 0.102381229401
    y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

    amap
    time to queue: 7.08103179932e-05
    time to results: 0.0489699840546
    y: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289]

    >>> test2(pool, items, delay)
    <built-in function map>
    <bound method ProcessingPool.map of <pool ProcessingPool(ncpus=4)>>
    <bound method ProcessingPool.imap of <pool ProcessingPool(ncpus=4)>>
    <bound method ProcessingPool.amap of <pool ProcessingPool(ncpus=4)>>

    >>> test3(pool)
    <pool ProcessingPool(ncpus=4)>
    x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17]

    map
    time to results: 0.0523059368134
    y: [1, 3, 9, 19, 33, 51, 73, 99, 129, 163, 201, 243, 289, 339, 393, 451, 513, 579]

    >>> test4(pool, maxtries, delay)
    <pool ProcessingPool(ncpus=4)>
    TRY: 1
    TRY: 2
    TRY: 3
    TRY: 4
    TRY: 5
    TRY: 6
    TRY: 7
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34]
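The polling loop in `test4` can be approximated in Python 3 with the standard library alone. This is a sketch under stated assumptions rather than a port of pathos: `starmap_async` on `multiprocessing.pool.ThreadPool` stands in for `pool.amap`, threads are used so that no function has to be pickled, and `slow_add` and `poll` are hypothetical names that loosely mirror `busy_add` and the body of `test4`.

```python
import time
from multiprocessing.pool import ThreadPool


def slow_add(x, y, delay=0.01):
    # Simplified stand-in for busy_add: wait a little, then add.
    time.sleep(delay)
    return x + y


def poll(async_result, maxtries=20, delay=0.01):
    """Check ready() up to maxtries times, then fetch the result."""
    tries = 0
    while not async_result.ready():
        time.sleep(delay)
        tries += 1
        if tries >= maxtries:
            print("TIMEOUT")
            break
    # As in test4, get() blocks here if the work is still running.
    return async_result.get(), tries


x = range(18)
with ThreadPool(processes=4) as pool:
    pending = pool.starmap_async(slow_add, zip(x, x))
    result, tries = poll(pending)

print(result)
```

The asynchronous call returns immediately with a handle, which is why the transcript reports a tiny "time to queue" for `amap` and only pays the full cost at `get()`.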