threading thread stop sincronizar semaforos raspberry programacion hilos finish eliminar concurrente python multithreading exception-handling exception

thread - Captura la excepción de un hilo en el hilo de la persona que llama en Python



sincronizar hilos en python (11)

Aunque no es posible detectar directamente una excepción arrojada en un hilo diferente, aquí hay un código para obtener de forma bastante transparente algo muy cercano a esta funcionalidad. El subproceso secundario debe subclase de la clase ExThread lugar de threading.Thread Thread y el subproceso principal deben llamar al método child_thread.join_with_exception() lugar de child_thread.join() cuando espera que el subproceso termine su tarea.

Detalles técnicos de esta implementación: cuando el hilo hijo arroja una excepción, se pasa al padre a través de una Queue y se vuelve a lanzar en el hilo padre. Observe que no hay espera ocupada en este enfoque.

#!/usr/bin/env python import sys import threading import Queue class ExThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.__status_queue = Queue.Queue() def run_with_exception(self): """This method should be overriden.""" raise NotImplementedError def run(self): """This method should NOT be overriden.""" try: self.run_with_exception() except BaseException: self.__status_queue.put(sys.exc_info()) self.__status_queue.put(None) def wait_for_exc_info(self): return self.__status_queue.get() def join_with_exception(self): ex_info = self.wait_for_exc_info() if ex_info is None: return else: raise ex_info[1] class MyException(Exception): pass class MyThread(ExThread): def __init__(self): ExThread.__init__(self) def run_with_exception(self): thread_name = threading.current_thread().name raise MyException("An error in thread ''{}''.".format(thread_name)) def main(): t = MyThread() t.start() try: t.join_with_exception() except MyException as ex: thread_name = threading.current_thread().name print "Caught a MyException in thread ''{}'': {}".format(thread_name, ex) if __name__ == ''__main__'': main()

Soy muy nuevo en Python y la programación multiproceso en general. Básicamente, tengo un script que copiará los archivos a otra ubicación. Me gustaría que esto se coloque en otro hilo para que pueda dar salida .... para indicar que el script aún se está ejecutando.

El problema que estoy teniendo es que si los archivos no se pueden copiar lanzará una excepción. Esto está bien si se ejecuta en el hilo principal; sin embargo, tener el siguiente código no funciona:

try: threadClass = TheThread(param1, param2, etc.) threadClass.start() ##### **Exception takes place here** except: print "Caught an exception"

En la clase de subprocesos en sí, traté de volver a lanzar la excepción, pero no funciona. He visto a personas aquí hacer preguntas similares, pero todas parecen estar haciendo algo más específico que lo que estoy tratando de hacer (y no entiendo muy bien las soluciones ofrecidas). He visto personas mencionar el uso de sys.exc_info() , sin embargo, no sé dónde ni cómo usarlo.

¡Toda ayuda es muy apreciada!

EDITAR: El código para la clase de subprocesos está a continuación:

class TheThread(threading.Thread): def __init__(self, sourceFolder, destFolder): threading.Thread.__init__(self) self.sourceFolder = sourceFolder self.destFolder = destFolder def run(self): try: shul.copytree(self.sourceFolder, self.destFolder) except: raise


Como noobio de Threading, me tomó mucho tiempo entender cómo implementar el código de Mateusz Kobos (arriba). Aquí hay una versión aclarada para ayudar a entender cómo usarla.

#!/usr/bin/env python import sys import threading import Queue class ExThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.__status_queue = Queue.Queue() def run_with_exception(self): """This method should be overriden.""" raise NotImplementedError def run(self): """This method should NOT be overriden.""" try: self.run_with_exception() except Exception: self.__status_queue.put(sys.exc_info()) self.__status_queue.put(None) def wait_for_exc_info(self): return self.__status_queue.get() def join_with_exception(self): ex_info = self.wait_for_exc_info() if ex_info is None: return else: raise ex_info[1] class MyException(Exception): pass class MyThread(ExThread): def __init__(self): ExThread.__init__(self) # This overrides the "run_with_exception" from class "ExThread" # Note, this is where the actual thread to be run lives. The thread # to be run could also call a method or be passed in as an object def run_with_exception(self): # Code will function until the int print "sleeping 5 seconds" import time for i in 1, 2, 3, 4, 5: print i time.sleep(1) # Thread should break here int("str") # I''m honestly not sure why these appear here? So, I removed them. # Perhaps Mateusz can clarify? # thread_name = threading.current_thread().name # raise MyException("An error in thread ''{}''.".format(thread_name)) if __name__ == ''__main__'': # The code lives in MyThread in this example. So creating the MyThread # object set the code to be run (but does not start it yet) t = MyThread() # This actually starts the thread t.start() print print ("Notice ''t.start()'' is considered to have completed, although" " the countdown continues in its new thread. So you code " "can tinue into new processing.") # Now that the thread is running, the join allows for monitoring of it try: t.join_with_exception() # should be able to be replace "Exception" with specific error (untested) except Exception, e: print print "Exceptioon was caught and control passed back to the main thread" print "Do some handling here...or raise a custom exception " thread_name = threading.current_thread().name e = ("Caught a MyException in thread: ''" + str(thread_name) + "'' [" + str(e) + "]") raise Exception(e) # Or custom class of exception, such as MyException


