procesos multiprocesamiento concurrentes python exception multiprocessing

concurrentes - multiprocesamiento en python



Excepción arrojada en el grupo de multiprocesamiento no detectado (9)

Como ha utilizado apply_sync , creo que el caso de uso es querer realizar algunas tareas de sincronización. Usar la devolución de llamada para el manejo es otra opción. Tenga en cuenta que esta opción solo está disponible para python3.2 y superior y no está disponible en python2.7.

from multiprocessing import Pool def callback(result): print(''success'', result) def callback_error(result): print(''error'', result) def go(): print(1) raise Exception() print(2) p = Pool() p.apply_async(go, callback=callback, error_callback=callback_error) # You can do another things p.close() p.join()

Parece que cuando se genera una excepción de un proceso multiprocesamiento.Pool, no hay ningún rastro de pila ni ninguna otra indicación de que haya fallado. Ejemplo:

from multiprocessing import Pool def go(): print(1) raise Exception() print(2) p = Pool() p.apply_async(go) p.close() p.join()

imprime 1 y se detiene silenciosamente. Curiosamente, levantar una BaseException funciona. ¿Hay alguna manera de hacer que el comportamiento para todas las excepciones sea el mismo que BaseException?


Dado que ya hay respuestas decentes para multiprocessing.Pool . multiprocessing.Pool disponible, proporcionaré una solución con un enfoque diferente para completar.

Para python >= 3.2 la siguiente solución parece ser la más simple:

from concurrent.futures import ProcessPoolExecutor, wait def go(): print(1) raise Exception() print(2) futures = [] with ProcessPoolExecutor() as p: for i in range(10): futures.append(p.submit(go)) results = [f.result() for f in futures]

Ventajas:

  • muy poco código
  • plantea una excepción en el proceso principal
  • proporciona un seguimiento de la pila
  • sin dependencias externas

Para obtener más información sobre la API, consulte: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor

Además, si está enviando una gran cantidad de tareas y desea que su proceso principal falle tan pronto como falle una de sus tareas, puede usar el siguiente fragmento:

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed import time def go(): print(1) time.sleep(0.3) raise Exception() print(2) futures = [] with ProcessPoolExecutor(1) as p: for i in range(10): futures.append(p.submit(go)) for f in as_completed(futures): if f.exception() is not None: for f in futures: f.cancel() break [f.result() for f in futures]

Todas las demás respuestas fallan solo una vez que se han ejecutado todas las tareas.


He tenido excepciones de registro de éxito con este decorador:

import traceback, functools, multiprocessing def trace_unhandled_exceptions(func): @functools.wraps(func) def wrapped_func(*args, **kwargs): try: func(*args, **kwargs) except: print ''Exception in ''+func.__name__ traceback.print_exc() return wrapped_func

con el código en la pregunta, es

@trace_unhandled_exceptions def go(): print(1) raise Exception() print(2) p = multiprocessing.Pool(1) p.apply_async(go) p.close() p.join()

Simplemente decore la función que pasa a su grupo de procesos. La clave para este funcionamiento es @functools.wraps(func) contrario, el multiprocesamiento arroja un PicklingError .

el código anterior da

1 Exception in go Traceback (most recent call last): File "<stdin>", line 5, in wrapped_func File "<stdin>", line 4, in go Exception


Intentaré usar pdb:

import pdb import sys def handler(type, value, tb): pdb.pm() sys.excepthook = handler


La solución con más votos en el momento de escribir tiene un problema:

from multiprocessing import Pool def go(): print(1) raise Exception("foobar") print(2) p = Pool() x = p.apply_async(go) x.get() ## waiting here for go() to complete... p.close() p.join()

Como señaló @dfrankow, esperará en x.get() , lo que arruina el punto de ejecutar una tarea de forma asincrónica. Entonces, para una mejor eficiencia (en particular, si su función de trabajador toma mucho tiempo) la cambiaría a:

from multiprocessing import Pool def go(x): print(1) # task_that_takes_a_long_time() raise Exception("Can''t go anywhere.") print(2) return x**2 p = Pool() results = [] for x in range(1000): results.append( p.apply_async(go, [x]) ) p.close() for r in results: r.get()

