set_start_method - python pool process
multiprocessing.Pool-PicklingError: No puede decaparse<tipo ''thread.lock''>: atributo de búsqueda thread.lock falló (2)
Después de mucho cavar en un problema similar ...
También resulta que CUALQUIER objeto que contenga un objeto threading.Condition () NUNCA trabajará NUNCA con multiprocesamiento.Pool.
Aquí hay un ejemplo
import multiprocessing as mp
import threading
class MyClass(object):
def __init__(self):
self.cond = threading.Condition()
def foo(mc):
pass
pool=mp.Pool()
mc=MyClass()
pool.map(foo,(mc,))
Corrí esto con Python 2.7.5 y golpeé el mismo error:
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 764, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 342, in _handle_tasks
put(task)
PicklingError: Can''t pickle <type ''thread.lock''>: attribute lookup thread.lock failed
Pero luego lo ejecutó en Python 3.4.1 y este problema se ha solucionado.
Aunque todavía no he encontrado ninguna solución útil para aquellos de nosotros que todavía estamos en 2.7.x.
multiprocessing.Pool
me está volviendo loco ...
Quiero actualizar muchos paquetes, y para cada uno de ellos tengo que comprobar si hay una versión mayor o no. Esto se hace mediante la función check_one
.
El código principal está en el método Updater.update
: allí creo el objeto Pool y llamo al método map()
.
Aquí está el código:
def check_one(args):
res, total, package, version = args
i = res.qsize()
logger.info(''/r[{0:.1%} - {1}, {2} / {3}]'',
i / float(total), package, i, total, addn=False)
try:
json = PyPIJson(package).retrieve()
new_version = Version(json[''info''][''version''])
except Exception as e:
logger.error(''Error: Failed to fetch data for {0} ({1})'', package, e)
return
if new_version > version:
res.put_nowait((package, version, new_version, json))
class Updater(FileManager):
# __init__ and other methods...
def update(self):
logger.info(''Searching for updates'')
packages = Queue.Queue()
data = ((packages, self.set_len, dist.project_name, Version(dist.version)) /
for dist in self.working_set)
pool = multiprocessing.Pool()
pool.map(check_one, data)
pool.close()
pool.join()
while True:
try:
package, version, new_version, json = packages.get_nowait()
except Queue.Empty:
break
txt = ''A new release is avaiable for {0}: {1!s} (old {2}), update''.format(package,
new_version,
version)
u = logger.ask(txt, bool=(''upgrade version'', ''keep working version''), dont_ask=self.yes)
if u:
self.upgrade(package, json, new_version)
else:
logger.info(''{0} has not been upgraded'', package)
self._clean()
logger.success(''Updating finished successfully'')
Cuando lo ejecuto me sale este error extraño:
Searching for updates
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/local/lib/python2.7/dist-packages/multiprocessing/pool.py", line 225, in _handle_tasks
put(task)
PicklingError: Can''t pickle <type ''thread.lock''>: attribute lookup thread.lock failed
el multiprocesamiento pasa las tareas (que incluyen check_one
y data
) a los procesos de trabajo a través de un Queue.Queue
. Todo lo que se ponga en la Queue.Queue
debe ser seleccionable. Queue
sí mismas no son seleccionables:
import multiprocessing as mp
import Queue
def foo(queue):
pass
pool=mp.Pool()
q=Queue.Queue()
pool.map(foo,(q,))
produce esta excepción:
UnpickleableError: Cannot pickle <type ''thread.lock''> objects
Sus data
incluyen packages
, que es un Queue.Queue. Esa podría ser la fuente del problema.
Aquí hay una posible solución: La Queue
se está utilizando para dos propósitos:
- para averiguar el tamaño aproximado (llamando a
qsize
) - para almacenar los resultados para su posterior recuperación.
En lugar de llamar a qsize
, para compartir un valor entre múltiples procesos, podríamos usar un mp.Value
.
En lugar de almacenar los resultados en una cola, podemos (y debemos) simplemente devolver los valores de las llamadas a check_one
. El pool.map
recopila los resultados en una cola de su propia creación y devuelve los resultados como el valor de retorno de pool.map
.
Por ejemplo:
import multiprocessing as mp
import Queue
import random
import logging
# logger=mp.log_to_stderr(logging.DEBUG)
logger = logging.getLogger(__name__)
qsize = mp.Value(''i'', 1)
def check_one(args):
total, package, version = args
i = qsize.value
logger.info(''/r[{0:.1%} - {1}, {2} / {3}]''.format(
i / float(total), package, i, total))
new_version = random.randrange(0,100)
qsize.value += 1
if new_version > version:
return (package, version, new_version, None)
else:
return None
def update():
logger.info(''Searching for updates'')
set_len=10
data = ( (set_len, ''project-{0}''.format(i), random.randrange(0,100))
for i in range(set_len) )
pool = mp.Pool()
results = pool.map(check_one, data)
pool.close()
pool.join()
for result in results:
if result is None: continue
package, version, new_version, json = result
txt = ''A new release is avaiable for {0}: {1!s} (old {2}), update''.format(
package, new_version, version)
logger.info(txt)
logger.info(''Updating finished successfully'')
if __name__==''__main__'':
logging.basicConfig(level=logging.DEBUG)
update()