thread - Captura la excepción de un hilo en el hilo de la persona que llama en Python
sincronizar hilos en python (11)
Aunque no es posible detectar directamente una excepción arrojada en un hilo diferente, aquí hay un código para obtener de forma bastante transparente algo muy cercano a esta funcionalidad. El subproceso secundario debe subclase de la clase ExThread
lugar de threading.Thread
Thread y el subproceso principal deben llamar al método child_thread.join_with_exception()
lugar de child_thread.join()
cuando espera que el subproceso termine su tarea.
Detalles técnicos de esta implementación: cuando el hilo hijo arroja una excepción, se pasa al padre a través de una Queue
y se vuelve a lanzar en el hilo padre. Observe que no hay espera ocupada en este enfoque.
#!/usr/bin/env python
import sys
import threading
import Queue
class ExThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.__status_queue = Queue.Queue()
def run_with_exception(self):
"""This method should be overriden."""
raise NotImplementedError
def run(self):
"""This method should NOT be overriden."""
try:
self.run_with_exception()
except BaseException:
self.__status_queue.put(sys.exc_info())
self.__status_queue.put(None)
def wait_for_exc_info(self):
return self.__status_queue.get()
def join_with_exception(self):
ex_info = self.wait_for_exc_info()
if ex_info is None:
return
else:
raise ex_info[1]
class MyException(Exception):
pass
class MyThread(ExThread):
def __init__(self):
ExThread.__init__(self)
def run_with_exception(self):
thread_name = threading.current_thread().name
raise MyException("An error in thread ''{}''.".format(thread_name))
def main():
t = MyThread()
t.start()
try:
t.join_with_exception()
except MyException as ex:
thread_name = threading.current_thread().name
print "Caught a MyException in thread ''{}'': {}".format(thread_name, ex)
if __name__ == ''__main__'':
main()
Soy muy nuevo en Python y la programación multiproceso en general. Básicamente, tengo un script que copiará los archivos a otra ubicación. Me gustaría que esto se coloque en otro hilo para que pueda dar salida ....
para indicar que el script aún se está ejecutando.
El problema que estoy teniendo es que si los archivos no se pueden copiar lanzará una excepción. Esto está bien si se ejecuta en el hilo principal; sin embargo, tener el siguiente código no funciona:
try:
threadClass = TheThread(param1, param2, etc.)
threadClass.start() ##### **Exception takes place here**
except:
print "Caught an exception"
En la clase de subprocesos en sí, traté de volver a lanzar la excepción, pero no funciona. He visto a personas aquí hacer preguntas similares, pero todas parecen estar haciendo algo más específico que lo que estoy tratando de hacer (y no entiendo muy bien las soluciones ofrecidas). He visto personas mencionar el uso de sys.exc_info()
, sin embargo, no sé dónde ni cómo usarlo.
¡Toda ayuda es muy apreciada!
EDITAR: El código para la clase de subprocesos está a continuación:
class TheThread(threading.Thread):
def __init__(self, sourceFolder, destFolder):
threading.Thread.__init__(self)
self.sourceFolder = sourceFolder
self.destFolder = destFolder
def run(self):
try:
shul.copytree(self.sourceFolder, self.destFolder)
except:
raise
Como noobio de Threading, me tomó mucho tiempo entender cómo implementar el código de Mateusz Kobos (arriba). Aquí hay una versión aclarada para ayudar a entender cómo usarla.
#!/usr/bin/env python
import sys
import threading
import Queue
class ExThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.__status_queue = Queue.Queue()
def run_with_exception(self):
"""This method should be overriden."""
raise NotImplementedError
def run(self):
"""This method should NOT be overriden."""
try:
self.run_with_exception()
except Exception:
self.__status_queue.put(sys.exc_info())
self.__status_queue.put(None)
def wait_for_exc_info(self):
return self.__status_queue.get()
def join_with_exception(self):
ex_info = self.wait_for_exc_info()
if ex_info is None:
return
else:
raise ex_info[1]
class MyException(Exception):
pass
class MyThread(ExThread):
def __init__(self):
ExThread.__init__(self)
# This overrides the "run_with_exception" from class "ExThread"
# Note, this is where the actual thread to be run lives. The thread
# to be run could also call a method or be passed in as an object
def run_with_exception(self):
# Code will function until the int
print "sleeping 5 seconds"
import time
for i in 1, 2, 3, 4, 5:
print i
time.sleep(1)
# Thread should break here
int("str")
# I''m honestly not sure why these appear here? So, I removed them.
# Perhaps Mateusz can clarify?
# thread_name = threading.current_thread().name
# raise MyException("An error in thread ''{}''.".format(thread_name))
if __name__ == ''__main__'':
# The code lives in MyThread in this example. So creating the MyThread
# object set the code to be run (but does not start it yet)
t = MyThread()
# This actually starts the thread
t.start()
print
print ("Notice ''t.start()'' is considered to have completed, although"
" the countdown continues in its new thread. So you code "
"can tinue into new processing.")
# Now that the thread is running, the join allows for monitoring of it
try:
t.join_with_exception()
# should be able to be replace "Exception" with specific error (untested)
except Exception, e:
print
print "Exceptioon was caught and control passed back to the main thread"
print "Do some handling here...or raise a custom exception "
thread_name = threading.current_thread().name
e = ("Caught a MyException in thread: ''" +
str(thread_name) +
"'' [" + str(e) + "]")
raise Exception(e) # Or custom class of exception, such as MyException
De forma similar a RickardSjogren''s sin Queue, sys, etc., pero también sin algunos oyentes a las señales: ejecuta directamente un manejador de excepciones que corresponde a un bloque except.
#!/usr/bin/env python3
import threading
class ExceptionThread(threading.Thread):
def __init__(self, callback=None, *args, **kwargs):
"""
Redirect exceptions of thread to an exception handler.
:param callback: function to handle occured exception
:type callback: function(thread, exception)
:param args: arguments for threading.Thread()
:type args: tuple
:param kwargs: keyword arguments for threading.Thread()
:type kwargs: dict
"""
self._callback = callback
super().__init__(*args, **kwargs)
def run(self):
try:
if self._target:
self._target(*self._args, **self._kwargs)
except BaseException as e:
if self._callback is None:
raise e
else:
self._callback(self, e)
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs, self._callback
Solo self._callback y el excepto block in run () es adicional al threading normal. Thread.
El módulo concurrent.futures
simplifica el trabajo en hilos (o procesos) separados y maneja las excepciones resultantes:
import concurrent.futures
import shutil
def copytree_with_dots(src_path, dst_path):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
# Execute the copy on a separate thread,
# creating a future object to track progress.
future = executor.submit(shutil.copytree, src_path, dst_path)
while future.running():
# Print pretty dots here.
pass
# Return the value returned by shutil.copytree(), None.
# Raise any exceptions raised during the copy process.
return future.result()
concurrent.futures
se incluye con Python 3.2 y está disponible como el módulo de futures
backported para versiones anteriores.
El problema es que thread_obj.start()
regresa inmediatamente. El subproceso secundario que generó se ejecuta en su propio contexto, con su propia pila. Cualquier excepción que ocurra allí está en el contexto del hilo hijo, y está en su propia pila. Una forma en que puedo pensar en este momento para comunicar esta información al hilo principal es mediante el uso de algún tipo de mensaje que pase, por lo que podría ver eso.
Prueba esto para el tamaño:
import sys
import threading
import Queue
class ExcThread(threading.Thread):
def __init__(self, bucket):
threading.Thread.__init__(self)
self.bucket = bucket
def run(self):
try:
raise Exception(''An error occured here.'')
except Exception:
self.bucket.put(sys.exc_info())
def main():
bucket = Queue.Queue()
thread_obj = ExcThread(bucket)
thread_obj.start()
while True:
try:
exc = bucket.get(block=False)
except Queue.Empty:
pass
else:
exc_type, exc_obj, exc_trace = exc
# deal with the exception
print exc_type, exc_obj
print exc_trace
thread_obj.join(0.1)
if thread_obj.isAlive():
continue
else:
break
if __name__ == ''__main__'':
main()
Este fue un pequeño problema desagradable, y me gustaría lanzar mi solución. Algunas otras soluciones que encontré (async.io, por ejemplo) parecían prometedoras, pero también presentaban un poco de una caja negra. El enfoque de bucle de cola / evento lo vincula a una determinada implementación. El código fuente de futuros concurrentes, sin embargo, tiene alrededor de 1000 líneas y es fácil de comprender . Me permitió resolver mi problema fácilmente: crear subprocesos de trabajo ad-hoc sin mucha configuración y poder detectar excepciones en el hilo principal.
Mi solución usa la API de futuros simultáneos y la API de subprocesos. Te permite crear un trabajador que te ofrece tanto el hilo como el futuro. De esta forma, puedes unirte al hilo para esperar el resultado:
worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())
... o puede dejar que el trabajador simplemente envíe una devolución de llamada cuando haya terminado:
worker = Worker(test)
thread = worker.start(lambda x: print(''callback'', x))
... o puedes repetir hasta que el evento finalice:
worker = Worker(test)
thread = worker.start()
while True:
print("waiting")
if worker.future.done():
exc = worker.future.exception()
print(''exception?'', exc)
result = worker.future.result()
print(''result'', result)
break
time.sleep(0.25)
Aquí está el código:
from concurrent.futures import Future
import threading
import time
class Worker(object):
def __init__(self, fn, args=()):
self.future = Future()
self._fn = fn
self._args = args
def start(self, cb=None):
self._cb = cb
self.future.set_running_or_notify_cancel()
thread = threading.Thread(target=self.run, args=())
thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
thread.start()
return thread
def run(self):
try:
self.future.set_result(self._fn(*self._args))
except BaseException as e:
self.future.set_exception(e)
if(self._cb):
self._cb(self.future.result())
... y la función de prueba:
def test(*args):
print(''args are'', args)
time.sleep(2)
raise Exception(''foo'')
Hay muchas respuestas realmente complicadas a esta pregunta. ¿Estoy simplificando demasiado esto, porque esto parece suficiente para la mayoría de las cosas para mí?
from threading import Thread
class PropagatingThread(Thread):
def run(self):
self.exc = None
try:
if hasattr(self, ''_Thread__target''):
# Thread uses name mangling prior to Python 3.
self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
else:
self.ret = self._target(*self._args, **self._kwargs)
except BaseException as e:
self.exc = e
def join(self):
super(PropagatingThread, self).join()
if self.exc:
raise self.exc
return self.ret
Si está seguro de que solo se ejecutará en una u otra versión de Python, podría reducir el método run()
a solo la versión destruida (si solo se ejecutará en versiones de Python antes del 3). ), o simplemente la versión limpia (si solo se ejecutará en versiones de Python comenzando con 3).
Ejemplo de uso:
def f(*args, **kwargs)
print(args)
print(kwargs)
raise Exception(''I suck'')
t = PropagatingThread(target=f, args=(5,), kwargs={''hello'':''world''})
t.start()
t.join()
Y verá la excepción planteada en el otro hilo cuando se una.
No es una buena práctica el usar desnudos, ya que, por lo general, pescas más de lo que pagas.
Sugeriría modificar el, except
para atrapar ÚNICAMENTE la excepción que le gustaría manejar. No creo que TheThread
tenga el efecto deseado, porque cuando vayas a instanciar TheThread
en el try
externo, si genera una excepción, la asignación nunca va a suceder.
En su lugar, es posible que desee alertar y seguir adelante, como por ejemplo:
def run(self):
try:
shul.copytree(self.sourceFolder, self.destFolder)
except OSError, err:
print err
Luego, cuando se detecta esa excepción, puedes manejarla allí. Luego, cuando el try
externo TheThread
una excepción de TheThread
, sabe que no será la que ya manejó y lo ayudará a aislar el flujo de su proceso.
Si se produce una excepción en un hilo, la mejor manera es volver a subirlo en el hilo de la persona que llama durante la join
. Puede obtener información sobre la excepción que actualmente se maneja utilizando la función sys.exc_info()
. Esta información simplemente se puede almacenar como una propiedad del objeto de subproceso hasta que se llame a join
, en cuyo punto se puede volver a generar.
Tenga en cuenta que un Queue.Queue
(como se sugiere en otras respuestas) no es necesario en este caso simple en el que el hilo arroja a lo sumo 1 excepción y se completa justo después de lanzar una excepción . Evitamos las condiciones de carrera simplemente esperando a que se complete el hilo.
Por ejemplo, extienda ExcThread
(abajo), anulando excRun
(en lugar de run
).
Python 2.x:
import threading
class ExcThread(threading.Thread):
def excRun(self):
pass
def run(self):
self.exc = None
try:
# Possibly throws an exception
self.excRun()
except:
import sys
self.exc = sys.exc_info()
# Save details of the exception thrown but don''t rethrow,
# just complete the function
def join(self):
threading.Thread.join(self)
if self.exc:
msg = "Thread ''%s'' threw an exception: %s" % (self.getName(), self.exc[1])
new_exc = Exception(msg)
raise new_exc.__class__, new_exc, self.exc[2]
Python 3.x:
La forma de 3 argumentos para raise
se ha ido en Python 3, así que cambie la última línea a:
raise new_exc.with_traceback(self.exc[2])
Un método que me gusta está basado en el patrón del observador . Defino una clase de señal que utiliza mi hilo para emitir excepciones a los oyentes. También se puede usar para devolver valores de hilos. Ejemplo:
import threading
class Signal:
def __init__(self):
self._subscribers = list()
def emit(self, *args, **kwargs):
for func in self._subscribers:
func(*args, **kwargs)
def connect(self, func):
self._subscribers.append(func)
def disconnect(self, func):
try:
self._subscribers.remove(func)
except ValueError:
raise ValueError(''Function {0} not removed from {1}''.format(func, self))
class WorkerThread(threading.Thread):
def __init__(self, *args, **kwargs):
super(WorkerThread, self).__init__(*args, **kwargs)
self.Exception = Signal()
self.Result = Signal()
def run(self):
if self._Thread__target is not None:
try:
self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
except Exception as e:
self.Exception.emit(e)
else:
self.Result.emit(self._return_value)
if __name__ == ''__main__'':
import time
def handle_exception(exc):
print exc.message
def handle_result(res):
print res
def a():
time.sleep(1)
raise IOError(''a failed'')
def b():
time.sleep(2)
return ''b returns''
t = WorkerThread(target=a)
t2 = WorkerThread(target=b)
t.Exception.connect(handle_exception)
t2.Result.connect(handle_result)
t.start()
t2.start()
print ''Threads started''
t.join()
t2.join()
print ''Done''
No tengo suficiente experiencia trabajando con hilos para afirmar que este es un método completamente seguro. Pero me ha funcionado y me gusta la flexibilidad.
Una forma simple de capturar la excepción de thread y comunicarse de nuevo con el método de la persona que llama podría ser pasando el diccionario o una lista al método de worker
.
Ejemplo (pasar diccionario al método de trabajador):
import threading
def my_method(throw_me):
raise Exception(throw_me)
def worker(shared_obj, *args, **kwargs):
try:
shared_obj[''target''](*args, **kwargs)
except Exception as err:
shared_obj[''err''] = err
shared_obj = {''err'':'''', ''target'': my_method}
throw_me = "Test"
th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()
if shared_obj[''err'']:
print(">>%s" % shared_obj[''err''])