Ventajas : la función de trabajador se ejecuta de forma asíncrona, por lo que si, por ejemplo, está ejecutando muchas tareas en varios núcleos, será mucho más eficiente que la solución original.

Desventajas : si hay una excepción en la función de trabajador, solo se generará después de que el grupo haya completado todas las tareas. Esto puede o no ser el comportamiento deseable. EDITADO de acuerdo con el comentario de @ colinfang, que solucionó esto.


Tal vez me esté perdiendo algo, pero ¿no es eso lo que devuelve el método get del objeto Result? Ver grupos de procesos .

clase multiprocesamiento.pool.AsyncResult

La clase del resultado devuelto por Pool.apply_async () y Pool.map_async (). Get ([timeout])
Devuelve el resultado cuando llegue. Si el tiempo de espera no es Ninguno y el resultado no llega dentro de un tiempo de espera de segundos, entonces multiprocesamiento.TimeoutError aumenta. Si la llamada remota generó una excepción, get () volverá a generar esa excepción.

Entonces, modificando ligeramente su ejemplo, uno puede hacer

from multiprocessing import Pool def go(): print(1) raise Exception("foobar") print(2) p = Pool() x = p.apply_async(go) x.get() p.close() p.join()

Lo cual da como resultado

1 Traceback (most recent call last): File "rob.py", line 10, in <module> x.get() File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get raise self._value Exception: foobar

Esto no es completamente satisfactorio, ya que no imprime el rastreo, pero es mejor que nada.

ACTUALIZACIÓN: Este error ha sido reparado en Python 3.4, cortesía de Richard Oudkerk. Consulte el tema sobre el método de multiprocesamiento.pool.Async debería devolver el rastreo completo .


Tengo una solución razonable para el problema, al menos para fines de depuración. Actualmente no tengo una solución que eleve la excepción en los procesos principales. Lo primero que pensé fue en usar un decorador, pero solo puedes elegir las funciones definidas en el nivel superior de un módulo , así que eso está bien.

En cambio, una clase de apply_async simple y una subclase Pool que usa esto para apply_async (y, por lo tanto, se apply ). map_async como ejercicio para el lector.

import traceback from multiprocessing.pool import Pool import multiprocessing # Shortcut to multiprocessing''s logger def error(msg, *args): return multiprocessing.get_logger().error(msg, *args) class LogExceptions(object): def __init__(self, callable): self.__callable = callable def __call__(self, *args, **kwargs): try: result = self.__callable(*args, **kwargs) except Exception as e: # Here we add some debugging help. If multiprocessing''s # debugging is on, it will arrange to log the traceback error(traceback.format_exc()) # Re-raise the original exception so the Pool worker can # clean up raise # It was fine, give a normal answer return result class LoggingPool(Pool): def apply_async(self, func, args=(), kwds={}, callback=None): return Pool.apply_async(self, LogExceptions(func), args, kwds, callback) def go(): print(1) raise Exception() print(2) multiprocessing.log_to_stderr() p = LoggingPool(processes=1) p.apply_async(go) p.close() p.join()

Esto me da:

1 [ERROR/PoolWorker-1] Traceback (most recent call last): File "mpdebug.py", line 24, in __call__ result = self.__callable(*args, **kwargs) File "mpdebug.py", line 44, in go raise Exception() Exception


RemoteException.py un módulo RemoteException.py que muestra el rastreo completo de una excepción en un proceso. Python2. RemoteException.py y agregue esto a su código:

import RemoteException @RemoteException.showError def go(): raise Exception(''Error!'') if __name__ == ''__main__'': import multiprocessing p = multiprocessing.Pool(processes = 1) r = p.apply(go) # full traceback is shown here


import logging from multiprocessing import Pool def proc_wrapper(func, *args, **kwargs): """Print exception because multiprocessing lib doesn''t return them right.""" try: return func(*args, **kwargs) except Exception as e: logging.exception(e) raise def go(x): print x raise Exception("foobar") p = Pool() p.apply_async(proc_wrapper, (go, 5)) p.join() p.close()