event - ¿Cuál es la forma más limpia de detener a un trabajador de multiprocesamiento python adjunto a una cola en un bucle infinito?
python parallel execution (1)
Resolví el problema. De acuerdo con la documentación para multiprocessing.Pool.join()
, el pool
debe estar close()ed
antes de poder join()ed
. Agregar pool.close()
antes que pool.join()
resolvió el problema.
Estoy implementando un patrón de productor-consumidor en Python usando multiprocessing.Pool
y multiprocessing.Queue
. Los consumidores son procesos pre-bifurcados que utilizan gevent
para generar múltiples tareas.
Aquí hay una versión recortada del código:
import gevent
from Queue import Empty as QueueEmpty
from multiprocessing import Process, Queue, Pool
import signal
import time
# Task queue
queue = Queue()
def init_worker ():
# Ignore signals in worker
signal.signal( signal.SIGTERM, signal.SIG_IGN )
signal.signal( signal.SIGINT, signal.SIG_IGN )
signal.signal( signal.SIGQUIT, signal.SIG_IGN )
# One of the worker task
def worker_task1( ):
while True:
try:
m = queue.get( timeout = 2 )
# Break out if producer says quit
if m == ''QUIT'':
print ''TIME TO QUIT''
break
except QueueEmpty:
pass
# Worker
def work( ):
gevent.joinall([
gevent.spawn( worker_task1 ),
])
pool = Pool( 2, init_worker )
for i in xrange( 2 ):
pool.apply_async( work )
try:
while True:
queue.put( ''Some Task'' )
time.sleep( 2 )
except KeyboardInterrupt as e:
print ''STOPPING''
# Signal all workers to quit
for i in xrange( 2 ):
queue.put( ''QUIT'' )
pool.join()
Ahora cuando trato de dejarlo, obtengo el siguiente estado:
- El proceso de los padres está a la espera de que uno de los niños se una.
- Uno de los niños está en estado difunto. Tan terminado, pero el padre está esperando que el otro niño termine.
- Otro niño muestra:
futex(0x7f99d9188000, FUTEX_WAIT, 0, NULL ...
Entonces, ¿cuál es la forma correcta de terminar este proceso de manera limpia?