ejemplo java concurrency blocking interrupt

ejemplo - blockingqueue java 8



¿Cómo se interrumpe un BlockingQueue que está bloqueando en take()? (5)

Tengo una clase que toma objetos de un BlockingQueue y los procesa llamando a take() en un bucle continuo. En algún momento sé que no se agregarán más objetos a la cola. ¿Cómo interrumpo el método take() para que deje de bloquear?

Aquí está la clase que procesa los objetos:

public class MyObjHandler implements Runnable { private final BlockingQueue<MyObj> queue; public class MyObjHandler(BlockingQueue queue) { this.queue = queue; } public void run() { try { while (true) { MyObj obj = queue.take(); // process obj here // ... } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }

Y aquí está el método que usa esta clase para procesar objetos:

public void testHandler() { BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); MyObjectHandler handler = new MyObjectHandler(queue); new Thread(handler).start(); // get objects for handler to process for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) { queue.put(i.next()); } // what code should go here to tell the handler // to stop waiting for more objects? }


Interrumpir el hilo:

thread.interrupt()


O no interrumpas, es desagradable.

public class MyQueue<T> extends ArrayBlockingQueue<T> { private static final long serialVersionUID = 1L; private boolean done = false; public ParserQueue(int capacity) { super(capacity); } public void done() { done = true; } public boolean isDone() { return done; } /** * May return null if producer ends the production after consumer * has entered the element-await state. */ public T take() throws InterruptedException { T el; while ((el = super.poll()) == null && !done) { synchronized (this) { wait(); } } return el; } }

  1. cuando el productor pone el objeto en la cola, llame a queue.notify() , si termina, llame a queue.done()
  2. loop while (! queue.isDone () ||! queue.isEmpty ())
  3. valor de retorno de prueba de toma () para nulo

Si la interrupción del hilo no es una opción, otra es colocar un objeto "marcador" o "comando" en la cola que MyObjHandler reconocería como tal y salirse del ciclo.


Muy tarde, pero espero que esto ayude a otros también, ya que enfrenté el problema similar y utilicé el enfoque de poll sugerido por Erickson con algunos cambios menores,

class MyObjHandler implements Runnable { private final BlockingQueue<MyObj> queue; public volatile boolean Finished; //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL public MyObjHandler(BlockingQueue queue) { this.queue = queue; Finished = false; } @Override public void run() { while (true) { try { MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS); if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return { // process obj here // ... } if(Finished && queue.isEmpty()) return; } catch (InterruptedException e) { return; } } } } public void testHandler() { BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); MyObjHandler handler = new MyObjHandler(queue); new Thread(handler).start(); // get objects for handler to process for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) { queue.put(i.next()); } // what code should go here to tell the handler to stop waiting for more objects? handler.Finished = true; //THIS TELLS HIM //If you need you can wait for the termination otherwise remove join myThread.join(); }

Esto resolvió ambos problemas

  1. Marcó la clave de BlockingQueue para que sepa que no debe esperar más elementos
  2. No se interrumpió en el medio para que los bloques de procesamiento finalicen solo cuando se procesen todos los elementos en la cola y no haya elementos restantes para agregar.

BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); MyObjectHandler handler = new MyObjectHandler(queue); Thread thread = new Thread(handler); thread.start(); for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext(); ) { queue.put(i.next()); } thread.interrupt();

Sin embargo, si hace esto, el hilo podría interrumpirse mientras todavía hay elementos en la cola, esperando ser procesados. Es posible que desee considerar el uso de poll lugar de take , lo que permitirá que el flujo de procesamiento expire y finalice cuando haya esperado un tiempo sin nuevas entradas.