procesos multitarea ejemplos crear concurrentes con python events select synchronization multiprocessing

multitarea - "Seleccionar" en mĂșltiples colas de multiprocesamiento de Python?



multitarea en python (7)

En realidad, puede usar objetos multiprocesamiento.Queue en select.select. es decir

que = multiprocessing.Queue() (input,[],[]) = select.select([que._reader],[],[])

seleccionaría que solo si está listo para ser leído.

Sin embargo, no hay documentación al respecto. Estaba leyendo el código fuente de la biblioteca multiprocesamiento.queue (en Linux suele ser algo así como /usr/lib/python2.6/multiprocessing/queue.py) para averiguarlo.

Con Queue.Queue no encontré ninguna manera inteligente de hacer esto (y realmente me encantaría).

¿Cuál es la mejor manera de esperar (sin girar) hasta que haya algo disponible en una de las dos Queues (multiprocesamiento), donde ambas residen en el mismo sistema?


No estoy seguro de qué tan bien funciona la selección en una cola de multiprocesamiento en Windows. Como select on windows escucha los sockets y no los manejadores de archivos, sospecho que podría haber problemas.

Mi respuesta es hacer un hilo para escuchar cada cola de forma bloqueada, y poner todos los resultados en una sola cola escuchada por el hilo principal, esencialmente multiplexando las colas individuales en una sola.

Mi código para hacer esto es:

""" Allow multiple queues to be waited upon. queue,value = multiq.select(list_of_queues) """ import queue import threading class queue_reader(threading.Thread): def __init__(self,inq,sharedq): threading.Thread.__init__(self) self.inq = inq self.sharedq = sharedq def run(self): while True: data = self.inq.get() print ("thread reads data=",data) result = (self.inq,data) self.sharedq.put(result) class multi_queue(queue.Queue): def __init__(self,list_of_queues): queue.Queue.__init__(self) for q in list_of_queues: qr = queue_reader(q,self) qr.start() def select(list_of_queues): outq = queue.Queue() for q in list_of_queues: qr = queue_reader(q,outq) qr.start() return outq.get()

La siguiente rutina de prueba muestra cómo usarlo:

import multiq import queue q1 = queue.Queue() q2 = queue.Queue() q3 = multiq.multi_queue([q1,q2]) q1.put(1) q2.put(2) q1.put(3) q1.put(4) res=0 while not res==4: while not q3.empty(): res = q3.get()[1] print ("returning result =",res)

Espero que esto ayude.

Tony Wallace


No lo hagas

Coloque un encabezado en los mensajes y envíelos a una cola común. Esto simplifica el código y será más limpio en general.



Nueva versión del código anterior ...

No estoy seguro de qué tan bien funciona la selección en una cola de multiprocesamiento en Windows. Como select on windows escucha los sockets y no los manejadores de archivos, sospecho que podría haber problemas.

Mi respuesta es hacer un hilo para escuchar cada cola de forma bloqueada, y poner todos los resultados en una sola cola escuchada por el hilo principal, esencialmente multiplexando las colas individuales en una sola.

Mi código para hacer esto es:

""" Allow multiple queues to be waited upon. An EndOfQueueMarker marks a queue as "all data sent on this queue". When this marker has been accessed on all input threads, this marker is returned by the multi_queue. """ import queue import threading class EndOfQueueMarker: def __str___(self): return "End of data marker" pass class queue_reader(threading.Thread): def __init__(self,inq,sharedq): threading.Thread.__init__(self) self.inq = inq self.sharedq = sharedq def run(self): q_run = True while q_run: data = self.inq.get() result = (self.inq,data) self.sharedq.put(result) if data is EndOfQueueMarker: q_run = False class multi_queue(queue.Queue): def __init__(self,list_of_queues): queue.Queue.__init__(self) self.qList = list_of_queues self.qrList = [] for q in list_of_queues: qr = queue_reader(q,self) qr.start() self.qrList.append(qr) def get(self,blocking=True,timeout=None): res = [] while len(res)==0: if len(self.qList)==0: res = (self,EndOfQueueMarker) else: res = queue.Queue.get(self,blocking,timeout) if res[1] is EndOfQueueMarker: self.qList.remove(res[0]) res = [] return res def join(self): for qr in self.qrList: qr.join() def select(list_of_queues): outq = queue.Queue() for q in list_of_queues: qr = queue_reader(q,outq) qr.start() return outq.get()

El siguiente código es mi rutina de prueba para mostrar cómo funciona:

import multiq import queue q1 = queue.Queue() q2 = queue.Queue() q3 = multiq.multi_queue([q1,q2]) q1.put(1) q2.put(2) q1.put(3) q1.put(4) q1.put(multiq.EndOfQueueMarker) q2.put(multiq.EndOfQueueMarker) res=0 have_data = True while have_data: res = q3.get()[1] print ("returning result =",res) have_data = not(res==multiq.EndOfQueueMarker)


Parece que usar subprocesos que reenvían los elementos entrantes a una cola única que luego espera es una opción práctica cuando se usa multiprocesamiento de una manera independiente de la plataforma.

Evitar los hilos requiere el manejo de tuberías / FD de bajo nivel, que es específico de la plataforma y no es fácil de manejar de manera consistente con la API de nivel superior.

O bien, necesitaría Queues con la capacidad de establecer devoluciones de llamadas que creo que son la interfaz adecuada de nivel superior. Es decir, escribirías algo como:

singlequeue = Queue() incoming_queue1.setcallback(singlequeue.put) incoming_queue2.setcallback(singlequeue.put) ... singlequeue.get()

Quizás el paquete de multiprocesamiento podría hacer crecer esta API, pero aún no está allí. El concepto funciona bien con py.execnet, que utiliza el término "canal" en lugar de "colas", consulte aquí http://tinyurl.com/nmtr4w


Puede usar algo como el patrón Observer , donde los suscriptores de Queue son notificados de los cambios de estado.

En este caso, podría tener su hilo de trabajo designado como oyente en cada cola, y cada vez que reciba una señal de listo, puede funcionar en el nuevo elemento, de lo contrario, dormirá.