starmap set_start_method parallel from python threadpool multiprocessing pickle

set_start_method - python pool process



multiprocessing.Pool-PicklingError: No puede decaparse<tipo ''thread.lock''>: atributo de búsqueda thread.lock falló (2)

Después de mucho cavar en un problema similar ...

También resulta que CUALQUIER objeto que contenga un objeto threading.Condition () NUNCA trabajará NUNCA con multiprocesamiento.Pool.

Aquí hay un ejemplo

import multiprocessing as mp import threading class MyClass(object): def __init__(self): self.cond = threading.Condition() def foo(mc): pass pool=mp.Pool() mc=MyClass() pool.map(foo,(mc,))

Corrí esto con Python 2.7.5 y golpeé el mismo error:

Exception in thread Thread-2: Traceback (most recent call last): File "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner self.run() File "/usr/lib64/python2.7/threading.py", line 764, in run self.__target(*self.__args, **self.__kwargs) File "/usr/lib64/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks put(task) PicklingError: Can''t pickle <type ''thread.lock''>: attribute lookup thread.lock failed

Pero luego lo ejecutó en Python 3.4.1 y este problema se ha solucionado.

Aunque todavía no he encontrado ninguna solución útil para aquellos de nosotros que todavía estamos en 2.7.x.

multiprocessing.Pool me está volviendo loco ...
Quiero actualizar muchos paquetes, y para cada uno de ellos tengo que comprobar si hay una versión mayor o no. Esto se hace mediante la función check_one .
El código principal está en el método Updater.update : allí creo el objeto Pool y llamo al método map() .

Aquí está el código:

def check_one(args): res, total, package, version = args i = res.qsize() logger.info(''/r[{0:.1%} - {1}, {2} / {3}]'', i / float(total), package, i, total, addn=False) try: json = PyPIJson(package).retrieve() new_version = Version(json[''info''][''version'']) except Exception as e: logger.error(''Error: Failed to fetch data for {0} ({1})'', package, e) return if new_version > version: res.put_nowait((package, version, new_version, json)) class Updater(FileManager): # __init__ and other methods... def update(self): logger.info(''Searching for updates'') packages = Queue.Queue() data = ((packages, self.set_len, dist.project_name, Version(dist.version)) / for dist in self.working_set) pool = multiprocessing.Pool() pool.map(check_one, data) pool.close() pool.join() while True: try: package, version, new_version, json = packages.get_nowait() except Queue.Empty: break txt = ''A new release is avaiable for {0}: {1!s} (old {2}), update''.format(package, new_version, version) u = logger.ask(txt, bool=(''upgrade version'', ''keep working version''), dont_ask=self.yes) if u: self.upgrade(package, json, new_version) else: logger.info(''{0} has not been upgraded'', package) self._clean() logger.success(''Updating finished successfully'')

Cuando lo ejecuto me sale este error extraño:

Searching for updates Exception in thread Thread-1: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 505, in run self.__target(*self.__args, **self.__kwargs) File "/usr/local/lib/python2.7/dist-packages/multiprocessing/pool.py", line 225, in _handle_tasks put(task) PicklingError: Can''t pickle <type ''thread.lock''>: attribute lookup thread.lock failed


el multiprocesamiento pasa las tareas (que incluyen check_one y data ) a los procesos de trabajo a través de un Queue.Queue . Todo lo que se ponga en la Queue.Queue debe ser seleccionable. Queue sí mismas no son seleccionables:

import multiprocessing as mp import Queue def foo(queue): pass pool=mp.Pool() q=Queue.Queue() pool.map(foo,(q,))

produce esta excepción:

UnpickleableError: Cannot pickle <type ''thread.lock''> objects

Sus data incluyen packages , que es un Queue.Queue. Esa podría ser la fuente del problema.

Aquí hay una posible solución: La Queue se está utilizando para dos propósitos:

  1. para averiguar el tamaño aproximado (llamando a qsize )
  2. para almacenar los resultados para su posterior recuperación.

En lugar de llamar a qsize , para compartir un valor entre múltiples procesos, podríamos usar un mp.Value .

En lugar de almacenar los resultados en una cola, podemos (y debemos) simplemente devolver los valores de las llamadas a check_one . El pool.map recopila los resultados en una cola de su propia creación y devuelve los resultados como el valor de retorno de pool.map .

Por ejemplo:

import multiprocessing as mp import Queue import random import logging # logger=mp.log_to_stderr(logging.DEBUG) logger = logging.getLogger(__name__) qsize = mp.Value(''i'', 1) def check_one(args): total, package, version = args i = qsize.value logger.info(''/r[{0:.1%} - {1}, {2} / {3}]''.format( i / float(total), package, i, total)) new_version = random.randrange(0,100) qsize.value += 1 if new_version > version: return (package, version, new_version, None) else: return None def update(): logger.info(''Searching for updates'') set_len=10 data = ( (set_len, ''project-{0}''.format(i), random.randrange(0,100)) for i in range(set_len) ) pool = mp.Pool() results = pool.map(check_one, data) pool.close() pool.join() for result in results: if result is None: continue package, version, new_version, json = result txt = ''A new release is avaiable for {0}: {1!s} (old {2}), update''.format( package, new_version, version) logger.info(txt) logger.info(''Updating finished successfully'') if __name__==''__main__'': logging.basicConfig(level=logging.DEBUG) update()