java - example - Bloque ThreadPoolExecutor cuando la cola está llena?
java concurrency example (4)
Estoy tratando de ejecutar muchas tareas usando un ThreadPoolExecutor. A continuación se muestra un ejemplo hipotético:
def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
threadPoolExecutor.execute(runnable)
El problema es que obtengo rápidamente una java.util.concurrent.RejectedExecutionException ya que el número de tareas excede el tamaño de la cola de trabajo. Sin embargo, el comportamiento deseado que estoy buscando es tener el bloque de hilos principal hasta que haya espacio en la cola. Cuál es la mejor manera de lograr esto?
En algunas circunstancias muy limitadas, puede implementar un java.util.concurrent.RejectedExecutionHandler que hace lo que necesita.
RejectedExecutionHandler block = new RejectedExecutionHandler() {
rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
executor.getQueue().put( r );
}
};
ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);
Ahora. Esta es una muy mala idea por los siguientes motivos
- Es propenso a un punto muerto porque todos los hilos en el conjunto pueden morir antes de que lo que coloque en la cola sea visible. Mitigue esto estableciendo un tiempo razonable de mantener vivo.
- La tarea no está envuelta de la manera que su ejecutor puede esperar. Muchas implementaciones de ejecutores envuelven sus tareas en algún tipo de objeto de seguimiento antes de la ejecución. Mira la fuente de los tuyos.
- La API desaconseja agregar mediante getQueue () y puede estar prohibido en algún momento.
Una estrategia casi siempre mejor es instalar ThreadPoolExecutor.CallerRunsPolicy que estrangulará tu aplicación ejecutando la tarea en el hilo que llama a execute ().
Sin embargo, a veces una estrategia de bloqueo, con todos sus riesgos inherentes, es realmente lo que desea. Yo diría bajo estas condiciones
- Solo tiene un hilo que ejecuta execute ()
- Tienes que (o quieres) tener una longitud de cola muy pequeña
- Es absolutamente necesario limitar el número de subprocesos que ejecutan este trabajo (generalmente por razones externas), y una estrategia de ejecución de llamada podría romper eso.
- Sus tareas son de un tamaño impredecible, por lo que las ejecuciones de llamadas pueden provocar inanición si el grupo estuvo momentáneamente ocupado con 4 tareas breves y su ejecución de llamada de un solo hilo se quedó atascada con una grande.
Entonces, como digo. Rara vez se necesita y puede ser peligroso, pero ahí lo tienes.
Buena suerte.
Compruebe la respuesta de los puntos de referencia aquí: https://.com/a/2001205 tomada del libro "Concurrencia de Java en la práctica".
Aquí está mi fragmento de código en este caso:
public void executeBlocking( Runnable command ) {
if ( threadPool == null ) {
logger.error( "Thread pool ''{}'' not initialized.", threadPoolName );
return;
}
ThreadPool threadPoolMonitor = this;
boolean accepted = false;
do {
try {
threadPool.execute( new Runnable() {
@Override
public void run() {
try {
command.run();
}
// to make sure that the monitor is freed on exit
finally {
// Notify all the threads waiting for the resource, if any.
synchronized ( threadPoolMonitor ) {
threadPoolMonitor.notifyAll();
}
}
}
} );
accepted = true;
}
catch ( RejectedExecutionException e ) {
// Thread pool is full
try {
// Block until one of the threads finishes its job and exits.
synchronized ( threadPoolMonitor ) {
threadPoolMonitor.wait();
}
}
catch ( InterruptedException ignored ) {
// return immediately
break;
}
}
} while ( !accepted );
}
threadPool es una instancia local de java.util.concurrent.ExecutorService que ya se ha inicializado.
Resolví este problema usando un RejectedExecutionHandler personalizado, que simplemente bloquea el hilo de llamada durante un momento y luego intenta enviar la tarea de nuevo:
public class BlockWhenQueueFull implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// The pool is full. Wait, then try again.
try {
long waitMs = 250;
Thread.sleep(waitMs);
} catch (InterruptedException interruptedException) {}
executor.execute(r);
}
}
Esta clase solo se puede usar en el ejecutor de grupo de hilos como un RejectedExecutionHandler como cualquier otro. En este ejemplo:
executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())
El único inconveniente que veo es que el hilo de llamada puede bloquearse un poco más de lo estrictamente necesario (hasta 250 ms). Para muchas tareas de ejecución breve, quizás disminuya el tiempo de espera a 10 ms o menos. Además, dado que este ejecutor se está llamando efectivamente de forma recursiva, esperar mucho tiempo para que un hilo esté disponible (horas) puede provocar un desbordamiento de la pila.
Sin embargo, personalmente me gusta este método. Es compacto, fácil de entender y funciona bien. ¿Me estoy perdiendo algo importante?