hilos executorcompletionservice ejemplo awaittermination java concurrency threadpool executorservice

executorcompletionservice - Java: ExecutorService que bloquea en el envío después de un cierto tamaño de cola



java threadpoolexecutor (7)

Estoy intentando codificar una solución en la que un solo hilo produce tareas intensivas de E / S que se pueden realizar en paralelo. Cada tarea tiene datos significativos en la memoria. Así que quiero poder limitar el número de tareas que están pendientes en un momento.

Si creo ThreadPoolExecutor de esta manera:

ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(maxQueue));

Luego, executor.submit(callable) lanza la RejectedExecutionException cuando la cola se llena y todos los subprocesos ya están ocupados.

¿Qué puedo hacer para hacer que se executor.submit(callable) bloque de executor.submit(callable) cuando la cola está llena y todos los subprocesos están ocupados?

EDIT : he intentado this :

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

Y de alguna manera logra el efecto que quiero que se logre, pero de manera poco elegante (básicamente, los subprocesos rechazados se ejecutan en el subproceso de llamada, por lo que esto impide que el subproceso de la llamada envíe más).

EDITAR: (5 años después de hacer la pregunta)

Para cualquier persona que lea esta pregunta y sus respuestas, no tome la respuesta aceptada como una solución correcta. Por favor, lea todas las respuestas y comentarios.


Aquí es cómo resolví esto en mi final:

(nota: esta solución bloquea el subproceso que envía el Callable, por lo que evita que se lance RejectedExecutionException)

public class BoundedExecutor extends ThreadPoolExecutor{ private final Semaphore semaphore; public BoundedExecutor(int bound) { super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); semaphore = new Semaphore(bound); } /**Submits task to execution pool, but blocks while number of running threads * has reached the bound limit */ public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{ semaphore.acquire(); return submit(task); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); semaphore.release(); } }


Creo que es tan simple como usar un ArrayBlockingQueue lugar de un LinkedBlockingQueue .

Ignorame ... eso es totalmente incorrecto ThreadPoolExecutor llama a Queue#offer no se put que tendría el efecto que necesita.

Podría extender ThreadPoolExecutor y proporcionar una implementación de execute(Runnable) que las llamadas se put en lugar de la offer .

Eso no parece ser una respuesta completamente satisfactoria, me temo.


He hecho lo mismo. El truco es crear un BlockingQueue donde el método offer () es realmente un put (). (Puedes usar la base que necesites de BlockingQueue).

public class LimitedQueue<E> extends LinkedBlockingQueue<E> { public LimitedQueue(int maxSize) { super(maxSize); } @Override public boolean offer(E e) { // turn offer() and add() into a blocking calls (unless interrupted) try { put(e); return true; } catch(InterruptedException ie) { Thread.currentThread().interrupt(); } return false; } }

Tenga en cuenta que esto solo funciona para el grupo de subprocesos donde corePoolSize==maxPoolSize así que tenga cuidado allí (vea los comentarios).


He implementado una solución siguiendo el patrón de decorador y utilizando un semáforo para controlar el número de tareas ejecutadas. Puedes usarlo con cualquier Executor y:

