thread parallel lock async python multithreading multiprocessing try-catch-finally

parallel - Python: multiprocessing.map: Si un proceso genera una excepción, ¿por qué no se llaman finalmente los bloques de otros procesos?



python multiprocessor (3)

Mi entendimiento es que finalmente las cláusulas deben * siempre * ejecutarse si se ha ingresado el intento.

import random from multiprocessing import Pool from time import sleep def Process(x): try: print x sleep(random.random()) raise Exception(''Exception: '' + x) finally: print ''Finally: '' + x Pool(3).map(Process, [''1'',''2'',''3''])

La salida esperada es que para cada una de las x que se imprimen solas por la línea 8, debe haber una aparición de ''Finalmente x''.

Ejemplo de salida:

$ python bug.py 1 2 3 Finally: 2 Traceback (most recent call last): File "bug.py", line 14, in <module> Pool(3).map(Process, [''1'',''2'',''3'']) File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 225, in map return self.map_async(func, iterable, chunksize).get() File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 522, in get raise self._value Exception: Exception: 2

Parece que una excepción que termina un proceso termina los procesos padre y hermano, aunque hay trabajo adicional que se requiere en otros procesos.

¿Por qué me equivoco? ¿Por qué es esto correcto? Si esto es correcto, ¿cómo se deben limpiar los recursos de forma segura en Python multiproceso?


La answer de definitivamente explica por qué obtienes el comportamiento que observas. Sin embargo, se debe enfatizar que SIGTERM se envía solo debido a la forma en que se implementa multiprocessing.pool._terminate_pool . Si puede evitar usar Pool , puede obtener el comportamiento que desea. Aquí hay un ejemplo prestado :

from multiprocessing import Process from time import sleep import random def f(x): try: sleep(random.random()*10) raise Exception except: print "Caught exception in process:", x # Make this last longer than the except clause in main. sleep(3) finally: print "Cleaning up process:", x if __name__ == ''__main__'': processes = [] for i in range(4): p = Process(target=f, args=(i,)) p.start() processes.append(p) try: for process in processes: process.join() except: print "Caught exception in main." finally: print "Cleaning up main."

Después de enviar un SIGINT es, el resultado de ejemplo es:

Caught exception in process: 0 ^C Cleaning up process: 0 Caught exception in main. Cleaning up main. Caught exception in process: 1 Caught exception in process: 2 Caught exception in process: 3 Cleaning up process: 1 Cleaning up process: 2 Cleaning up process: 3

Tenga en cuenta que la cláusula finally se ejecuta para todos los procesos. Si necesita memoria compartida, considere usar Queue , Pipe , Manager o alguna tienda externa como redis o sqlite3 .


finally vuelva a elevar la excepción original a menos que return de ella . Pool.map y mata toda la aplicación. Los subprocesos se terminan y no ve otras excepciones.

Puedes agregar un return para tragar la excepción:

def Process(x): try: print x sleep(random.random()) raise Exception(''Exception: '' + x) finally: print ''Finally: '' + x return

Entonces debería tener None en el resultado de su map cuando se produjo una excepción.


Respuesta corta: SIGTERM triunfa finally .

Respuesta larga: mp.log_to_stderr() el registro con mp.log_to_stderr() :

import random import multiprocessing as mp import time import logging logger=mp.log_to_stderr(logging.DEBUG) def Process(x): try: logger.info(x) time.sleep(random.random()) raise Exception(''Exception: '' + x) finally: logger.info(''Finally: '' + x) result=mp.Pool(3).map(Process, [''1'',''2'',''3''])

La salida de registro incluye:

[DEBUG/MainProcess] terminating workers

Que corresponde a este código en multiprocessing.pool._terminate_pool :

if pool and hasattr(pool[0], ''terminate''): debug(''terminating workers'') for p in pool: p.terminate()

Cada p en el pool es un proceso de multiprocessing.Process , y la terminate llamadas (al menos en máquinas que no son Windows) llama a SIGTERM:

desde multiprocessing/forking.py :

class Popen(object) def terminate(self): ... try: os.kill(self.pid, signal.SIGTERM) except OSError, e: if self.wait(timeout=0.1) is None: raise

Así que todo se reduce a lo que sucede cuando se envía un SIGTERM un proceso de Python en una suite de try .

Considere el siguiente ejemplo (test.py):

import time def worker(): try: time.sleep(100) finally: print(''enter finally'') time.sleep(2) print(''exit finally'') worker()

Si lo ejecuta, luego envíele un SIGTERM , entonces el proceso finaliza de inmediato, sin ingresar al paquete final, como lo demuestra la ausencia de salida y el retraso.

En una terminal:

% test.py

En la segunda terminal:

% pkill -TERM -f "test.py"

Resultado en la primera terminal:

Terminated

Compare eso con lo que sucede cuando se envía un proceso SIGINT ( Cc ):

En la segunda terminal:

% pkill -INT -f "test.py"

Resultado en la primera terminal:

enter finally exit finally Traceback (most recent call last): File "/home/unutbu/pybin/test.py", line 14, in <module> worker() File "/home/unutbu/pybin/test.py", line 8, in worker time.sleep(100) KeyboardInterrupt

Conclusión: SIGTERM triunfa finally .