ejemplos - Entendiendo Java FixedThreadPool
java concurrency (5)
Está creando 200 objetos usando la new Task()
y esas tareas se envían al ejecutor. Los ejecutores mantienen una referencia a este objeto de Task
. Entonces, si en el constructor de la Task
está construyendo y reteniendo recursos, todas las 200 Tareas contendrán los recursos.
Si es posible, puede construir y usar el recurso en el método de llamada de Task
si no quiere que 200 instancias construyan y mantengan los recursos. En ese caso, solo 3 Task
a la vez construirán y mantendrán el recurso.
Estoy tratando de entender cómo funciona Java FixedThreadPool en la práctica, pero los documentos no responden a mi pregunta.
Supongamos un escenario simple como:
ExecutorService ES= Executors.newFixedThreadPool(3);
List<Future> FL;
for(int i=1;i<=200;i++){
FL.add(ES.submit(new Task()));
}
ES.shutdown();
donde Task
es un Callable
que construye algunos recursos, los usa y devuelve algunos resultados.
Mi pregunta : ¿cuántas Task
hay en la memoria al completar el bucle for
? En otras palabras: ¿habrá solo 3 Task
a la vez construyendo sus recursos, o todos ellos se crearán por adelantado, de modo que, después de .submit
, tengo 200 Task
(y sus recursos) en espera de ejecutarse?
Nota : la construcción de recursos ocurre en el constructor de Task
, no en el método call()
.
En el javadoc (siéntase libre de omitir lo siguiente): lo que me confunde es la siguiente explicación en la documentación de Java
Crea un grupo de subprocesos que reutiliza un número fijo de subprocesos que operan en una cola compartida ilimitada. En cualquier momento, como máximo, las hebras nThreads serán tareas de procesamiento activas.
Supongo que esto significa que, en mi ejemplo, todas las 200 tareas están en la cola, pero solo 3 de ellas se ejecutan en cualquier momento.
Cualquier ayuda es muy apreciada.
Las 200 tareas se crean y consumen los recursos, y todas ellas están en la cola.
El grupo de subprocesos, solo invoca su método run () / call () uno por uno, cuando hay un subproceso libre disponible para la ejecución.
Las tareas se eliminarán una a una de la cola, de modo que a medida que la ejecución continúe, las tareas se eliminarán y solo el resultado de ellas se almacenará en esos objetos futuros.
Así que básicamente en la memoria:
3 hilos
200 -> 0 Tarea
0 -> 200 Futuro
(con cada tarea ejecutada)
Para comprender esto, deberá ver qué sucede cuando envía la tarea al Ejecutor en un bucle. Primero solo veremos el envío de una sola tarea al Ejecutor. Ahora me JDK 1.7.0_51
código fuente JDK 1.7.0_51
El método estático Executor.newFixedThreadPool
método devuelve un ThreadPoolExecutor
contiene una cola de bloqueo para mantener la tarea
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
En el momento en que agrega una tarea a este Ejecutor, este va al método de ThreadPoolExecutor
de ThreadPoolExecutor
extiende AbstractExecutorService
donde se escribe la implementación del método de envío.
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
El método de ejecución es específico de la implementación (es decir, los diferentes tipos de Ejecutores lo implementan de diferentes maneras)
Ahora viene la verdadera carne. Este es el método de ejecución definido en ThreadPoolExecutor
. Especial atención en los comentarios. Aquí corePoolSize
en juego algunos parámetros de configuración de ThreadPoolExecutor como corePoolSize
.
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn''t, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
Su código es equivalente a
for (int i = 1; i <= 200; i++){
Task t = new Task();
FL.add(ES.submit(t));
}
Y después del bucle for, el constructor de Tarea ha sido llamado 200 veces, y el código que contiene ha sido ejecutado 200 veces. Si la tarea se envía a un ejecutor o no es irrelevante: está invocando a un constructor 200 veces en un bucle, y después de que se haya construido cada tarea, se envía al ejecutor. El ejecutor no es el que llama al constructor de tareas.