isterminated - threadpoolexecutor java 8
Tamaño del grupo principal frente al tamaño máximo del grupo en ThreadPoolExecutor (7)
¿Cuál es exactamente la diferencia entre el tamaño del grupo principal y el tamaño máximo del grupo cuando hablamos en términos de ThreadPoolExecutor? ¿Se puede explicar con la ayuda de un ejemplo?
Buena explicación en this blog:
public class ThreadPoolExecutorExample {
public static void main (String[] args) {
createAndRunPoolForQueue(new ArrayBlockingQueue<Runnable>(3), "Bounded");
createAndRunPoolForQueue(new LinkedBlockingDeque<>(), "Unbounded");
createAndRunPoolForQueue(new SynchronousQueue<Runnable>(), "Direct hand-off");
}
private static void createAndRunPoolForQueue (BlockingQueue<Runnable> queue,
String msg) {
System.out.println("---- " + msg + " queue instance = " +
queue.getClass()+ " -------------");
ThreadPoolExecutor e = new ThreadPoolExecutor(2, 5, Long.MAX_VALUE,
TimeUnit.NANOSECONDS, queue);
for (int i = 0; i < 10; i++) {
try {
e.execute(new Task());
} catch (RejectedExecutionException ex) {
System.out.println("Task rejected = " + (i + 1));
}
printStatus(i + 1, e);
}
e.shutdownNow();
System.out.println("--------------------/n");
}
private static void printStatus (int taskSubmitted, ThreadPoolExecutor e) {
StringBuilder s = new StringBuilder();
s.append("poolSize = ")
.append(e.getPoolSize())
.append(", corePoolSize = ")
.append(e.getCorePoolSize())
.append(", queueSize = ")
.append(e.getQueue()
.size())
.append(", queueRemainingCapacity = ")
.append(e.getQueue()
.remainingCapacity())
.append(", maximumPoolSize = ")
.append(e.getMaximumPoolSize())
.append(", totalTasksSubmitted = ")
.append(taskSubmitted);
System.out.println(s.toString());
}
private static class Task implements Runnable {
@Override
public void run () {
while (true) {
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
break;
}
}
}
}
}
Salida:
---- Bounded queue instance = class java.util.concurrent.ArrayBlockingQueue -------------
poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 3, maximumPoolSize = 5, totalTasksSubmitted = 1
poolSize = 2, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 3, maximumPoolSize = 5, totalTasksSubmitted = 2
poolSize = 2, corePoolSize = 2, queueSize = 1, queueRemainingCapacity = 2, maximumPoolSize = 5, totalTasksSubmitted = 3
poolSize = 2, corePoolSize = 2, queueSize = 2, queueCapacity = 1, maximumPoolSize = 5, totalTasksSubmitted = 4
poolSize = 2, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 5
poolSize = 3, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 6
poolSize = 4, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 7
poolSize = 5, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 8
Task rejected = 9
poolSize = 5, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 9
Task rejected = 10
poolSize = 5, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 10
--------------------
---- Unbounded queue instance = class java.util.concurrent.LinkedBlockingDeque -------------
poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 2147483647, maximumPoolSize = 5, totalTasksSubmitted = 1
poolSize = 2, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 2147483647, maximumPoolSize = 5, totalTasksSubmitted = 2
poolSize = 2, corePoolSize = 2, queueSize = 1, queueRemainingCapacity = 2147483646, maximumPoolSize = 5, totalTasksSubmitted = 3
poolSize = 2, corePoolSize = 2, queueSize = 2, queueRemainingCapacity = 2147483645, maximumPoolSize = 5, totalTasksSubmitted = 4
poolSize = 2, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 2147483644, maximumPoolSize = 5, totalTasksSubmitted = 5
poolSize = 2, corePoolSize = 2, queueSize = 4, queueRemainingCapacity = 2147483643, maximumPoolSize = 5, totalTasksSubmitted = 6
poolSize = 2, corePoolSize = 2, queueSize = 5, queueRemainingCapacity = 2147483642, maximumPoolSize = 5, totalTasksSubmitted = 7
poolSize = 2, corePoolSize = 2, queueSize = 6, queueRemainingCapacity = 2147483641, maximumPoolSize = 5, totalTasksSubmitted = 8
poolSize = 2, corePoolSize = 2, queueSize = 7, queueRemainingCapacity = 2147483640, maximumPoolSize = 5, totalTasksSubmitted = 9
poolSize = 2, corePoolSize = 2, queueSize = 8, queueRemainingCapacity = 2147483639, maximumPoolSize = 5, totalTasksSubmitted = 10
--------------------
---- Direct hand-off queue instance = class java.util.concurrent.SynchronousQueue -------------
poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 1
poolSize = 2, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 2
poolSize = 3, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 3
poolSize = 4, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 4
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 5
Task rejected = 6
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 6
Task rejected = 7
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 7
Task rejected = 8
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 8
Task rejected = 9
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 9
Task rejected = 10
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 10
--------------------
Process finished with exit code 0
De esta entrada de blog :
Toma este ejemplo. El tamaño inicial del grupo de subprocesos es 1, el tamaño del grupo principal es 5, el tamaño máximo del grupo es 10 y la cola es 100.
A medida que llegan las solicitudes, los subprocesos se crearán hasta 5 y luego las tareas se agregarán a la cola hasta que llegue a 100. Cuando la cola esté llena, se crearán nuevos subprocesos hasta
maxPoolSize
. Una vez que todos los subprocesos estén en uso y la cola esté llena, se rechazarán las tareas. A medida que la cola se reduce, también lo hace el número de subprocesos activos.
Desde el doc :
Cuando se envía una nueva tarea en el método execute (java.lang.Runnable), y se ejecutan menos hilos de corePoolSize, se crea un nuevo hilo para manejar la solicitud, incluso si otros hilos de trabajo están inactivos. Si hay más de corePoolSize pero menos de maximumPoolSize se están ejecutando, se creará un nuevo thread solo si la cola está llena.
Además:
Al establecer corePoolSize y maximumPoolSize de la misma forma, creará un grupo de subprocesos de tamaño fijo. Al establecer maximumPoolSize en un valor esencialmente ilimitado, como Integer.MAX_VALUE, permite que la agrupación acomode un número arbitrario de tareas simultáneas. Normalmente, los tamaños de agrupación máxima y máxima se establecen solo en la construcción, pero también pueden cambiarse dinámicamente usando setCorePoolSize (int) y setMaximumPoolSize (int).
Puede encontrar la definición de los términos corepoolsize y maxpoolsize en el javadoc. http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html
El enlace de arriba tiene la respuesta a tu pregunta. Sin embargo, sólo para dejarlo claro. La aplicación seguirá creando subprocesos hasta que llegue al corePoolSize. Creo que la idea aquí es que estos muchos hilos deberían ser suficientes para manejar la entrada de tareas. Si una nueva tarea se produce después de que se crean los subprocesos corePoolSize, las tareas se pondrán en cola. Una vez que la cola esté llena, el ejecutor comenzará a crear nuevos hilos. Es una especie de equilibrio. Lo que esencialmente significa es que la entrada de tareas es más que la capacidad de procesamiento. Por lo tanto, Executor comenzará a crear nuevos subprocesos nuevamente hasta que alcance el número máximo de subprocesos. Nuevamente, se crearán nuevos hilos si y solo si la cola está llena.
SI ejecuta subprocesos> corePoolSize & <maxPoolSize , cree un nuevo subproceso si la cola de tareas Total está llena y llega una nueva.
Formulario doc: (Si hay más de corePoolSize pero menos de maximumPoolSize subprocesos en ejecución, se creará un nuevo subproceso solo si la cola está llena.)
Ahora, tomemos un ejemplo simple,
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(5, 10, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50));
Aquí, 5 es el corePoolSize - significa que Jvm creará un nuevo hilo para la nueva tarea para las primeras 5 tareas. y otras tareas se agregarán a la cola hasta que la cola se llene (50 tareas).
10 es el maxPoolSize - JVM puede crear un máximo de 10 hilos. Significa si ya hay 5 tareas / subproceso en ejecución y la cola está llena con 50 tareas pendientes y si una nueva solicitud / tarea llega a la cola, JVM creará un nuevo subproceso hasta 10 (subprocesos totales = 5 anteriores + 5 nuevos) ;
El nuevo ArrayBlockingQueue (50) = es un tamaño de cola total, puede poner en cola 50 tareas.
una vez que se ejecutan los 10 subprocesos y si una nueva tarea está llegando, esa nueva tarea será rechazada.
Reglas para crear hilos internamente por SUN:
Si el número de subprocesos es menor que el corePoolSize, cree un nuevo subproceso para ejecutar una nueva tarea.
Si el número de subprocesos es igual (o mayor que) corePoolSize, ponga la tarea en la cola.
Si la cola está llena y el número de subprocesos es menor que el maxPoolSize, cree un nuevo subproceso para ejecutar tareas.
Si la cola está llena y el número de subprocesos es mayor o igual que maxPoolSize, rechace la tarea.
Hope, esto es HelpFul ... y corríjame si me equivoco ...
Si decide crear un ThreadPoolExecutor
manualmente en lugar de usar la clase de fábrica de Executors
, deberá crear y configurar uno usando uno de sus constructores. El constructor más extenso de esta clase es:
public ThreadPoolExecutor(
int corePoolSize,
int maxPoolSize,
long keepAlive,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler
);
Como puedes ver, puedes configurar:
- El tamaño de la agrupación principal (el tamaño con el que la agrupación de hilos intentará atenerse).
- El tamaño máximo de la piscina.
- El tiempo de mantenimiento, que es un tiempo después del cual un hilo inactivo es elegible para ser demolido.
- La cola de trabajo para mantener las tareas pendientes de ejecución.
- La política a aplicar cuando un envío de tarea es rechazado.
Limitar el número de tareas en cola
La limitación del número de tareas simultáneas que se ejecutan, el tamaño de su grupo de subprocesos, representa un gran beneficio para su aplicación y su entorno de ejecución en términos de previsibilidad y estabilidad: una creación de subprocesos ilimitada agotará los recursos de tiempo de ejecución y su aplicación podría experimentar como consecuencia , graves problemas de rendimiento que pueden llevar incluso a la inestabilidad de la aplicación.
Esa es una solución para solo una parte del problema: está limitando el número de tareas que se están ejecutando, pero no está limitando el número de trabajos que se pueden enviar y poner en cola para una ejecución posterior. La aplicación experimentará una escasez de recursos más adelante, pero finalmente la experimentará si la tasa de envío supera la tasa de ejecución.
La solución a este problema es: Proporcionar una cola de bloqueo al ejecutor para mantener las tareas en espera. En el caso de que la cola se llene, la tarea enviada será "rechazada". RejectedExecutionHandler
se invoca cuando se rechaza el envío de una tarea, y es por eso que el verbo rechazado se citó en el elemento anterior. Puede implementar su propia política de rechazo o usar una de las políticas integradas que proporciona el marco.
Las políticas de rechazo predeterminadas hacen que el ejecutor lance una RejectedExecutionException
. Sin embargo, otras políticas integradas le permiten:
- Deseche un trabajo en silencio.
- Deseche el trabajo más antiguo e intente volver a enviar el último.
- Ejecutar la tarea rechazada en el hilo de la persona que llama.
java.util.concurrent.ThreadPoolExecutor
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn''t, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}