python - multitarea - PicklingError cuando se utiliza multiprocesamiento
multitarea en python (1)
Tengo problemas al utilizar Pool.map_async () (y también Pool.map ()) en el módulo de multiprocesamiento. He implementado una función de paralelo para bucle que funciona bien siempre y cuando la entrada de la función a Pool.map_async sea una función "regular". Cuando la función es, por ejemplo, un método para una clase, obtengo un PicklingError:
cPickle.PicklingError: Can''t pickle <type ''function''>: attribute lookup __builtin__.function failed
Uso Python solo para computación científica, por lo que no estoy tan familiarizado con el concepto de decapado, acabo de aprender un poco sobre él hoy. He analizado un par de respuestas anteriores, como No puedo decapitar <tipo ''instancemethod''> al usar Pool.map () de multiproceso de python , pero no puedo descubrir cómo hacerlo funcionar, incluso cuando se sigue el enlace que se proporciona en el responder.
Mi código, cuyo objetivo es simular un vector de rv Normal con el uso de múltiples núcleos. Tenga en cuenta que esto es solo un ejemplo y tal vez ni siquiera tenga resultados para ejecutarse en varios núcleos.
import multiprocessing as mp
import scipy as sp
import scipy.stats as spstat
def parfor(func, args, static_arg = None, nWorkers = 8, chunksize = None):
"""
Purpose: Evaluate function using Multiple cores.
Input:
func - Function to evaluate in parallel
arg - Array of arguments to evaluate func(arg)
static_arg - The "static" argument (if any), i.e. the variables that are constant in the evaluation of func.
nWorkers - Number of Workers to process computations.
Output:
func(i, static_arg) for i in args.
"""
# Prepare arguments for func: Collect arguments with static argument (if any)
if static_arg != None:
arguments = [[arg] + static_arg for arg in list(args)]
else:
arguments = args
# Initialize workers
pool = mp.Pool(processes = nWorkers)
# Evaluate function
result = pool.map_async(func, arguments, chunksize = chunksize)
pool.close()
pool.join()
return sp.array(result.get()).flatten()
# First test-function. Freeze location and scale for the Normal random variates generator.
# This returns a function that is a method of the class Norm_gen. Methods cannot be pickled
# so this will give an error.
def genNorm(loc, scale):
def subfunc(a):
return spstat.norm.rvs(loc = loc, scale = scale, size = a)
return subfunc
# Second test-function. The same as above but does not return a method of a class. This is a "plain" function and can be
# pickled
def test(fargs):
x, a, b = fargs
return spstat.norm.rvs(size = x, loc = a, scale = b)
# Try it out.
N = 1000000
# Set arguments to function. args1 = [1, 1, 1,... ,1], the purpose is just to generate a random variable of size 1 for each
# element in the output vector.
args1 = sp.ones(N)
static_arg = [0, 1] # standarized normal.
# This gives the PicklingError
func = genNorm(*static_arg)
sim = parfor(func, args1, static_arg = None, nWorkers = 12, chunksize = None)
# This is OK:
func = test
sim = parfor(func, args1, static_arg = static_arg, nWorkers = 12, chunksize = None)
Siguiendo el enlace proporcionado en la respuesta a la pregunta en Can''t pickle <type ''instancemethod''> cuando se usa Pool.map () de multiproceso de python , Steven Bethard (casi al final) sugiere usar el módulo copy_reg. Su código es:
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
import copy_reg
import types
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
Realmente no entiendo cómo puedo hacer uso de esto. Lo único que se me ocurrió fue ponerlo justo antes de mi código, pero no sirvió de nada. Una solución simple es, por supuesto, simplemente ir con la que funciona y evitar involucrarse con copy_reg. Estoy más interesado en conseguir que copy_reg funcione correctamente para aprovechar al máximo el multiprocesamiento sin tener que solucionar el problema cada vez.
Gracias por su ayuda, es muy apreciado.
Matias
El problema aquí es menos del mensaje de error "pickle" que el conceptual: el multiprocesamiento bifurca su código en diferentes procesos de "trabajo" para realizar su magia.
A continuación, envía datos hacia y desde el proceso diferente mediante la serialización perfecta y la deserialización de los datos (esa es la parte que utiliza el pickle).
Cuando parte de los datos pasados de un lado a otro es una función, se supone que existe una función con el mismo nombre en el proceso del destinatario, y (supongo) pasa el nombre de la función, como una cadena. Como las funciones no tienen estado, el proceso de trabajo llamado simplemente llama a esa misma función con los datos que ha recibido. (Las funciones de Python no se pueden serializar mediante pickle, por lo que solo se pasa la referencia entre el maestro y los procesos de trabajo)
Cuando su función es un método en una instancia, aunque cuando codificamos python es muy similar a una función, con una variable automática "automática", no es la misma por debajo. Porque las instancias (objetos) son de estado. Eso significa que el proceso de trabajo no tiene una copia del objeto que es el propietario del método que desea llamar en el otro lado.
Tampoco funcionará la forma de pasar su método como una función a la llamada map_async, ya que el multiprocesamiento solo usa una referencia de función, no la función real cuando se pasa.
Por lo tanto, debe (1) cambiar su código para que pase una función, y no un método, a los procesos de trabajo, convirtiendo los estados que el objeto mantiene en nuevos parámetros para llamar. (2) Cree una función de "destino" para la llamada map_async que reconstruye el objeto necesario en el lado del proceso de trabajo y luego llama a la función dentro de él. La mayoría de las clases directas en Python son seleccionables por sí mismas, por lo que podría pasar el objeto que es el propietario de la función en la llamada map_async, y la función "objetivo" llamaría al método apropiado en sí mismo en el lado del trabajador.
(2) puede sonar "difícil", pero probablemente sea algo como esto, a menos que la clase de su objeto no pueda ser encurtida:
import types
def target(object, *args, **kw):
method_name = args[0]
return getattr(object, method_name)(*args[1:])
(...)
#And add these 3 lines prior to your map_async call:
# Evaluate function
if isinstance (func, types.MethodType):
arguments.insert(0, func.__name__)
func = target
result = pool.map_async(func, arguments, chunksize = chunksize)
* descargo de responsabilidad: no he probado esto