  • Especificar el máximo de tareas en curso.
  • Especifique el tiempo de espera máximo para esperar un permiso de ejecución de la tarea (si el tiempo de espera pasa y no se adquiere ningún permiso, se lanza una RejectedExecutionException )

import static java.util.concurrent.TimeUnit.MILLISECONDS; import java.time.Duration; import java.util.Objects; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.Semaphore; import javax.annotation.Nonnull; public class BlockingOnFullQueueExecutorDecorator implements Executor { private static final class PermitReleasingDecorator implements Runnable { @Nonnull private final Runnable delegate; @Nonnull private final Semaphore semaphore; private PermitReleasingDecorator(@Nonnull final Runnable task, @Nonnull final Semaphore semaphoreToRelease) { this.delegate = task; this.semaphore = semaphoreToRelease; } @Override public void run() { try { this.delegate.run(); } finally { // however execution goes, release permit for next task this.semaphore.release(); } } @Override public final String toString() { return String.format("%s[delegate=''%s'']", getClass().getSimpleName(), this.delegate); } } @Nonnull private final Semaphore taskLimit; @Nonnull private final Duration timeout; @Nonnull private final Executor delegate; public BlockingOnFullQueueExecutorDecorator(@Nonnull final Executor executor, final int maximumTaskNumber, @Nonnull final Duration maximumTimeout) { this.delegate = Objects.requireNonNull(executor, "''executor'' must not be null"); if (maximumTaskNumber < 1) { throw new IllegalArgumentException(String.format("At least one task must be permitted, not ''%d''", maximumTaskNumber)); } this.timeout = Objects.requireNonNull(maximumTimeout, "''maximumTimeout'' must not be null"); if (this.timeout.isNegative()) { throw new IllegalArgumentException("''maximumTimeout'' must not be negative"); } this.taskLimit = new Semaphore(maximumTaskNumber); } @Override public final void execute(final Runnable command) { Objects.requireNonNull(command, "''command'' must not be null"); try { // attempt to acquire permit for task execution if (!this.taskLimit.tryAcquire(this.timeout.toMillis(), MILLISECONDS)) { throw new RejectedExecutionException(String.format("Executor ''%s'' busy", this.delegate)); } } catch (final InterruptedException e) { // restore interrupt status Thread.currentThread().interrupt(); throw new IllegalStateException(e); } this.delegate.execute(new PermitReleasingDecorator(command, this.taskLimit)); } @Override public final String toString() { return String.format("%s[availablePermits=''%s'',timeout=''%s'',delegate=''%s'']", getClass().getSimpleName(), this.taskLimit.availablePermits(), this.timeout, this.delegate); } }


La respuesta actualmente aceptada tiene un problema potencialmente importante: cambia el comportamiento de ThreadPoolExecutor.execute de modo que si tiene un corePoolSize < maxPoolSize , la lógica ThreadPoolExecutor nunca agregará trabajadores adicionales más allá del núcleo.

Desde ThreadPoolExecutor .execute (ejecutable):

if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command);

Específicamente, ese último bloque ''else'' nunca será golpeado.

Una mejor alternativa es hacer algo similar a lo que OP ya está haciendo: use un RejectedExecutionHandler para hacer la misma lógica de colocación:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { if (!executor.isShutdown()) { executor.getQueue().put(r); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e); } }

Hay algunas cosas a tener en cuenta con este enfoque, como se señala en los comentarios (refiriéndose a esta respuesta ):

  1. Si corePoolSize==0 , entonces hay una condición de carrera donde todos los subprocesos de la agrupación pueden morir antes de que la tarea sea visible
  2. El uso de una implementación que envuelva las tareas de la cola (no aplicable a ThreadPoolExecutor ) dará lugar a problemas a menos que el controlador también lo ajuste de la misma manera.

Teniendo en cuenta esos errores, esta solución funcionará para la mayoría de los Ejecutores de ThreadPool, y manejará adecuadamente el caso donde corePoolSize < maxPoolSize .


Sé que esta es una pregunta antigua, pero tenía un problema similar de que la creación de nuevas tareas era muy rápida y si se producían demasiados OutOfMemoryError porque la tarea existente no se había completado lo suficientemente rápido.

En mi caso, los Callables se envían y necesito el resultado, por lo que necesito almacenar todos los Futures devueltos por executor.submit() . Mi solución fue colocar los Futures en un BlockingQueue con un tamaño máximo. Una vez que la cola está llena, no se generan más tareas hasta que se completan algunas (elementos eliminados de la cola). En pseudocódigo:

final ExecutorService executor = Executors.newFixedThreadPool(numWorkerThreads); final LinkedBlockingQueue<Future> futures = new LinkedBlockingQueue<>(maxQueueSize); try { Thread taskGenerator = new Thread() { @Override public void run() { while (reader.hasNext) { Callable task = generateTask(reader.next()); Future future = executor.submit(task); try { // if queue is full blocks until a task // is completed and hence no future tasks are submitted. futures.put(compoundFuture); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } executor.shutdown(); } } taskGenerator.start(); // read from queue as long as task are being generated // or while Queue has elements in it while (taskGenerator.isAlive() || !futures.isEmpty()) { Future compoundFuture = futures.take(); // do something } } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } catch (ExecutionException ex) { throw new MyException(ex); } finally { executor.shutdownNow(); }


Tuve el problema similar y lo implementé utilizando los beforeExecute/afterExecute de ThreadPoolExecutor :

import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** * Blocks current task execution if there is not enough resources for it. * Maximum task count usage controlled by maxTaskCount property. */ public class BlockingThreadPoolExecutor extends ThreadPoolExecutor { private final ReentrantLock taskLock = new ReentrantLock(); private final Condition unpaused = taskLock.newCondition(); private final int maxTaskCount; private volatile int currentTaskCount; public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, int maxTaskCount) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); this.maxTaskCount = maxTaskCount; } /** * Executes task if there is enough system resources for it. Otherwise * waits. */ @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); taskLock.lock(); try { // Spin while we will not have enough capacity for this job while (maxTaskCount < currentTaskCount) { try { unpaused.await(); } catch (InterruptedException e) { t.interrupt(); } } currentTaskCount++; } finally { taskLock.unlock(); } } /** * Signalling that one more task is welcome */ @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); taskLock.lock(); try { currentTaskCount--; unpaused.signalAll(); } finally { taskLock.unlock(); } } }

Esto debería ser lo suficientemente bueno para ti. Por cierto, la implementación original se basó en el tamaño de la tarea porque una tarea podría ser 100 veces más grande que otra y enviar dos tareas enormes fue matar la caja, pero ejecutar una grande y bastante pequeña fue correcto. Si sus tareas de E / S intensivas son aproximadamente del mismo tamaño que podría usar esta clase, de lo contrario, hágamelo saber y publicaré una implementación basada en el tamaño.

PS ThreadPoolExecutor comprobar ThreadPoolExecutor javadoc. Es realmente agradable la guía de usuario de Doug Lea sobre cómo se puede personalizar fácilmente.