python - example - picklingerror can t pickle
Error de decapado por multiproceso de Python (7)
Esta solución solo requiere la instalación de eneldo y no hay otras bibliotecas como pathos
def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
"""
Unpack dumped function as target function and call it with arguments.
:param (dumped_function, item, args, kwargs):
a tuple of dumped function and its arguments
:return:
result of target function
"""
target_function = dill.loads(dumped_function)
res = target_function(item, *args, **kwargs)
return res
def pack_function_for_map(target_function, items, *args, **kwargs):
"""
Pack function and arguments to object that can be sent from one
multiprocessing.Process to another. The main problem is:
«multiprocessing.Pool.map*» or «apply*»
cannot use class methods or closures.
It solves this problem with «dill».
It works with target function as argument, dumps it («with dill»)
and returns dumped function with arguments of target function.
For more performance we dump only target function itself
and don''t dump its arguments.
How to use (pseudo-code):
~>>> import multiprocessing
~>>> images = [...]
~>>> pool = multiprocessing.Pool(100500)
~>>> features = pool.map(
~... *pack_function_for_map(
~... super(Extractor, self).extract_features,
~... images,
~... type=''png''
~... **options,
~... )
~... )
~>>>
:param target_function:
function, that you want to execute like target_function(item, *args, **kwargs).
:param items:
list of items for map
:param args:
positional arguments for target_function(item, *args, **kwargs)
:param kwargs:
named arguments for target_function(item, *args, **kwargs)
:return: tuple(function_wrapper, dumped_items)
It returs a tuple with
* function wrapper, that unpack and call target function;
* list of packed target function and its'' arguments.
"""
dumped_function = dill.dumps(target_function)
dumped_items = [(dumped_function, item, args, kwargs) for item in items]
return apply_packed_function_for_map, dumped_items
También funciona para matrices numpy.
Lamento no poder reproducir el error con un ejemplo más simple y mi código es demasiado complicado para publicar. Si ejecuto el programa en el shell de IPython en lugar del python normal, las cosas funcionan bien.
Busqué algunas notas anteriores sobre este problema. Todos fueron causados por el uso de la función pool to call definida dentro de una función de clase. Pero este no es el caso para mí.
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
put(task)
PicklingError: Can''t pickle <type ''function''>: attribute lookup __builtin__.function failed
Apreciaría cualquier ayuda.
ACTUALIZACIÓN: La función I pickle se define en el nivel superior del módulo. Aunque llama a una función que contiene una función anidada. es decir, f () llama a g () llama a h () que tiene una función anidada i (), y estoy llamando a pool.apply_async (f). f (), g (), h () están todos definidos en el nivel superior. Intenté un ejemplo más simple con este patrón y funciona bien.
Can''t pickle <type ''function''>: attribute lookup __builtin__.function failed
Este error también aparecerá si tiene alguna función incorporada dentro del objeto modelo que se pasó al trabajo asincrónico.
Así que asegúrese de verificar que los objetos modelo que se pasan no tengan funciones incorporadas. (En nuestro caso, estábamos usando la función FieldTracker()
de django-model-utils dentro del modelo para rastrear un determinado campo). Aquí está el link al problema relevante de GitHub.
¿Estás pasando una serie nudosa de cuerdas por casualidad?
He tenido el mismo error exacto cuando paso una matriz que contiene una cadena vacía. Creo que puede deberse a este error: http://projects.scipy.org/numpy/ticket/1658
Aquí hay una lista de lo que puede ser escabechado . En particular, las funciones solo se pueden seleccionar si están definidas en el nivel superior de un módulo.
Este pedazo de código:
import multiprocessing as mp
class Foo():
@staticmethod
def work(self):
pass
pool = mp.Pool()
foo = Foo()
pool.apply_async(foo.work)
pool.close()
pool.join()
produce un error casi idéntico al que publicaste:
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
put(task)
PicklingError: Can''t pickle <type ''function''>: attribute lookup __builtin__.function failed
El problema es que todos los métodos de pool
usan una queue.Queue
para pasar tareas a los procesos de trabajo. Todo lo que pasa a través de queue.Queue
debe ser seleccionable, y foo.work
no es seleccionable, ya que no está definido en el nivel superior del módulo.
Se puede solucionar definiendo una función en el nivel superior, que llama a foo.work()
:
def work(foo):
foo.work()
pool.apply_async(work,args=(foo,))
Tenga en cuenta que foo
es seleccionable, ya que Foo
se define en el nivel superior y foo.__dict__
es seleccionable.
Como otros han dicho, el multiprocessing
solo puede transferir objetos de Python a procesos de trabajo que pueden ser escamados. Si no puede reorganizar su código como se describe en unutbu, puede usar las capacidades extendidas de decapado / descamado de eneldo para transferir datos (especialmente datos de código) como se muestra a continuación.
Esta solución solo requiere la instalación de dill
y ninguna otra biblioteca como pathos
:
import os
from multiprocessing import Pool
import dill
def run_dill_encoded(payload):
fun, args = dill.loads(payload)
return fun(*args)
def apply_async(pool, fun, args):
payload = dill.dumps((fun, args))
return pool.apply_async(run_dill_encoded, (payload,))
if __name__ == "__main__":
pool = Pool(processes=5)
# asyn execution of lambda
jobs = []
for i in range(10):
job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
jobs.append(job)
for job in jobs:
print job.get()
print
# async execution of static method
class O(object):
@staticmethod
def calc():
return os.getpid()
jobs = []
for i in range(10):
job = apply_async(pool, O.calc, ())
jobs.append(job)
for job in jobs:
print job.get()
Descubrí que también puedo generar exactamente esa salida de error en una pieza de código que funciona perfectamente al intentar usar el generador de perfiles.
Tenga en cuenta que esto fue en Windows (donde el bifurcación es un poco menos elegante).
Estaba corriendo:
python -m profile -o output.pstats <script>
Y descubrió que eliminar el perfil eliminó el error y restaurarlo restaurando el perfil. También me estaba volviendo loco porque sabía que el código solía funcionar. Estaba revisando para ver si algo había actualizado pool.py ... luego tuve una sensación de depresión y eliminé el perfil y eso fue todo.
Publicando aquí para los archivos en caso de que alguien más se encuentre con él.
Utilizaría pathos.multiprocesssing
, en lugar de multiprocessing
. pathos.multiprocessing
es una bifurcación de multiprocessing
que usa dill
. dill
puede serializar casi cualquier cosa en python, por lo que puede enviar mucho más en paralelo. La bifurcación de pathos
también tiene la capacidad de trabajar directamente con múltiples funciones de argumento, como lo necesita para los métodos de clase.
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>>
>>> class Foo(object):
... @staticmethod
... def work(self, x):
... return x+1
...
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101
Obtén pathos
(y si quieres, dill
) aquí: https://github.com/uqfoundation