studio settitle programacion móviles desarrollo curso aplicaciones java multithreading blockingqueue

java - settitle - "Cerrar" una cola de bloqueo



programacion android pdf 2018 (10)

Estoy usando java.util.concurrent.BlockingQueue en un escenario muy simple de productor-consumidor. Por ejemplo, este pseudo código representa la parte del consumidor:

class QueueConsumer implements Runnable { @Override public void run() { while(true) { try { ComplexObject complexObject = myBlockingQueue.take(); //do something with the complex object } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } }

Hasta aquí todo bien. En el javadoc de la cola de bloqueo leí:

Un BlockingQueue no admite intrínsecamente ningún tipo de operación de "cierre" o "apagado" para indicar que no se agregarán más elementos. Las necesidades y el uso de tales características tienden a ser dependientes de la implementación. Por ejemplo, una táctica común es que los productores inserten objetos especiales de final de flujo o envenenados, que se interpretan en consecuencia cuando los consumidores los toman.

Desafortunadamente, debido a los genéricos en uso y la naturaleza de ComplexObject, no es trivial empujar un "objeto venenoso" en la cola. Así que esta "táctica común" no es realmente conveniente en mi escenario.

Mi pregunta es: ¿qué otras buenas tácticas / patrones puedo usar para "cerrar" la cola?

¡Gracias!


¿Es posible extender ComplexObject y simular la funcionalidad de creación no trivial? Esencialmente, estás terminando con un objeto de shell, pero puedes hacer una instance of para ver si es el objeto de final de cola.


En esta situación, generalmente tiene que deshacerse de los genéricos y hacer que la cola mantenga el tipo de objeto. entonces, solo necesita verificar su objeto "envenenado" antes de lanzarlo al tipo real.


He usado este sistema:

ConsumerClass private boolean queueIsNotEmpty = true;//with setter ... do { ... sharedQueue.drainTo(docs); ... } while (queueIsNotEmpty || sharedQueue.isEmpty());

Cuando el productor finalizó, configuré el campo consumerObject, queueIsNotEmpty como falso


Hoy resolví este problema usando un objeto envoltorio. Dado que el ComplexObject es demasiado complejo para la subclase, envolví el ComplexObject en el objeto ComplexObjectWrapper. Luego usé ComplexObjectWrapper como el tipo genérico.

public class ComplexObjectWrapper { ComplexObject obj; } public class EndOfQueue extends ComplexObjectWrapper{}

Ahora, en lugar de BlockingQueue<ComplexObject> hice BlockingQueue<ComplexObjectWrapper>

Desde que tuve el control tanto del consumidor como del productor, esta solución funcionó para mí.


Me parece razonable implementar una BlockingQueue pueda cerrar:

import java.util.concurrent.BlockingQueue; public interface CloseableBlockingQueue<E> extends BlockingQueue<E> { /** Returns <tt>true</tt> if this queue is closed, <tt>false</tt> otherwise. */ public boolean isClosed(); /** Closes this queue; elements cannot be added to a closed queue. **/ public void close(); }

Sería bastante sencillo implementar esto con los siguientes comportamientos (consulte la tabla de resumen de métodos ):

  • Insertar :

    • Lanza la excepción , valor especial :

      Se comporta como una Queue completa, la responsabilidad de la persona que llama de probar isClosed() .

    • Blocks

      Lanza IllegalStateException si y cuando está cerrado.

    • Tiempo de espera :

      Devuelve false si y cuando está cerrado, la responsabilidad de la persona que llama de probar isClosed() .

  • Eliminar :

    • Lanza la excepción , valor especial :

      Se comporta como una Queue vacía, la responsabilidad de la persona que llama de probar isClosed() .

    • Blocks

      Lanza NoSuchElementException si y cuando está cerrado.

    • Tiempo de espera :

      Devuelve null si y cuando está cerrado, la responsabilidad de la persona que llama de probar isClosed() .

