concurrentes - multiprocesamiento en python
Excepción arrojada en el grupo de multiprocesamiento no detectado (9)
Como ha utilizado apply_sync
, creo que el caso de uso es querer realizar algunas tareas de sincronización. Usar la devolución de llamada para el manejo es otra opción. Tenga en cuenta que esta opción solo está disponible para python3.2 y superior y no está disponible en python2.7.
from multiprocessing import Pool
def callback(result):
print(''success'', result)
def callback_error(result):
print(''error'', result)
def go():
print(1)
raise Exception()
print(2)
p = Pool()
p.apply_async(go, callback=callback, error_callback=callback_error)
# You can do another things
p.close()
p.join()
Parece que cuando se genera una excepción de un proceso multiprocesamiento.Pool, no hay ningún rastro de pila ni ninguna otra indicación de que haya fallado. Ejemplo:
from multiprocessing import Pool
def go():
print(1)
raise Exception()
print(2)
p = Pool()
p.apply_async(go)
p.close()
p.join()
imprime 1 y se detiene silenciosamente. Curiosamente, levantar una BaseException funciona. ¿Hay alguna manera de hacer que el comportamiento para todas las excepciones sea el mismo que BaseException?
Dado que ya hay respuestas decentes para multiprocessing.Pool
. multiprocessing.Pool
disponible, proporcionaré una solución con un enfoque diferente para completar.
Para python >= 3.2
la siguiente solución parece ser la más simple:
from concurrent.futures import ProcessPoolExecutor, wait
def go():
print(1)
raise Exception()
print(2)
futures = []
with ProcessPoolExecutor() as p:
for i in range(10):
futures.append(p.submit(go))
results = [f.result() for f in futures]
Ventajas:
- muy poco código
- plantea una excepción en el proceso principal
- proporciona un seguimiento de la pila
- sin dependencias externas
Para obtener más información sobre la API, consulte: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor
Además, si está enviando una gran cantidad de tareas y desea que su proceso principal falle tan pronto como falle una de sus tareas, puede usar el siguiente fragmento:
from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed
import time
def go():
print(1)
time.sleep(0.3)
raise Exception()
print(2)
futures = []
with ProcessPoolExecutor(1) as p:
for i in range(10):
futures.append(p.submit(go))
for f in as_completed(futures):
if f.exception() is not None:
for f in futures:
f.cancel()
break
[f.result() for f in futures]
Todas las demás respuestas fallan solo una vez que se han ejecutado todas las tareas.
He tenido excepciones de registro de éxito con este decorador:
import traceback, functools, multiprocessing
def trace_unhandled_exceptions(func):
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
try:
func(*args, **kwargs)
except:
print ''Exception in ''+func.__name__
traceback.print_exc()
return wrapped_func
con el código en la pregunta, es
@trace_unhandled_exceptions
def go():
print(1)
raise Exception()
print(2)
p = multiprocessing.Pool(1)
p.apply_async(go)
p.close()
p.join()
Simplemente decore la función que pasa a su grupo de procesos. La clave para este funcionamiento es @functools.wraps(func)
contrario, el multiprocesamiento arroja un PicklingError
.
el código anterior da
1
Exception in go
Traceback (most recent call last):
File "<stdin>", line 5, in wrapped_func
File "<stdin>", line 4, in go
Exception
Intentaré usar pdb:
import pdb
import sys
def handler(type, value, tb):
pdb.pm()
sys.excepthook = handler
La solución con más votos en el momento de escribir tiene un problema:
from multiprocessing import Pool
def go():
print(1)
raise Exception("foobar")
print(2)
p = Pool()
x = p.apply_async(go)
x.get() ## waiting here for go() to complete...
p.close()
p.join()
Como señaló @dfrankow, esperará en x.get()
, lo que arruina el punto de ejecutar una tarea de forma asincrónica. Entonces, para una mejor eficiencia (en particular, si su función de trabajador toma mucho tiempo) la cambiaría a:
from multiprocessing import Pool
def go(x):
print(1)
# task_that_takes_a_long_time()
raise Exception("Can''t go anywhere.")
print(2)
return x**2
p = Pool()
results = []
for x in range(1000):
results.append( p.apply_async(go, [x]) )
p.close()
for r in results:
r.get()
Ventajas : la función de trabajador se ejecuta de forma asíncrona, por lo que si, por ejemplo, está ejecutando muchas tareas en varios núcleos, será mucho más eficiente que la solución original.
Desventajas : si hay una excepción en la función de trabajador, solo se generará después de que el grupo haya completado todas las tareas. Esto puede o no ser el comportamiento deseable. EDITADO de acuerdo con el comentario de @ colinfang, que solucionó esto.
Tal vez me esté perdiendo algo, pero ¿no es eso lo que devuelve el método get
del objeto Result? Ver grupos de procesos .
clase multiprocesamiento.pool.AsyncResult
La clase del resultado devuelto por Pool.apply_async () y Pool.map_async (). Get ([timeout])
Devuelve el resultado cuando llegue. Si el tiempo de espera no es Ninguno y el resultado no llega dentro de un tiempo de espera de segundos, entonces multiprocesamiento.TimeoutError aumenta. Si la llamada remota generó una excepción, get () volverá a generar esa excepción.
Entonces, modificando ligeramente su ejemplo, uno puede hacer
from multiprocessing import Pool
def go():
print(1)
raise Exception("foobar")
print(2)
p = Pool()
x = p.apply_async(go)
x.get()
p.close()
p.join()
Lo cual da como resultado
1
Traceback (most recent call last):
File "rob.py", line 10, in <module>
x.get()
File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get
raise self._value
Exception: foobar
Esto no es completamente satisfactorio, ya que no imprime el rastreo, pero es mejor que nada.
ACTUALIZACIÓN: Este error ha sido reparado en Python 3.4, cortesía de Richard Oudkerk. Consulte el tema sobre el método de multiprocesamiento.pool.Async debería devolver el rastreo completo .
Tengo una solución razonable para el problema, al menos para fines de depuración. Actualmente no tengo una solución que eleve la excepción en los procesos principales. Lo primero que pensé fue en usar un decorador, pero solo puedes elegir las funciones definidas en el nivel superior de un módulo , así que eso está bien.
En cambio, una clase de apply_async
simple y una subclase Pool que usa esto para apply_async
(y, por lo tanto, se apply
). map_async
como ejercicio para el lector.
import traceback
from multiprocessing.pool import Pool
import multiprocessing
# Shortcut to multiprocessing''s logger
def error(msg, *args):
return multiprocessing.get_logger().error(msg, *args)
class LogExceptions(object):
def __init__(self, callable):
self.__callable = callable
def __call__(self, *args, **kwargs):
try:
result = self.__callable(*args, **kwargs)
except Exception as e:
# Here we add some debugging help. If multiprocessing''s
# debugging is on, it will arrange to log the traceback
error(traceback.format_exc())
# Re-raise the original exception so the Pool worker can
# clean up
raise
# It was fine, give a normal answer
return result
class LoggingPool(Pool):
def apply_async(self, func, args=(), kwds={}, callback=None):
return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)
def go():
print(1)
raise Exception()
print(2)
multiprocessing.log_to_stderr()
p = LoggingPool(processes=1)
p.apply_async(go)
p.close()
p.join()
Esto me da:
1
[ERROR/PoolWorker-1] Traceback (most recent call last):
File "mpdebug.py", line 24, in __call__
result = self.__callable(*args, **kwargs)
File "mpdebug.py", line 44, in go
raise Exception()
Exception
RemoteException.py un módulo RemoteException.py que muestra el rastreo completo de una excepción en un proceso. Python2. RemoteException.py y agregue esto a su código:
import RemoteException
@RemoteException.showError
def go():
raise Exception(''Error!'')
if __name__ == ''__main__'':
import multiprocessing
p = multiprocessing.Pool(processes = 1)
r = p.apply(go) # full traceback is shown here
import logging
from multiprocessing import Pool
def proc_wrapper(func, *args, **kwargs):
"""Print exception because multiprocessing lib doesn''t return them right."""
try:
return func(*args, **kwargs)
except Exception as e:
logging.exception(e)
raise
def go(x):
print x
raise Exception("foobar")
p = Pool()
p.apply_async(proc_wrapper, (go, 5))
p.join()
p.close()