multitarea - Python: obtención de una traza de un multiprocesamiento. Proceso
multitarea en python (5)
Python 3
En Python 3, ahora el método get
de multiprocessing.pool.Async
devuelve traceback completo, consulte http://bugs.python.org/issue13831 .
Python 2
Utilice traceback.format_exc
(que significa expection formateado) para obtener la cadena de traceback. Sería mucho más conveniente hacer un decorador como a continuación.
def full_traceback(func):
import traceback, functools
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
msg = "{}/n/nOriginal {}".format(e, traceback.format_exc())
raise type(e)(msg)
return wrapper
Ejemplo:
def func0():
raise NameError("func0 exception")
def func1():
return func0()
# Key is here!
@full_traceback
def main(i):
return func1()
if __name__ == ''__main__'':
from multiprocessing import Pool
pool = Pool(4)
try:
results = pool.map_async(main, range(5)).get(1e5)
finally:
pool.close()
pool.join()
El traceback con el decorador:
Traceback (most recent call last):
File "bt.py", line 34, in <module>
results = pool.map_async(main, range(5)).get(1e5)
File "/opt/anaconda/lib/python2.7/multiprocessing/pool.py", line 567, in get
raise self._value
NameError: Exception in func0
Original Traceback (most recent call last):
File "bt.py", line 13, in wrapper
return func(*args, **kwargs)
File "bt.py", line 27, in main
return func1()
File "bt.py", line 23, in func1
return func0()
File "bt.py", line 20, in func0
raise NameError("Exception in func0")
NameError: Exception in func0
El traceback sin el decorador:
Traceback (most recent call last):
File "bt.py", line 34, in <module>
results = pool.map_async(main, range(5)).get(1e5)
File "/opt/anaconda/lib/python2.7/multiprocessing/pool.py", line 567, in get
raise self._value
NameError: Exception in func0
Estoy intentando obtener un objeto de rastreo de un multiproceso. Procesar. Desafortunadamente, pasar la información de excepción a través de un conducto no funciona porque los objetos de seguimiento no pueden ser escaneados:
def foo(pipe_to_parent):
try:
raise Exception(''xxx'')
except:
pipe_to_parent.send(sys.exc_info())
to_child, to_self = multiprocessing.Pipe()
process = multiprocessing.Process(target = foo, args = (to_self,))
process.start()
exc_info = to_child.recv()
process.join()
print traceback.format_exception(*exc_info)
to_child.close()
to_self.close()
Rastrear:
Traceback (most recent call last):
File "/usr/lib/python2.6/multiprocessing/process.py", line 231, in _bootstrap
self.run()
File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run
self._target(*self._args, **self._kwargs)
File "foo", line 7, in foo
to_parent.send(sys.exc_info())
PicklingError: Can''t pickle <type ''traceback''>: attribute lookup __builtin__.traceback failed
¿Hay alguna otra forma de acceder a la información de excepción? Me gustaría evitar pasar la cadena formateada.
Como el multiprocessing
imprime los contenidos de cadena de excepciones generadas en procesos secundarios, puede envolver todo su código de proceso hijo en un intento, excepto que detecta excepciones, formatea los rastreos de la pila relavent y genera una nueva Exception
que contiene toda la información relevante en su cadena:
Un ejemplo de una función que uso con multiprocessing.map
:
def run_functor(functor):
"""
Given a no-argument functor, run it and return its result. We can
use this with multiprocessing.map and map it over a list of job
functors to do them.
Handles getting more than multiprocessing''s pitiful exception output
"""
try:
# This is where you do your actual work
return functor()
except:
# Put all exception text into an exception and raise that
raise Exception("".join(traceback.format_exception(*sys.exc_info())))
Lo que obtienes es un seguimiento de la pila con otra traza de pila formateada como el mensaje de error, que ayuda con la depuración.
Con tblib
puede pasar excepciones envueltas y volver a publicarlas más tarde:
import tblib.pickling_support
tblib.pickling_support.install()
from multiprocessing import Pool
import sys
class ExceptionWrapper(object):
def __init__(self, ee):
self.ee = ee
__, __, self.tb = sys.exc_info()
def re_raise(self):
raise self.ee.with_traceback(self.tb)
# for Python 2 replace the previous line by:
# raise self.ee, None, self.tb
# example how to use ExceptionWrapper
def inverse(i):
"""will fail for i == 0"""
try:
return 1.0 / i
except Exception as e:
return ExceptionWrapper(e)
def main():
p = Pool(1)
results = p.map(inverse, [0, 1, 2, 3])
for result in results:
if isinstance(result, ExceptionWrapper):
result.re_raise()
if __name__ == "__main__":
main()
Por lo tanto, si detecta una excepción en su proceso remoto, envuélvala con ExceptionWrapper
y luego vuelva a pasarla. Llamar a re_reraise
en el proceso principal hará el trabajo.
Esta es una variación de esta excelente respuesta . Ambos dependen de tblib para almacenar el rastreo.
Sin embargo, en lugar de tener que devolver el objeto de excepción (según lo solicitado por el OP), la función de worker
puede dejarse tal como está y se envuelve en try
/ except
para almacenar excepciones para re-raise.
import tblib.pickling_support
tblib.pickling_support.install()
import sys
class DelayedException(Exception):
def __init__(self, ee):
self.ee = ee
__, __, self.tb = sys.exc_info()
super(DelayedException, self).__init__(str(ee))
def re_raise(self):
raise self.ee, None, self.tb
Ejemplo
def worker():
try:
raise ValueError(''Something went wrong.'')
except Exception as e:
raise DelayedException(e)
if __name__ == ''__main__'':
import multiprocessing
pool = multiprocessing.Pool()
try:
pool.imap(worker, [1, 2, 3])
except DelayedException as e:
e.re_raise()
Parece ser difícil hacer el objeto pickleback picklable. Pero solo puede enviar los 2 primeros elementos de sys.exc_info()
y una información de sys.exc_info()
preformada con el método traceback.extract_tb :
import multiprocessing
import sys
import traceback
def foo(pipe_to_parent):
try:
raise Exception(''xxx'')
except:
except_type, except_class, tb = sys.exc_info()
pipe_to_parent.send((except_type, except_class, traceback.extract_tb(tb)))
to_child, to_self = multiprocessing.Pipe()
process = multiprocessing.Process(target = foo, args = (to_self,))
process.start()
exc_info = to_child.recv()
process.join()
print exc_info
to_child.close()
to_self.close()
que te dan:
(<type ''exceptions.Exception''>, Exception(''xxx'',), [(''test_tb.py'', 7, ''foo'', "raise Exception(''xxx'')")])
Y luego, podrá obtener más información acerca de la causa de la excepción (nombre de archivo, número de línea donde excepción levantada, nombre del método y la declaración que plantea la excepción)