  • Examinar

    Ningún cambio.

Hice esto editando la source , encuéntrela en github.com .


Otra idea para hacer esto simple:

class ComplexObject implements QueueableComplexObject { /* the meat of your complex object is here as before, just need to * add the following line and the "implements" clause above */ @Override public ComplexObject asComplexObject() { return this; } } enum NullComplexObject implements QueueableComplexObject { INSTANCE; @Override public ComplexObject asComplexObject() { return null; } } interface QueueableComplexObject { public ComplexObject asComplexObject(); }

Luego use BlockingQueue<QueueableComplexObject> como cola. Cuando desee finalizar el procesamiento de la cola, haga queue.offer(NullComplexObject.INSTANCE) . En el lado del consumidor, hacer

boolean ok = true; while (ok) { ComplexObject obj = queue.take().asComplexObject(); if (obj == null) ok = false; else process(obj); } /* interrupt handling elided: implement this as you see fit, * depending on whether you watch to swallow interrupts or propagate them * as in your original post */

No se requiere ninguna instanceof , y no tiene que construir un ComplexObject falso que puede ser costoso / difícil dependiendo de su implementación.


Otra posibilidad para hacer un objeto venenoso: haz que sea una instancia particular de la clase. De esta manera, no tiene que eliminar los subtipos o arruinar su genérico.

Inconveniente: esto no funcionará si hay algún tipo de barrera de serialización entre el productor y el consumidor.

public class ComplexObject { public static final POISON_INSTANCE = new ComplexObject(); public ComplexObject(whatever arguments) { } // Empty constructor for creating poison instance. private ComplexObject() { } } class QueueConsumer implements Runnable { @Override public void run() { while(!(Thread.currentThread().interrupted())) { try { final ComplexObject complexObject = myBlockingQueue.take(); if (complexObject == ComplexObject.POISON_INSTANCE) return; // Process complex object. } catch (InterruptedException e) { // Set interrupted flag. Thread.currentThread().interrupt(); } } } }


Puede envolver su objeto genérico en un objeto de datos. En este objeto de datos puede agregar datos adicionales como el estado del objeto envenenado. El objeto de datos es una clase con 2 campos. T complexObject; y boolean poison; .

Su consumidor toma los objetos de datos de la cola. Si se devuelve un objeto venenoso, cierra el consumidor; de lo contrario, desenvuelve el genérico y llama a ''process (complexObject)''.

Estoy usando un java.util.concurrent.LinkedBlockingDeque<E> para que pueda agregar objetos al final de la cola y tomarlos desde el frente. De esa manera, su objeto se manejará en orden, pero lo más importante es que es seguro cerrar la cola después de que se encuentre con el objeto envenenado.

Para admitir a varios consumidores, agrego el objeto envenenado a la cola cuando me encuentro con él.

public final class Data<T> { private boolean poison = false; private T complexObject; public Data() { this.poison = true; } public Data(T complexObject) { this.complexObject = complexObject; } public boolean isPoison() { return poison; } public T getComplexObject() { return complexObject; } }

public class Consumer <T> implements Runnable { @Override public final void run() { Data<T> data; try { while (!(data = queue.takeFirst()).isPoison()) { process(data.getComplexObject()); } } catch (final InterruptedException e) { Thread.currentThread().interrupt(); return; } // add the poison object back so other consumers can stop too. queue.addLast(line); } }


Si tiene un identificador para el subproceso del consumidor, puede interrumpirlo. Con el código que le diste, eso matará al consumidor. No esperaría que el productor tuviera esto; Probablemente tendría que volver a llamar al controlador del programa de alguna manera para que sepa que está hecho. Entonces el controlador interrumpiría el hilo del consumidor.

Siempre puedes terminar de trabajar antes de obedecer la interrupción. Por ejemplo:

class QueueConsumer implements Runnable { @Override public void run() { while(!(Thread.currentThread().isInterrupted())) { try { final ComplexObject complexObject = myBlockingQueue.take(); this.process(complexObject); } catch (InterruptedException e) { // Set interrupted flag. Thread.currentThread().interrupt(); } } // Thread is getting ready to die, but first, // drain remaining elements on the queue and process them. final LinkedList<ComplexObject> remainingObjects; myBlockingQueue.drainTo(remainingObjects); for(ComplexObject complexObject : remainingObjects) { this.process(complexObject); } } private void process(final ComplexObject complexObject) { // Do something with the complex object. } }

De hecho, preferiría que de alguna manera envenene la cola. Si quieres matar el hilo, pídele que se mate a sí mismo.

(Es bueno ver a alguien manejando InterruptedException correctamente).

Parece que hay alguna discusión sobre el manejo de las interrupciones aquí. Primero, me gustaría que todos lean este artículo: http://www.ibm.com/developerworks/java/library/j-jtp05236.html

Ahora, con el entendimiento de que nadie realmente lee eso, aquí está el trato. Un subproceso solo recibirá una InterruptedException si estaba bloqueando actualmente en el momento de la interrupción. En este caso, Thread.interrupted() devolverá false . Si no estaba bloqueando, NO recibirá esta excepción, y en Thread.interrupted() lugar Thread.interrupted() devolverá true . Por lo tanto, su protección de bucle debería absolutamente, Thread.interrupted() lo que Thread.interrupted() , comprobar Thread.interrupted() , o de lo contrario, podría perder una interrupción en el hilo.

Por lo tanto, dado que está verificando Thread.interrupted() sin importar qué, y se ve obligado a capturar InterruptedException (y debe tratar con él incluso si no se lo obligó a hacerlo), ahora tiene dos áreas de código que manejan el mismo evento , interrupción del hilo. Una forma de manejar esto es normalizarlos en una condición, lo que significa que la comprobación del estado booleano puede generar la excepción o la excepción puede establecer el estado booleano. Elijo el más tarde.

Edición: tenga en cuenta que el método estático Subproceso # interrumpido borra el estado interrumpido de la secuencia actual.


Una alternativa sería envolver el procesamiento que está haciendo con un ExecutorService y dejar que ExecutorService controle si los trabajos se agregan o no a la cola.

Básicamente, aprovechas ExecutorService.shutdown() , que cuando se le llama, no permite que el ejecutor procese más tareas.

No estoy seguro de cómo está actualmente enviando tareas a QueueConsumer en su ejemplo. Supongo que tiene algún tipo de método submit() , y usé un método similar en el ejemplo.

import java.util.concurrent.*; class QueueConsumer { private final ExecutorService executor = Executors.newSingleThreadExecutor(); public void shutdown() { executor.shutdown(); // gracefully shuts down the executor } // ''Result'' is a class you''ll have to write yourself, if you want. // If you don''t need to provide a result, you can just Runnable // instead of Callable. public Future<Result> submit(final ComplexObject complexObject) { if(executor.isShutdown()) { // handle submitted tasks after the executor has been told to shutdown } return executor.submit(new Callable<Result>() { @Override public Result call() { return process(complexObject); } }); } private Result process(final ComplexObject complexObject) { // Do something with the complex object. } }

Este ejemplo es solo una ilustración inmediata de lo que ofrece el paquete java.util.concurrent ; probablemente haya algunas optimizaciones que se podrían realizar (por ejemplo, QueueConsumer como su propia clase probablemente ni siquiera sea necesaria; simplemente puede proporcionar el Servicio de ExecutorService a los productores que envíen las tareas).

Excave a través del paquete java.util.concurrent (comenzando en algunos de los enlaces anteriores). Podría encontrar que le ofrece muchas opciones excelentes para lo que está tratando de hacer, y ni siquiera tiene que preocuparse por regular la cola de trabajo.