java concurrency producer-consumer executorservice blockingqueue

java - ¿Es recomendable agregar tareas a BlockingQueue de ThreadPoolExecutor?



concurrency producer-consumer (5)

Es una pregunta muy importante si la cola que está utilizando es una implementación completamente diferente de la LinkedBlockingQueue o ArrayBlockingQueue estándar en memoria.

Por ejemplo, si está implementando el patrón productor-consumidor utilizando varios productores en diferentes máquinas, y utiliza un mecanismo de cola basado en un subsistema de persistencia separado (como Redis), la pregunta se vuelve relevante por sí sola, incluso si no lo hace. Quiero una offer() bloqueo offer() como el OP.

Por lo tanto, la respuesta dada, que se debe llamar a prestartAllCoreThreads() (o suficientes veces a prestartCoreThread() ) para que los subprocesos de trabajo estén disponibles y en ejecución, es lo suficientemente importante como para enfatizarlo.

El ThreadPoolExecutor para ThreadPoolExecutor no está claro si es aceptable agregar tareas directamente al BlockingQueue respalda al ejecutor. Los documentos dicen que llamar a executor.getQueue() está "destinado principalmente para la depuración y el monitoreo".

Estoy construyendo un ThreadPoolExecutor con mi propio BlockingQueue . Conservo una referencia a la cola para poder agregarle tareas directamente. getQueue() devuelve la misma cola, así que asumo que la advertencia en getQueue() aplica a una referencia a la cola de respaldo adquirida a través de mis medios.

Ejemplo

El patrón general del código es:

int n = ...; // number of threads queue = new ArrayBlockingQueue<Runnable>(queueSize); executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue); executor.prestartAllCoreThreads(); // ... while (...) { Runnable job = ...; queue.offer(job, 1, TimeUnit.HOURS); } while (jobsOutstanding.get() != 0) { try { Thread.sleep(...); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } executor.shutdownNow();

queue.offer() vs executor.execute()

Como lo entiendo, el uso típico es agregar tareas a través de executor.execute() . El enfoque en mi ejemplo anterior tiene la ventaja de bloquear en la cola mientras que execute() falla inmediatamente si la cola está llena y rechaza mi tarea. También me gusta que el envío de trabajos interactúe con una cola de bloqueo; esto me parece más "puro" productor-consumidor.

Una implicación de agregar tareas a la cola directamente: debo llamar a prestartAllCoreThreads() contrario no se están ejecutando subprocesos de trabajo. Suponiendo que no haya otras interacciones con el ejecutor, nada estará monitoreando la cola (el examen de la fuente ThreadPoolExecutor confirma). Esto también implica para la ThreadPoolExecutor directa de que ThreadPoolExecutor debe configurarse adicionalmente para> 0 hilos principales y no debe estar configurado para permitir que los hilos principales se agoten.

tl; dr

Dado un ThreadPoolExecutor configurado de la siguiente manera:

  • hilos de rosca> 0
  • subprocesos del núcleo no se les permite tiempo de espera
  • Los hilos del núcleo están pre-arranque
  • Mantener una referencia a BlockingQueue respaldando al ejecutor.

¿Es aceptable agregar tareas directamente a la cola en lugar de llamar a executor.execute() ?

Relacionado

Esta pregunta ( colas de trabajo del productor / consumidor ) es similar, pero no cubre específicamente agregar a la cola directamente.


Si es necesario, también podemos usar un estacionamiento que separe el procesamiento principal de las tareas rechazadas:

final CountDownLatch taskCounter = new CountDownLatch(TASKCOUNT); final List<Runnable> taskParking = new LinkedList<Runnable>(); BlockingQueue<Runnable> taskPool = new ArrayBlockingQueue<Runnable>(1); RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.err.println(Thread.currentThread().getName() + " -->rejection reported - adding to parking lot " + r); taskCounter.countDown(); taskParking.add(r); } }; ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 1000, TimeUnit.SECONDS, taskPool, rejectionHandler); for(int i=0 ; i<TASKCOUNT; i++){ //main threadPoolExecutor.submit(getRandomTask()); } taskCounter.await(TASKCOUNT * 5 , TimeUnit.SECONDS); System.out.println("Checking the parking lot..." + taskParking); while(taskParking.size() > 0){ Runnable r = taskParking.remove(0); System.out.println("Running from parking lot..." + r); if(taskParking.size() > LIMIT){ waitForSometime(...); } threadPoolExecutor.submit(r); } threadPoolExecutor.shutdown();


Si fuera yo, preferiría usar Executor#execute() sobre Queue#offer() , simplemente porque ya estoy usando todo lo demás desde java.util.concurrent .

Tu pregunta es buena y despertó mi interés, así que eché un vistazo a la fuente de ThreadPoolExecutor#execute() :

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }

Podemos ver que se ejecutan las llamadas offer() en la cola de trabajo, pero no antes de hacer algunas manipulaciones de pool agradables y sabrosas si es necesario. Por esa razón, creo que sería aconsejable utilizar execute() ; no usarlo puede (aunque no lo sé con certeza) hacer que la piscina funcione de una manera no óptima. Sin embargo, no creo que el uso de offer() rompa el ejecutor, parece que las tareas se eliminan de la cola con lo siguiente (también de ThreadPoolExecutor):

Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; if (workerCanExit()) { if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }

Este método getTask() solo se llama desde dentro de un bucle, por lo que si el ejecutor no se cierra, se bloqueará hasta que se asigne una nueva tarea a la cola (independientemente de la procedencia).

Nota : aunque he publicado fragmentos de código de origen aquí, no podemos confiar en ellos para obtener una respuesta definitiva, solo deberíamos estar codificando para la API. No sabemos cómo la implementación de execute() cambiará con el tiempo.


Un truco es implementar una subclase personalizada de ArrayBlockingQueue y anular el método offer () para llamar a su versión de bloqueo, entonces aún puede usar la ruta de código normal.

queue = new ArrayBlockingQueue<Runnable>(queueSize) { @Override public boolean offer(Runnable runnable) { try { return offer(runnable, 1, TimeUnit.HOURS); } catch(InterruptedException e) { // return interrupt status to caller Thread.currentThread().interrupt(); } return false; } };

(como probablemente pueda adivinar, creo que llamar a la oferta directamente en la cola, ya que su ruta de código normal es probablemente una mala idea).


Uno puede realmente configurar el comportamiento de la agrupación cuando la cola está llena, especificando un RejectedExecutionHandler en la instanciación. ThreadPoolExecutor define cuatro políticas como clases internas, incluyendo AbortPolicy , DiscardOldestPolicy , DiscardPolicy , así como mi favorito personal, CallerRunsPolicy , que ejecuta el nuevo trabajo en el hilo de control.

Por ejemplo:

ThreadPoolExecutor threadPool = new ThreadPoolExecutor( nproc, // core size nproc, // max size 60, // idle timeout TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(4096, true), // Fairness = true guarantees FIFO new ThreadPoolExecutor.CallerRunsPolicy() ); // If we have to reject a task, run it in the calling thread.

El comportamiento deseado en la pregunta se puede obtener usando algo como:

public class BlockingPolicy implements RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { executor.getQueue.put(r); // Self contained, no queue reference needed. }

En algún punto se debe acceder a la cola. El mejor lugar para hacerlo es en un RejectedExecutionHandler autocontenido, que guarda cualquier duplicación de código o errores potenciales que surjan de la manipulación directa de la cola en el ámbito del objeto del grupo. Tenga en cuenta que los manejadores incluidos en ThreadPoolExecutor usan getQueue() .