example - my icon java
Cola de bloqueo y consumidor multiproceso, cómo saber cuándo detenerse (6)
¿No podemos hacerlo usando un CountDownLatch
, donde el tamaño es la cantidad de registros en el productor? Y cada consumidor contará countDown
luego de procesar un registro. Y cruza el método awaits()
cuando terminaron todas las tareas. Entonces detén todos tus consumidores. Como se procesan todos los registros.
Tengo un único productor de hilos que crea algunos objetos de tareas que luego se agregan a un ArrayBlockingQueue
(que es de tamaño fijo).
También comienzo un consumidor de subprocesos múltiples. Esto es compilación como un grupo de subprocesos fijos ( Executors.newFixedThreadPool(threadCount);
). A continuación, presento algunas invitaciones de ConsumerWorker a este threadPool, cada ConsumerWorker tiene una referencia a la instancia de ArrayBlockingQueue mencionada anteriormente.
Cada trabajador hará una take()
en la cola y se ocupará de la tarea.
Mi problema es cuál es la mejor manera de que un trabajador sepa cuándo no habrá más trabajo por hacer. En otras palabras, ¿cómo le digo a los trabajadores que el productor ha terminado de agregar a la cola, y desde este momento, cada trabajador debe detenerse cuando ve que la cola está vacía?
Lo que tengo ahora es una configuración donde mi Productor se inicializa con una devolución de llamada que se activa cuando termina su trabajo (de agregar cosas a la cola). También guardo una lista de todos los ConsumerWorkers que he creado y enviado a ThreadPool. Cuando la devolución de llamada del productor me dice que el productor ha terminado, puedo decirle esto a cada uno de los trabajadores. En este punto, simplemente deberían seguir comprobando si la cola no está vacía y cuando se vacíe deberían detenerse, lo que me permite cerrar con gracia el grupo de subprocesos de ExecutorService. Es algo como esto
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}
}
El problema aquí es que tengo una condición de carrera obvia en la que a veces el productor terminará, lo señalizará y ConsumerWorkers se detendrá ANTES de consumir todo en la cola.
Mi pregunta es ¿cuál es la mejor manera de sincronizar esto para que todo funcione bien? ¿Debería sincronizar toda la parte donde comprueba si el productor está ejecutándose más si la cola está vacía y tomar algo de la cola en un bloque (en el objeto de la cola)? ¿Debo simplemente sincronizar la actualización de isRunning
boolean en la instancia de ConsumerWorker? ¿Alguna otra sugerencia?
ACTUALIZAR, AQUÍ ESTÁ LA IMPLEMENTACIÓN DE TRABAJO QUE HE TERMINADO UTILIZANDO:
public class ConsumerWorker implements Runnable{
private BlockingQueue<Produced> inputQueue;
private final static Produced POISON = new Produced(-1);
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(true) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Produced queueElement = inputQueue.take();
Thread.sleep(new Random().nextInt(100));
if(queueElement==POISON) {
break;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Consumer "+Thread.currentThread().getName()+" END");
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
try {
inputQueue.put(POISON);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Esto fue inspirado en gran medida por la respuesta de JohnVint a continuación, con solo algunas modificaciones menores.
=== Actualización debido al comentario de @ vendhan.
Gracias por tu obeservación. Tienes razón, el primer fragmento de código en esta pregunta tiene (entre otros temas) aquel en el while(isRunning || !inputQueue.isEmpty())
el while(isRunning || !inputQueue.isEmpty())
realmente no tiene sentido.
En mi implementación final real de esto, hago algo que está más cerca de su sugerencia de reemplazar "||" (o) con "&&" (y), en el sentido de que cada trabajador (consumidor) ahora solo verifica si el elemento que obtuvo de la lista es una píldora venenosa, y si es así se detiene (entonces teóricamente podemos decir que el trabajador para ejecutar Y la cola no debe estar vacía).
Debería continuar take()
de la cola. Puede usar una píldora venenosa para decirle al trabajador que se detenga. Por ejemplo:
private final Object POISON_PILL = new Object();
@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
if(queueElement == POISON_PILL) {
inputQueue.add(POISON_PILL);//notify other threads to stop
return;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}
//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
//you can also clear here if you wanted
isRunning = false;
inputQueue.add(POISON_PILL);
}
En su bloque de código donde intenta recuperar el elemento de la cola, use poll(time,unit)
lugar de take()
.
try {
Object queueElement = inputQueue.poll(timeout,unit);
//process queueElement
} catch (InterruptedException e) {
if(!isRunning && queue.isEmpty())
return ;
}
Al especificar los valores apropiados de tiempo de espera, se asegura de que los hilos no sigan bloqueando en caso de que haya una secuencia desafortunada de
-
isRunning
es cierto - La cola queda vacía, por lo que los hilos entran en espera bloqueada (si se usa
take()
-
isRunning
está configurado como falso
Enviaría a los trabajadores un paquete de trabajo especial para señalar que deberían cerrar:
public class ConsumerWorker implements Runnable{
private static final Produced DONE = new Produced();
private BlockingQueue<Produced> inputQueue;
public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}
@Override
public void run() {
for (;;) {
try {
Produced item = inputQueue.take();
if (item == DONE) {
inputQueue.add(item); // keep in the queue so all workers stop
break;
}
// process `item`
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
Para detener a los trabajadores, simplemente agregue ConsumerWorker.DONE
a la cola.
Hay una serie de estrategias que podría usar, pero una simple es tener una subclase de tareas que señale el final del trabajo. El productor no envía esta señal directamente. En cambio, pone en cola una instancia de esta subclase de tarea. Cuando uno de sus consumidores realiza esta tarea y la ejecuta, eso hace que se envíe la señal.
Tuve que usar un productor multiproceso y un consumidor multiproceso. Terminé con un Scheduler -- N Producers -- M Consumers
esquema Scheduler -- N Producers -- M Consumers
, cada dos se comunican a través de una cola (dos colas en total). El Programador llena la primera cola con solicitudes para producir datos, y luego la llena con N "píldoras venenosas". Hay un contador de productores activos (int atómico), y el último productor que recibe la última píldora de veneno envía M píldoras venenosas a la lista de consumidores.