De forma similar a RickardSjogren''s sin Queue, sys, etc., pero también sin algunos oyentes a las señales: ejecuta directamente un manejador de excepciones que corresponde a un bloque except.

#!/usr/bin/env python3 import threading class ExceptionThread(threading.Thread): def __init__(self, callback=None, *args, **kwargs): """ Redirect exceptions of thread to an exception handler. :param callback: function to handle occured exception :type callback: function(thread, exception) :param args: arguments for threading.Thread() :type args: tuple :param kwargs: keyword arguments for threading.Thread() :type kwargs: dict """ self._callback = callback super().__init__(*args, **kwargs) def run(self): try: if self._target: self._target(*self._args, **self._kwargs) except BaseException as e: if self._callback is None: raise e else: self._callback(self, e) finally: # Avoid a refcycle if the thread is running a function with # an argument that has a member that points to the thread. del self._target, self._args, self._kwargs, self._callback

Solo self._callback y el excepto block in run () es adicional al threading normal. Thread.


El módulo concurrent.futures simplifica el trabajo en hilos (o procesos) separados y maneja las excepciones resultantes:

import concurrent.futures import shutil def copytree_with_dots(src_path, dst_path): with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: # Execute the copy on a separate thread, # creating a future object to track progress. future = executor.submit(shutil.copytree, src_path, dst_path) while future.running(): # Print pretty dots here. pass # Return the value returned by shutil.copytree(), None. # Raise any exceptions raised during the copy process. return future.result()

concurrent.futures se incluye con Python 3.2 y está disponible como el módulo de futures backported para versiones anteriores.


El problema es que thread_obj.start() regresa inmediatamente. El subproceso secundario que generó se ejecuta en su propio contexto, con su propia pila. Cualquier excepción que ocurra allí está en el contexto del hilo hijo, y está en su propia pila. Una forma en que puedo pensar en este momento para comunicar esta información al hilo principal es mediante el uso de algún tipo de mensaje que pase, por lo que podría ver eso.

Prueba esto para el tamaño:

import sys import threading import Queue class ExcThread(threading.Thread): def __init__(self, bucket): threading.Thread.__init__(self) self.bucket = bucket def run(self): try: raise Exception(''An error occured here.'') except Exception: self.bucket.put(sys.exc_info()) def main(): bucket = Queue.Queue() thread_obj = ExcThread(bucket) thread_obj.start() while True: try: exc = bucket.get(block=False) except Queue.Empty: pass else: exc_type, exc_obj, exc_trace = exc # deal with the exception print exc_type, exc_obj print exc_trace thread_obj.join(0.1) if thread_obj.isAlive(): continue else: break if __name__ == ''__main__'': main()


Este fue un pequeño problema desagradable, y me gustaría lanzar mi solución. Algunas otras soluciones que encontré (async.io, por ejemplo) parecían prometedoras, pero también presentaban un poco de una caja negra. El enfoque de bucle de cola / evento lo vincula a una determinada implementación. El código fuente de futuros concurrentes, sin embargo, tiene alrededor de 1000 líneas y es fácil de comprender . Me permitió resolver mi problema fácilmente: crear subprocesos de trabajo ad-hoc sin mucha configuración y poder detectar excepciones en el hilo principal.

Mi solución usa la API de futuros simultáneos y la API de subprocesos. Te permite crear un trabajador que te ofrece tanto el hilo como el futuro. De esta forma, puedes unirte al hilo para esperar el resultado:

worker = Worker(test) thread = worker.start() thread.join() print(worker.future.result())

... o puede dejar que el trabajador simplemente envíe una devolución de llamada cuando haya terminado:

worker = Worker(test) thread = worker.start(lambda x: print(''callback'', x))

... o puedes repetir hasta que el evento finalice:

worker = Worker(test) thread = worker.start() while True: print("waiting") if worker.future.done(): exc = worker.future.exception() print(''exception?'', exc) result = worker.future.result() print(''result'', result) break time.sleep(0.25)

Aquí está el código:

from concurrent.futures import Future import threading import time class Worker(object): def __init__(self, fn, args=()): self.future = Future() self._fn = fn self._args = args def start(self, cb=None): self._cb = cb self.future.set_running_or_notify_cancel() thread = threading.Thread(target=self.run, args=()) thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process thread.start() return thread def run(self): try: self.future.set_result(self._fn(*self._args)) except BaseException as e: self.future.set_exception(e) if(self._cb): self._cb(self.future.result())

... y la función de prueba:

def test(*args): print(''args are'', args) time.sleep(2) raise Exception(''foo'')


Hay muchas respuestas realmente complicadas a esta pregunta. ¿Estoy simplificando demasiado esto, porque esto parece suficiente para la mayoría de las cosas para mí?

