java queue producer-consumer blockingqueue

Java BlockingQueue con lotes?



producer-consumer (4)

Estoy interesado en una estructura de datos idéntica a Java BlockingQueue, con la excepción de que debe poder agrupar objetos en la cola. En otras palabras, me gustaría que el productor pueda poner objetos en la cola, pero que el consumidor bloquee en take() hasta que la cola alcance un cierto tamaño (el tamaño del lote).

Luego, una vez que la cola ha alcanzado el tamaño del lote, el productor debe bloquear en put() hasta que el consumidor haya consumido todos los elementos de la cola (en cuyo caso el productor comenzará a producir nuevamente y el bloque del consumidor hasta que se alcance el lote). otra vez).

¿Existe una estructura de datos similar? O si lo escribo (lo cual no me importa), simplemente no quiero perder mi tiempo si hay algo ahí afuera.

ACTUALIZAR

Tal vez para aclarar un poco las cosas:

La situación siempre será la siguiente. Puede haber varios productores que agreguen elementos a la cola, pero nunca habrá más de un consumidor tomando elementos de la cola.

Ahora, el problema es que hay múltiples de estas configuraciones en paralelo y en serie. En otras palabras, los productores producen artículos para múltiples colas, mientras que los consumidores también pueden ser productores. Esto puede pensarse más fácilmente como un gráfico dirigido de productores, consumidores-productores y, finalmente, consumidores.

La razón por la que los productores deben bloquear hasta que las colas estén vacías (@Peter Lawrey) es porque cada uno de estos se ejecutará en un hilo. Si los deja simplemente para producir a medida que el espacio esté disponible, terminará con una situación en la que tiene demasiados hilos que intentan procesar demasiadas cosas a la vez.

¿Quizás unir esto con un servicio de ejecución podría resolver el problema?


Aquí hay una implementación rápida (= simple pero no totalmente probada) que creo que puede ser adecuada para sus solicitudes: debería poder extenderla para que sea compatible con la interfaz de cola completa si lo necesita.

para aumentar el rendimiento, puede cambiar a ReentrantLock en lugar de utilizar la palabra clave "sincronizada".

public class BatchBlockingQueue<T> { private ArrayList<T> queue; private Semaphore readerLock; private Semaphore writerLock; private int batchSize; public BatchBlockingQueue(int batchSize) { this.queue = new ArrayList<>(batchSize); this.readerLock = new Semaphore(0); this.writerLock = new Semaphore(batchSize); this.batchSize = batchSize; } public synchronized void put(T e) throws InterruptedException { writerLock.acquire(); queue.add(e); if (queue.size() == batchSize) { readerLock.release(batchSize); } } public synchronized T poll() throws InterruptedException { readerLock.acquire(); T ret = queue.remove(0); if (queue.isEmpty()) { writerLock.release(batchSize); } return ret; } }

Esperamos que te sea útil.


Esto suena como la forma en que funciona el RingBuffer en el patrón de LMAX Disruptor. Consulte http://code.google.com/p/disruptor/ para obtener más información.

Una explicación muy aproximada es que su principal estructura de datos es el RingBuffer. Los productores ponen los datos en el búfer de anillo en secuencia y los consumidores pueden extraer tantos datos como el productor ha puesto en el búfer (por lo tanto, esencialmente la preparación de lotes). Si el búfer está lleno, el productor se bloquea hasta que el consumidor haya terminado y liberado las ranuras en el búfer.


Le sugiero que utilice BlockingQueue.drainTo (Collection, int) . Puede usarlo con take () para asegurarse de obtener un número mínimo de elementos.

La ventaja de utilizar este enfoque es que el tamaño de su lote aumenta dinámicamente con la carga de trabajo y el productor no tiene que bloquear cuando el consumidor está ocupado. es decir, se optimiza para la latencia y el rendimiento.

Para implementar exactamente según lo solicitado (lo que creo que es una mala idea), puede utilizar una SynchronousQueue con un subproceso que consume mucho.

es decir, el hilo consumidor hace un

list.clear(); while(list.size() < required) list.add(queue.take()); // process list.

El productor bloqueará cuando el consumidor esté ocupado.


No que yo sepa. Si entiendo correctamente, desea que el productor trabaje (mientras el consumidor está bloqueado) hasta que llene la cola o que el consumidor trabaje (mientras el productor bloquea) hasta que se borre la cola. Si ese es el caso, puedo sugerir que no necesita una estructura de datos sino un mecanismo para bloquear a una parte mientras la otra está trabajando en una invasión mutex. Puede bloquear un objeto para eso y tener la lógica interna de si está lleno o vacío para liberar el bloqueo y pasarlo a la otra parte. Así que en resumen, deberías escribirlo tú mismo :)