punto - Multiproceso de Python: el proceso se bloquea al unirse para una larga cola
programa de hilos en python (3)
La cola qout
en el subproceso se llena. Los datos que coloca en él desde foo()
no caben en el búfer de los conductos del sistema operativo que se utilizan internamente, por lo que el subproceso bloquea el intento de incluir más datos. Pero el proceso principal no está leyendo estos datos: simplemente también está bloqueado, esperando a que termine el subproceso. Este es un punto muerto típico.
Estoy ejecutando Python 2.7.3 y noté el siguiente comportamiento extraño. Considera este ejemplo mínimo:
from multiprocessing import Process, Queue
def foo(qin, qout):
while True:
bar = qin.get()
if bar is None:
break
qout.put({''bar'': bar})
if __name__ == ''__main__'':
import sys
qin = Queue()
qout = Queue()
worker = Process(target=foo,args=(qin,qout))
worker.start()
for i in range(100000):
print i
sys.stdout.flush()
qin.put(i**2)
qin.put(None)
worker.join()
Cuando worker.join()
más de 10,000 o más, mi script se cuelga en worker.join()
. Funciona bien cuando el ciclo solo llega a 1,000.
¿Algunas ideas?
Tuve el mismo problema en python3
cuando traté de poner cadenas en una cola de tamaño total de 5000 cahrs.
En mi proyecto, hubo un proceso de host que configura una cola y comienza un subproceso, luego se une. Afrer join
proceso de host lee desde la cola. Cuando el subproceso produce demasiados datos, el host cuelga al join
. Lo arreglé usando la siguiente función para esperar el subproceso en el proceso de host:
def yield_from_process(q, p):
while p.is_alive():
p.join(timeout=1)
while True:
try:
yield q.get(block=False)
except Empty:
break
Leí de la cola tan pronto como se llena, por lo que nunca es muy grande
Debe haber un límite en el tamaño de las colas. Puedo reproducir el mismo problema (colgado en qin.join()
) incluso eliminando por completo qout
. Considere la siguiente modificación:
from multiprocessing import Process, Queue
def foo(qin,qout):
while True:
bar = qin.get()
if bar is None:
break
#qout.put({''bar'':bar})
if __name__==''__main__'':
import sys
qin=Queue()
qout=Queue() ## POSITION 1
for i in range(100):
#qout=Queue() ## POSITION 2
worker=Process(target=foo,args=(qin,))
worker.start()
for j in range(1000):
x=i*100+j
print x
sys.stdout.flush()
qin.put(x**2)
qin.put(None)
worker.join()
print ''Done!''
Esto funciona como está (con la línea qout.put
comentada). Si intentas guardar los 100000 resultados, entonces qout
vuelve demasiado grande: si qout.put({''bar'':bar})
el qout.put({''bar'':bar})
en foo
, y dejo la definición de qout
en POSITION 1, el código se bloquea. Sin embargo, si muevo la definición de qout
a la POSICIÓN 2, entonces la secuencia de comandos termina.
En resumen, debes tener cuidado de que ni el qin
ni el qout
se qout
demasiado grandes. (Consulte también: El límite máximo de la cola de multiprocesamiento es 32767 )