from threading import Thread class PropagatingThread(Thread): def run(self): self.exc = None try: if hasattr(self, ''_Thread__target''): # Thread uses name mangling prior to Python 3. self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs) else: self.ret = self._target(*self._args, **self._kwargs) except BaseException as e: self.exc = e def join(self): super(PropagatingThread, self).join() if self.exc: raise self.exc return self.ret

Si está seguro de que solo se ejecutará en una u otra versión de Python, podría reducir el método run() a solo la versión destruida (si solo se ejecutará en versiones de Python antes del 3). ), o simplemente la versión limpia (si solo se ejecutará en versiones de Python comenzando con 3).

Ejemplo de uso:

def f(*args, **kwargs) print(args) print(kwargs) raise Exception(''I suck'') t = PropagatingThread(target=f, args=(5,), kwargs={''hello'':''world''}) t.start() t.join()

Y verá la excepción planteada en el otro hilo cuando se una.


No es una buena práctica el usar desnudos, ya que, por lo general, pescas más de lo que pagas.

Sugeriría modificar el, except para atrapar ÚNICAMENTE la excepción que le gustaría manejar. No creo que TheThread tenga el efecto deseado, porque cuando vayas a instanciar TheThread en el try externo, si genera una excepción, la asignación nunca va a suceder.

En su lugar, es posible que desee alertar y seguir adelante, como por ejemplo:

def run(self): try: shul.copytree(self.sourceFolder, self.destFolder) except OSError, err: print err

Luego, cuando se detecta esa excepción, puedes manejarla allí. Luego, cuando el try externo TheThread una excepción de TheThread , sabe que no será la que ya manejó y lo ayudará a aislar el flujo de su proceso.


Si se produce una excepción en un hilo, la mejor manera es volver a subirlo en el hilo de la persona que llama durante la join . Puede obtener información sobre la excepción que actualmente se maneja utilizando la función sys.exc_info() . Esta información simplemente se puede almacenar como una propiedad del objeto de subproceso hasta que se llame a join , en cuyo punto se puede volver a generar.

Tenga en cuenta que un Queue.Queue (como se sugiere en otras respuestas) no es necesario en este caso simple en el que el hilo arroja a lo sumo 1 excepción y se completa justo después de lanzar una excepción . Evitamos las condiciones de carrera simplemente esperando a que se complete el hilo.

Por ejemplo, extienda ExcThread (abajo), anulando excRun (en lugar de run ).

Python 2.x:

import threading class ExcThread(threading.Thread): def excRun(self): pass def run(self): self.exc = None try: # Possibly throws an exception self.excRun() except: import sys self.exc = sys.exc_info() # Save details of the exception thrown but don''t rethrow, # just complete the function def join(self): threading.Thread.join(self) if self.exc: msg = "Thread ''%s'' threw an exception: %s" % (self.getName(), self.exc[1]) new_exc = Exception(msg) raise new_exc.__class__, new_exc, self.exc[2]

Python 3.x:

La forma de 3 argumentos para raise se ha ido en Python 3, así que cambie la última línea a:

raise new_exc.with_traceback(self.exc[2])


Un método que me gusta está basado en el patrón del observador . Defino una clase de señal que utiliza mi hilo para emitir excepciones a los oyentes. También se puede usar para devolver valores de hilos. Ejemplo:

import threading class Signal: def __init__(self): self._subscribers = list() def emit(self, *args, **kwargs): for func in self._subscribers: func(*args, **kwargs) def connect(self, func): self._subscribers.append(func) def disconnect(self, func): try: self._subscribers.remove(func) except ValueError: raise ValueError(''Function {0} not removed from {1}''.format(func, self)) class WorkerThread(threading.Thread): def __init__(self, *args, **kwargs): super(WorkerThread, self).__init__(*args, **kwargs) self.Exception = Signal() self.Result = Signal() def run(self): if self._Thread__target is not None: try: self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs) except Exception as e: self.Exception.emit(e) else: self.Result.emit(self._return_value) if __name__ == ''__main__'': import time def handle_exception(exc): print exc.message def handle_result(res): print res def a(): time.sleep(1) raise IOError(''a failed'') def b(): time.sleep(2) return ''b returns'' t = WorkerThread(target=a) t2 = WorkerThread(target=b) t.Exception.connect(handle_exception) t2.Result.connect(handle_result) t.start() t2.start() print ''Threads started'' t.join() t2.join() print ''Done''

No tengo suficiente experiencia trabajando con hilos para afirmar que este es un método completamente seguro. Pero me ha funcionado y me gusta la flexibilidad.


Una forma simple de capturar la excepción de thread y comunicarse de nuevo con el método de la persona que llama podría ser pasando el diccionario o una lista al método de worker .

Ejemplo (pasar diccionario al método de trabajador):

import threading def my_method(throw_me): raise Exception(throw_me) def worker(shared_obj, *args, **kwargs): try: shared_obj[''target''](*args, **kwargs) except Exception as err: shared_obj[''err''] = err shared_obj = {''err'':'''', ''target'': my_method} throw_me = "Test" th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={}) th.start() th.join() if shared_obj[''err'']: print(">>%s" % shared_obj[''err''])