procesos multitarea ejemplos crear concurrentes con python exception process multiprocessing traceback

multitarea - Python: obtención de una traza de un multiprocesamiento. Proceso



multitarea en python (5)

Python 3

En Python 3, ahora el método get de multiprocessing.pool.Async devuelve traceback completo, consulte http://bugs.python.org/issue13831 .

Python 2

Utilice traceback.format_exc (que significa expection formateado) para obtener la cadena de traceback. Sería mucho más conveniente hacer un decorador como a continuación.

def full_traceback(func): import traceback, functools @functools.wraps(func) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except Exception as e: msg = "{}/n/nOriginal {}".format(e, traceback.format_exc()) raise type(e)(msg) return wrapper

Ejemplo:

def func0(): raise NameError("func0 exception") def func1(): return func0() # Key is here! @full_traceback def main(i): return func1() if __name__ == ''__main__'': from multiprocessing import Pool pool = Pool(4) try: results = pool.map_async(main, range(5)).get(1e5) finally: pool.close() pool.join()

El traceback con el decorador:

Traceback (most recent call last): File "bt.py", line 34, in <module> results = pool.map_async(main, range(5)).get(1e5) File "/opt/anaconda/lib/python2.7/multiprocessing/pool.py", line 567, in get raise self._value NameError: Exception in func0 Original Traceback (most recent call last): File "bt.py", line 13, in wrapper return func(*args, **kwargs) File "bt.py", line 27, in main return func1() File "bt.py", line 23, in func1 return func0() File "bt.py", line 20, in func0 raise NameError("Exception in func0") NameError: Exception in func0

El traceback sin el decorador:

Traceback (most recent call last): File "bt.py", line 34, in <module> results = pool.map_async(main, range(5)).get(1e5) File "/opt/anaconda/lib/python2.7/multiprocessing/pool.py", line 567, in get raise self._value NameError: Exception in func0

Estoy intentando obtener un objeto de rastreo de un multiproceso. Procesar. Desafortunadamente, pasar la información de excepción a través de un conducto no funciona porque los objetos de seguimiento no pueden ser escaneados:

def foo(pipe_to_parent): try: raise Exception(''xxx'') except: pipe_to_parent.send(sys.exc_info()) to_child, to_self = multiprocessing.Pipe() process = multiprocessing.Process(target = foo, args = (to_self,)) process.start() exc_info = to_child.recv() process.join() print traceback.format_exception(*exc_info) to_child.close() to_self.close()

Rastrear:

Traceback (most recent call last): File "/usr/lib/python2.6/multiprocessing/process.py", line 231, in _bootstrap self.run() File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run self._target(*self._args, **self._kwargs) File "foo", line 7, in foo to_parent.send(sys.exc_info()) PicklingError: Can''t pickle <type ''traceback''>: attribute lookup __builtin__.traceback failed

¿Hay alguna otra forma de acceder a la información de excepción? Me gustaría evitar pasar la cadena formateada.


Como el multiprocessing imprime los contenidos de cadena de excepciones generadas en procesos secundarios, puede envolver todo su código de proceso hijo en un intento, excepto que detecta excepciones, formatea los rastreos de la pila relavent y genera una nueva Exception que contiene toda la información relevante en su cadena:

Un ejemplo de una función que uso con multiprocessing.map :

def run_functor(functor): """ Given a no-argument functor, run it and return its result. We can use this with multiprocessing.map and map it over a list of job functors to do them. Handles getting more than multiprocessing''s pitiful exception output """ try: # This is where you do your actual work return functor() except: # Put all exception text into an exception and raise that raise Exception("".join(traceback.format_exception(*sys.exc_info())))

Lo que obtienes es un seguimiento de la pila con otra traza de pila formateada como el mensaje de error, que ayuda con la depuración.


Con tblib puede pasar excepciones envueltas y volver a publicarlas más tarde:

import tblib.pickling_support tblib.pickling_support.install() from multiprocessing import Pool import sys class ExceptionWrapper(object): def __init__(self, ee): self.ee = ee __, __, self.tb = sys.exc_info() def re_raise(self): raise self.ee.with_traceback(self.tb) # for Python 2 replace the previous line by: # raise self.ee, None, self.tb # example how to use ExceptionWrapper def inverse(i): """will fail for i == 0""" try: return 1.0 / i except Exception as e: return ExceptionWrapper(e) def main(): p = Pool(1) results = p.map(inverse, [0, 1, 2, 3]) for result in results: if isinstance(result, ExceptionWrapper): result.re_raise() if __name__ == "__main__": main()

Por lo tanto, si detecta una excepción en su proceso remoto, envuélvala con ExceptionWrapper y luego vuelva a pasarla. Llamar a re_reraise en el proceso principal hará el trabajo.


Esta es una variación de esta excelente respuesta . Ambos dependen de tblib para almacenar el rastreo.

Sin embargo, en lugar de tener que devolver el objeto de excepción (según lo solicitado por el OP), la función de worker puede dejarse tal como está y se envuelve en try / except para almacenar excepciones para re-raise.

import tblib.pickling_support tblib.pickling_support.install() import sys class DelayedException(Exception): def __init__(self, ee): self.ee = ee __, __, self.tb = sys.exc_info() super(DelayedException, self).__init__(str(ee)) def re_raise(self): raise self.ee, None, self.tb

Ejemplo

def worker(): try: raise ValueError(''Something went wrong.'') except Exception as e: raise DelayedException(e) if __name__ == ''__main__'': import multiprocessing pool = multiprocessing.Pool() try: pool.imap(worker, [1, 2, 3]) except DelayedException as e: e.re_raise()


Parece ser difícil hacer el objeto pickleback picklable. Pero solo puede enviar los 2 primeros elementos de sys.exc_info() y una información de sys.exc_info() preformada con el método traceback.extract_tb :

import multiprocessing import sys import traceback def foo(pipe_to_parent): try: raise Exception(''xxx'') except: except_type, except_class, tb = sys.exc_info() pipe_to_parent.send((except_type, except_class, traceback.extract_tb(tb))) to_child, to_self = multiprocessing.Pipe() process = multiprocessing.Process(target = foo, args = (to_self,)) process.start() exc_info = to_child.recv() process.join() print exc_info to_child.close() to_self.close()

que te dan:

(<type ''exceptions.Exception''>, Exception(''xxx'',), [(''test_tb.py'', 7, ''foo'', "raise Exception(''xxx'')")])

Y luego, podrá obtener más información acerca de la causa de la excepción (nombre de archivo, número de línea donde excepción levantada, nombre del método y la declaración que plantea la excepción)