threads - suma con hilos en java
¿Cómo hacer que ThreadPoolExecutor aumente los hilos al máximo antes de hacer cola? (8)
Me he sentido frustrado por algún tiempo con el comportamiento predeterminado de ThreadPoolExecutor
que respalda los grupos de hilos ExecutorService
que muchos de nosotros usamos. Para citar de los Javadocs:
Si hay más de corePoolSize pero menos de máximo subprocesos dePoolSize en ejecución, solo se creará un nuevo subproceso si la cola está llena .
Lo que esto significa es que si defines un grupo de subprocesos con el siguiente código, nunca iniciará el segundo subproceso porque LinkedBlockingQueue
no tiene límites.
ExecutorService threadPool =
new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
Solo si tiene una cola acotada y la cola está llena, se inicia cualquier subproceso que esté por encima del número central. Sospecho que una gran cantidad de programadores multiproceso de Java junior no conocen este comportamiento del ThreadPoolExecutor
.
Ahora tengo un caso de uso específico donde esto no es óptimo. Estoy buscando formas, sin escribir mi propia clase de TPE, de evitarlo.
Mis requisitos son para un servicio web que hace call-backs a un tercero posiblemente no confiable.
- No deseo realizar la devolución de llamada de forma sincrónica con la solicitud web, por lo que quiero utilizar un grupo de subprocesos.
- Normalmente obtengo un par de estos por minuto, así que no quiero tener un nuevo
newFixedThreadPool(...)
con una gran cantidad de subprocesos que en su mayoría están inactivos. - De vez en cuando recibo un estallido de este tráfico y quiero ampliar el número de subprocesos a algún valor máximo (digamos 50).
- Necesito hacer un mejor intento para hacer todas las devoluciones de llamada, así que quiero poner en cola cualquier otra por encima de 50. No quiero abrumar al resto de mi servidor web usando un
newCachedThreadPool()
.
¿Cómo puedo evitar esta limitación en ThreadPoolExecutor
donde la cola debe estar limitada y completa antes de que se inicien más hilos? ¿Cómo puedo hacer para que comience más hilos antes de poner en cola las tareas?
Editar:
@Flavio hace una buena observación sobre el uso de ThreadPoolExecutor.allowCoreThreadTimeOut(true)
para que los hilos principales expiren y salgan. Lo consideré pero aún quería la función de hilos centrales. No quería que la cantidad de subprocesos en el grupo cayera por debajo del tamaño del núcleo si era posible.
¿Cómo puedo evitar esta limitación en
ThreadPoolExecutor
donde la cola debe estar limitada y completa antes de que se inicien más hilos?
Creo que finalmente encontré una solución algo elegante (tal vez un poco chiflada) a esta limitación con ThreadPoolExecutor
. Implica extender LinkedBlockingQueue
para que devuelva false
para queue.offer(...)
cuando ya hay algunas tareas en cola. Si los subprocesos actuales no se mantienen al día con las tareas en cola, el TPE agregará subprocesos adicionales. Si el grupo ya está en el máximo de hilos, se llamará a RejectedExecutionHandler
. Es el controlador que luego pone el put(...)
en la cola.
Ciertamente es extraño escribir una cola donde offer(...)
puede devolver false
y put()
never blocks, así que esa es la parte de hack. Pero esto funciona bien con el uso de la cola por parte de TPE, así que no veo ningún problema para hacer esto.
Aquí está el código:
// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
private static final long serialVersionUID = -6903933921423432194L;
@Override
public boolean offer(Runnable e) {
/*
* Offer it to the queue if there is 0 items already queued, else
* return false so the TPE will add another thread. If we return false
* and max threads have been reached then the RejectedExecutionHandler
* will be called which will do the put into the queue.
*/
if (size() == 0) {
return super.offer(e);
} else {
return false;
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
/*
* This does the actual put into the queue. Once the max threads
* have been reached, the tasks will then queue up.
*/
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});
Con este mecanismo, cuando envíe tareas a la cola, el ThreadPoolExecutor
:
- Escala el número de subprocesos hasta el tamaño del núcleo inicialmente (aquí 1).
- Ofrecerlo a la cola. Si la cola está vacía, se pondrá en cola para ser manejada por los hilos existentes.
- Si la cola ya tiene 1 o más elementos, la
offer(...)
devolverá falsa. - Si se devuelve falso, amplíe el número de subprocesos en el grupo hasta que alcancen el número máximo (aquí 50).
- Si está al máximo, llama a
RejectedExecutionHandler
-
RejectedExecutionHandler
luego coloca la tarea en la cola para ser procesada por la primera cadena disponible en orden FIFO.
Aunque en mi código de ejemplo anterior, la cola no tiene límites, también podría definirla como una cola acotada. Por ejemplo, si agrega una capacidad de 1000 a LinkedBlockingQueue
entonces:
- escalar los hilos hasta max
- luego haz cola hasta que esté lleno con 1000 tareas
- luego bloquea a la persona que llama hasta que haya espacio disponible para la cola.
Además, si realmente necesita usar la offer(...)
en el RejectedExecutionHandler
entonces podría usar el método de offer(E, long, TimeUnit)
lugar de Long.MAX_VALUE
como tiempo de espera.
Editar:
Modifiqué mi método de offer(...)
por comentarios de @ Ralf. Esto solo aumentará el número de subprocesos en el grupo si no se mantienen al día con la carga.
Editar:
Otro ajuste a esta respuesta podría ser preguntar al TPE si hay subprocesos inactivos y solo poner en cola el elemento si es así. Tendría que hacer una verdadera clase para esto y agregar un ourQueue.setThreadPoolExecutor(tpe);
método en él.
Entonces su método de offer(...)
podría ser algo así como:
- Compruebe si el
tpe.getPoolSize() == tpe.getMaximumPoolSize()
en cuyo caso simplemente llame asuper.offer(...)
. - De lo contrario, si
tpe.getPoolSize() > tpe.getActiveCount()
llama asuper.offer(...)
ya que parece haber hilos inactivos. - De lo contrario, devuelve
false
para bifurcar otro hilo.
Tal vez esto:
int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
return super.offer(e);
} else {
return false;
}
Tenga en cuenta que los métodos de obtención de TPE son caros, ya que acceden a campos volatile
o (en el caso de getActiveCount()
) bloquean el TPE y recorren la lista de hilos. Además, existen condiciones de carrera que pueden ocasionar que una tarea se ponga en cola incorrectamente o se bifurca otro hilo cuando hay un hilo inactivo.
Establezca el tamaño del núcleo y el tamaño máximo en el mismo valor, y permita que los hilos del núcleo se eliminen del conjunto con allowCoreThreadTimeOut(true)
.
La mejor solución que puedo pensar es extenderla.
ThreadPoolExecutor
ofrece algunos métodos de beforeExecute
: beforeExecute
y afterExecute
. En su extensión, puede mantener el uso de una cola acotada para alimentar tareas y una segunda cola ilimitada para manejar el desbordamiento. Cuando alguien llama a submit
, puede intentar colocar la solicitud en la cola acotada. Si se encuentra con una excepción, solo pega la tarea en su cola de desbordamiento. A continuación, puede utilizar el gancho afterExecute
para ver si hay algo en la cola de desbordamiento después de terminar una tarea. De esta forma, el ejecutor se encargará primero de las cosas en su cola acotada, y automáticamente extraerá de esta cola ilimitada a medida que el tiempo lo permita.
Parece más trabajo que tu solución, pero al menos no implica poner en cola comportamientos inesperados. También me imagino que hay una mejor manera de verificar el estado de la cola y los hilos en lugar de confiar en las excepciones, que son bastante lentas.
La respuesta recomendada resuelve solo uno (1) del problema con el grupo de subprocesos JDK:
Los grupos de subprocesos de JDK están sesgados hacia la cola. Entonces, en lugar de generar un nuevo hilo, pondrán la tarea en cola. Solo si la cola alcanza su límite, el grupo de subprocesos generará un nuevo subproceso.
La retirada del hilo no ocurre cuando la carga se aclara. Por ejemplo, si tenemos un estallido de trabajos que golpea el grupo que hace que el grupo llegue al máximo, seguido de una carga ligera de un máximo de 2 tareas a la vez, el grupo utilizará todos los hilos para mantener la carga ligera evitando la retirada del hilo. (solo se necesitarían 2 hilos ...)
Descontento con el comportamiento anterior, seguí adelante e implementé un grupo para superar las deficiencias anteriores.
Para resolver 2) El uso de la programación de Lifo resuelve el problema. Esta idea fue presentada por Ben Maurer en la conferencia ACM Applicative 2015: escala Systems @ Facebook
Entonces nació una nueva implementación:
Hasta ahora, esta implementación mejora el rendimiento de ejecución asíncrona para ZEL .
La implementación tiene la capacidad de girar para reducir la sobrecarga del cambio de contexto, ofreciendo un rendimiento superior para ciertos casos de uso.
Espero eso ayude...
PD: JDK Fork Join Pool implementa ExecutorService y funciona como un grupo de subprocesos "normal", la implementación es efectiva, utiliza la programación de subprocesos LIFO, sin embargo no hay control sobre el tamaño de cola interna, el tiempo de espera de jubilación ...
Nota: ahora prefiero y recomiendo mi otra respuesta .
Aquí hay una versión que me parece mucho más sencilla: aumentar el corePoolSize (hasta el límite de maximumPoolSize) siempre que se ejecute una nueva tarea, luego disminuir el corePoolSize (hasta el límite del "tamaño del grupo de núcleos" especificado por el usuario) siempre que tarea completa.
Para decirlo de otra manera, realice un seguimiento del número de tareas en ejecución o en cola, y asegúrese de que corePoolSize sea igual al número de tareas, siempre que se encuentre entre el "tamaño del grupo principal" especificado y el tamaño máximo de la cuenta.
public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
private int userSpecifiedCorePoolSize;
private int taskCount;
public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
userSpecifiedCorePoolSize = corePoolSize;
}
@Override
public void execute(Runnable runnable) {
synchronized (this) {
taskCount++;
setCorePoolSizeToTaskCountWithinBounds();
}
super.execute(runnable);
}
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
super.afterExecute(runnable, throwable);
synchronized (this) {
taskCount--;
setCorePoolSizeToTaskCountWithinBounds();
}
}
private void setCorePoolSizeToTaskCountWithinBounds() {
int threads = taskCount;
if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
setCorePoolSize(threads);
}
}
Como está escrito, la clase no admite el cambio de corePoolSize o maximumPoolSize especificado por el usuario después de la construcción, y no admite la manipulación de la cola de trabajo directamente o mediante remove()
o purge()
.
Nota: ahora prefiero y recomiendo mi otra respuesta .
Tengo otra propuesta, siguiendo con la idea original de cambiar la cola para devolver falsa. En esta, todas las tareas pueden ingresar a la cola, pero cada vez que una tarea se pone en cola después de execute()
, la seguimos con una tarea centinela no operativa que la cola rechaza, lo que hace que se engendre un nuevo hilo, que ejecutará el no-op seguido inmediatamente por algo de la cola.
Dado que los hilos de trabajo pueden sondear LinkedBlockingQueue
para una nueva tarea, es posible que una tarea se ponga en cola incluso cuando hay un hilo disponible. Para evitar generar nuevos subprocesos incluso cuando hay subprocesos disponibles, necesitamos realizar un seguimiento de cuántos subprocesos están esperando nuevas tareas en la cola y solo generar un nuevo subproceso cuando hay más tareas en cola que esperar subprocesos.
final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };
final AtomicInteger waitingThreads = new AtomicInteger(0);
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
@Override
public boolean offer(Runnable e) {
// offer returning false will cause the executor to spawn a new thread
if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
else return super.offer(e);
}
@Override
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
try {
waitingThreads.incrementAndGet();
return super.poll(timeout, unit);
} finally {
waitingThreads.decrementAndGet();
}
}
@Override
public Runnable take() throws InterruptedException {
try {
waitingThreads.incrementAndGet();
return super.take();
} finally {
waitingThreads.decrementAndGet();
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
@Override
public void execute(Runnable command) {
super.execute(command);
if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
}
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r == SENTINEL_NO_OP) return;
else throw new RejectedExecutionException();
}
});
Tenemos una subclase de ThreadPoolExecutor
que toma un creationThreshold
adicional y anula la execute
.
public void execute(Runnable command) {
super.execute(command);
final int poolSize = getPoolSize();
if (poolSize < getMaximumPoolSize()) {
if (getQueue().size() > creationThreshold) {
synchronized (this) {
setCorePoolSize(poolSize + 1);
setCorePoolSize(poolSize);
}
}
}
}
tal vez eso ayude también, pero el suyo se ve más artístico, por supuesto ...
Ya tengo otras dos respuestas sobre esta pregunta, pero sospecho que esta es la mejor.
Se basa en la técnica de la respuesta actualmente aceptada , a saber:
- Reemplazar el método de la
offer()
a (a veces) devolver falso - lo que hace que
ThreadPoolExecutor
un nuevo hilo o rechace la tarea, y - establece el
RejectedExecutionHandler
para poner en cola la tarea en el momento del rechazo.
El problema es cuando offer()
debería devolver falso. La respuesta actualmente aceptada devuelve falso cuando la cola tiene un par de tareas, pero como he señalado en mi comentario allí, esto causa efectos no deseados. Alternativamente, si siempre devuelve falso, seguirá generando nuevos subprocesos incluso cuando haya subprocesos esperando en la cola.
La solución es usar Java 7 LinkedTransferQueue
y tener la llamada a tryTransfer()
. Cuando hay un hilo de consumidor en espera, la tarea se pasará a ese hilo. De lo contrario, offer()
devolverá false y ThreadPoolExecutor
generará un nuevo hilo.
BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
@Override
public boolean offer(Runnable e) {
return tryTransfer(e);
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});