parallel - Python: multiprocessing.map: Si un proceso genera una excepción, ¿por qué no se llaman finalmente los bloques de otros procesos?
python multiprocessor (3)
Mi entendimiento es que finalmente las cláusulas deben * siempre * ejecutarse si se ha ingresado el intento.
import random
from multiprocessing import Pool
from time import sleep
def Process(x):
try:
print x
sleep(random.random())
raise Exception(''Exception: '' + x)
finally:
print ''Finally: '' + x
Pool(3).map(Process, [''1'',''2'',''3''])
La salida esperada es que para cada una de las x que se imprimen solas por la línea 8, debe haber una aparición de ''Finalmente x''.
Ejemplo de salida:
$ python bug.py
1
2
3
Finally: 2
Traceback (most recent call last):
File "bug.py", line 14, in <module>
Pool(3).map(Process, [''1'',''2'',''3''])
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 225, in map
return self.map_async(func, iterable, chunksize).get()
File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 522, in get
raise self._value
Exception: Exception: 2
Parece que una excepción que termina un proceso termina los procesos padre y hermano, aunque hay trabajo adicional que se requiere en otros procesos.
¿Por qué me equivoco? ¿Por qué es esto correcto? Si esto es correcto, ¿cómo se deben limpiar los recursos de forma segura en Python multiproceso?
La answer de definitivamente explica por qué obtienes el comportamiento que observas. Sin embargo, se debe enfatizar que SIGTERM se envía solo debido a la forma en que se implementa multiprocessing.pool._terminate_pool
. Si puede evitar usar Pool
, puede obtener el comportamiento que desea. Aquí hay un ejemplo prestado :
from multiprocessing import Process
from time import sleep
import random
def f(x):
try:
sleep(random.random()*10)
raise Exception
except:
print "Caught exception in process:", x
# Make this last longer than the except clause in main.
sleep(3)
finally:
print "Cleaning up process:", x
if __name__ == ''__main__'':
processes = []
for i in range(4):
p = Process(target=f, args=(i,))
p.start()
processes.append(p)
try:
for process in processes:
process.join()
except:
print "Caught exception in main."
finally:
print "Cleaning up main."
Después de enviar un SIGINT es, el resultado de ejemplo es:
Caught exception in process: 0
^C
Cleaning up process: 0
Caught exception in main.
Cleaning up main.
Caught exception in process: 1
Caught exception in process: 2
Caught exception in process: 3
Cleaning up process: 1
Cleaning up process: 2
Cleaning up process: 3
Tenga en cuenta que la cláusula finally
se ejecuta para todos los procesos. Si necesita memoria compartida, considere usar Queue
, Pipe
, Manager
o alguna tienda externa como redis
o sqlite3
.
finally
vuelva a elevar la excepción original a menos que return
de ella . Pool.map
y mata toda la aplicación. Los subprocesos se terminan y no ve otras excepciones.
Puedes agregar un return
para tragar la excepción:
def Process(x):
try:
print x
sleep(random.random())
raise Exception(''Exception: '' + x)
finally:
print ''Finally: '' + x
return
Entonces debería tener None
en el resultado de su map
cuando se produjo una excepción.
Respuesta corta: SIGTERM
triunfa finally
.
Respuesta larga: mp.log_to_stderr()
el registro con mp.log_to_stderr()
:
import random
import multiprocessing as mp
import time
import logging
logger=mp.log_to_stderr(logging.DEBUG)
def Process(x):
try:
logger.info(x)
time.sleep(random.random())
raise Exception(''Exception: '' + x)
finally:
logger.info(''Finally: '' + x)
result=mp.Pool(3).map(Process, [''1'',''2'',''3''])
La salida de registro incluye:
[DEBUG/MainProcess] terminating workers
Que corresponde a este código en multiprocessing.pool._terminate_pool
:
if pool and hasattr(pool[0], ''terminate''):
debug(''terminating workers'')
for p in pool:
p.terminate()
Cada p
en el pool
es un proceso de multiprocessing.Process
, y la terminate
llamadas (al menos en máquinas que no son Windows) llama a SIGTERM:
desde multiprocessing/forking.py
:
class Popen(object)
def terminate(self):
...
try:
os.kill(self.pid, signal.SIGTERM)
except OSError, e:
if self.wait(timeout=0.1) is None:
raise
Así que todo se reduce a lo que sucede cuando se envía un SIGTERM
un proceso de Python en una suite de try
.
Considere el siguiente ejemplo (test.py):
import time
def worker():
try:
time.sleep(100)
finally:
print(''enter finally'')
time.sleep(2)
print(''exit finally'')
worker()
Si lo ejecuta, luego envíele un SIGTERM
, entonces el proceso finaliza de inmediato, sin ingresar al paquete final, como lo demuestra la ausencia de salida y el retraso.
En una terminal:
% test.py
En la segunda terminal:
% pkill -TERM -f "test.py"
Resultado en la primera terminal:
Terminated
Compare eso con lo que sucede cuando se envía un proceso SIGINT
( Cc
):
En la segunda terminal:
% pkill -INT -f "test.py"
Resultado en la primera terminal:
enter finally
exit finally
Traceback (most recent call last):
File "/home/unutbu/pybin/test.py", line 14, in <module>
worker()
File "/home/unutbu/pybin/test.py", line 8, in worker
time.sleep(100)
KeyboardInterrupt
Conclusión: SIGTERM
triunfa finally
.