util threads thread suma resueltos que hilos example ejemplos ejemplo create concurrent con arreglo java multithreading blockingqueue threadpoolexecutor

threads - suma con hilos en java



¿Cómo hacer que ThreadPoolExecutor aumente los hilos al máximo antes de hacer cola? (8)

Me he sentido frustrado por algún tiempo con el comportamiento predeterminado de ThreadPoolExecutor que respalda los grupos de hilos ExecutorService que muchos de nosotros usamos. Para citar de los Javadocs:

Si hay más de corePoolSize pero menos de máximo subprocesos dePoolSize en ejecución, solo se creará un nuevo subproceso si la cola está llena .

Lo que esto significa es que si defines un grupo de subprocesos con el siguiente código, nunca iniciará el segundo subproceso porque LinkedBlockingQueue no tiene límites.

ExecutorService threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

Solo si tiene una cola acotada y la cola está llena, se inicia cualquier subproceso que esté por encima del número central. Sospecho que una gran cantidad de programadores multiproceso de Java junior no conocen este comportamiento del ThreadPoolExecutor .

Ahora tengo un caso de uso específico donde esto no es óptimo. Estoy buscando formas, sin escribir mi propia clase de TPE, de evitarlo.

Mis requisitos son para un servicio web que hace call-backs a un tercero posiblemente no confiable.

  • No deseo realizar la devolución de llamada de forma sincrónica con la solicitud web, por lo que quiero utilizar un grupo de subprocesos.
  • Normalmente obtengo un par de estos por minuto, así que no quiero tener un nuevo newFixedThreadPool(...) con una gran cantidad de subprocesos que en su mayoría están inactivos.
  • De vez en cuando recibo un estallido de este tráfico y quiero ampliar el número de subprocesos a algún valor máximo (digamos 50).
  • Necesito hacer un mejor intento para hacer todas las devoluciones de llamada, así que quiero poner en cola cualquier otra por encima de 50. No quiero abrumar al resto de mi servidor web usando un newCachedThreadPool() .

¿Cómo puedo evitar esta limitación en ThreadPoolExecutor donde la cola debe estar limitada y completa antes de que se inicien más hilos? ¿Cómo puedo hacer para que comience más hilos antes de poner en cola las tareas?

Editar:

@Flavio hace una buena observación sobre el uso de ThreadPoolExecutor.allowCoreThreadTimeOut(true) para que los hilos principales expiren y salgan. Lo consideré pero aún quería la función de hilos centrales. No quería que la cantidad de subprocesos en el grupo cayera por debajo del tamaño del núcleo si era posible.


¿Cómo puedo evitar esta limitación en ThreadPoolExecutor donde la cola debe estar limitada y completa antes de que se inicien más hilos?

Creo que finalmente encontré una solución algo elegante (tal vez un poco chiflada) a esta limitación con ThreadPoolExecutor . Implica extender LinkedBlockingQueue para que devuelva false para queue.offer(...) cuando ya hay algunas tareas en cola. Si los subprocesos actuales no se mantienen al día con las tareas en cola, el TPE agregará subprocesos adicionales. Si el grupo ya está en el máximo de hilos, se llamará a RejectedExecutionHandler . Es el controlador que luego pone el put(...) en la cola.

Ciertamente es extraño escribir una cola donde offer(...) puede devolver false y put() never blocks, así que esa es la parte de hack. Pero esto funciona bien con el uso de la cola por parte de TPE, así que no veo ningún problema para hacer esto.

Aquí está el código:

// extend LinkedBlockingQueue to force offer() to return false conditionally BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() { private static final long serialVersionUID = -6903933921423432194L; @Override public boolean offer(Runnable e) { /* * Offer it to the queue if there is 0 items already queued, else * return false so the TPE will add another thread. If we return false * and max threads have been reached then the RejectedExecutionHandler * will be called which will do the put into the queue. */ if (size() == 0) { return super.offer(e); } else { return false; } } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*secs*/, TimeUnit.SECONDS, queue); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { /* * This does the actual put into the queue. Once the max threads * have been reached, the tasks will then queue up. */ executor.getQueue().put(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } } });

Con este mecanismo, cuando envíe tareas a la cola, el ThreadPoolExecutor :

  1. Escala el número de subprocesos hasta el tamaño del núcleo inicialmente (aquí 1).
  2. Ofrecerlo a la cola. Si la cola está vacía, se pondrá en cola para ser manejada por los hilos existentes.
  3. Si la cola ya tiene 1 o más elementos, la offer(...) devolverá falsa.
  4. Si se devuelve falso, amplíe el número de subprocesos en el grupo hasta que alcancen el número máximo (aquí 50).
  5. Si está al máximo, llama a RejectedExecutionHandler
  6. RejectedExecutionHandler luego coloca la tarea en la cola para ser procesada por la primera cadena disponible en orden FIFO.

Aunque en mi código de ejemplo anterior, la cola no tiene límites, también podría definirla como una cola acotada. Por ejemplo, si agrega una capacidad de 1000 a LinkedBlockingQueue entonces:

  1. escalar los hilos hasta max
  2. luego haz cola hasta que esté lleno con 1000 tareas
  3. luego bloquea a la persona que llama hasta que haya espacio disponible para la cola.

Además, si realmente necesita usar la offer(...) en el RejectedExecutionHandler entonces podría usar el método de offer(E, long, TimeUnit) lugar de Long.MAX_VALUE como tiempo de espera.

Editar:

Modifiqué mi método de offer(...) por comentarios de @ Ralf. Esto solo aumentará el número de subprocesos en el grupo si no se mantienen al día con la carga.

Editar:

Otro ajuste a esta respuesta podría ser preguntar al TPE si hay subprocesos inactivos y solo poner en cola el elemento si es así. Tendría que hacer una verdadera clase para esto y agregar un ourQueue.setThreadPoolExecutor(tpe); método en él.

Entonces su método de offer(...) podría ser algo así como:

  1. Compruebe si el tpe.getPoolSize() == tpe.getMaximumPoolSize() en cuyo caso simplemente llame a super.offer(...) .
  2. De lo contrario, si tpe.getPoolSize() > tpe.getActiveCount() llama a super.offer(...) ya que parece haber hilos inactivos.
  3. De lo contrario, devuelve false para bifurcar otro hilo.

Tal vez esto:

int poolSize = tpe.getPoolSize(); int maximumPoolSize = tpe.getMaximumPoolSize(); if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) { return super.offer(e); } else { return false; }

Tenga en cuenta que los métodos de obtención de TPE son caros, ya que acceden a campos volatile o (en el caso de getActiveCount() ) bloquean el TPE y recorren la lista de hilos. Además, existen condiciones de carrera que pueden ocasionar que una tarea se ponga en cola incorrectamente o se bifurca otro hilo cuando hay un hilo inactivo.


Establezca el tamaño del núcleo y el tamaño máximo en el mismo valor, y permita que los hilos del núcleo se eliminen del conjunto con allowCoreThreadTimeOut(true) .


La mejor solución que puedo pensar es extenderla.

ThreadPoolExecutor ofrece algunos métodos de beforeExecute : beforeExecute y afterExecute . En su extensión, puede mantener el uso de una cola acotada para alimentar tareas y una segunda cola ilimitada para manejar el desbordamiento. Cuando alguien llama a submit , puede intentar colocar la solicitud en la cola acotada. Si se encuentra con una excepción, solo pega la tarea en su cola de desbordamiento. A continuación, puede utilizar el gancho afterExecute para ver si hay algo en la cola de desbordamiento después de terminar una tarea. De esta forma, el ejecutor se encargará primero de las cosas en su cola acotada, y automáticamente extraerá de esta cola ilimitada a medida que el tiempo lo permita.

Parece más trabajo que tu solución, pero al menos no implica poner en cola comportamientos inesperados. También me imagino que hay una mejor manera de verificar el estado de la cola y los hilos en lugar de confiar en las excepciones, que son bastante lentas.


La respuesta recomendada resuelve solo uno (1) del problema con el grupo de subprocesos JDK:

  1. Los grupos de subprocesos de JDK están sesgados hacia la cola. Entonces, en lugar de generar un nuevo hilo, pondrán la tarea en cola. Solo si la cola alcanza su límite, el grupo de subprocesos generará un nuevo subproceso.

  2. La retirada del hilo no ocurre cuando la carga se aclara. Por ejemplo, si tenemos un estallido de trabajos que golpea el grupo que hace que el grupo llegue al máximo, seguido de una carga ligera de un máximo de 2 tareas a la vez, el grupo utilizará todos los hilos para mantener la carga ligera evitando la retirada del hilo. (solo se necesitarían 2 hilos ...)

Descontento con el comportamiento anterior, seguí adelante e implementé un grupo para superar las deficiencias anteriores.

Para resolver 2) El uso de la programación de Lifo resuelve el problema. Esta idea fue presentada por Ben Maurer en la conferencia ACM Applicative 2015: escala Systems @ Facebook

Entonces nació una nueva implementación:

LifoThreadPoolExecutorSQP

Hasta ahora, esta implementación mejora el rendimiento de ejecución asíncrona para ZEL .

La implementación tiene la capacidad de girar para reducir la sobrecarga del cambio de contexto, ofreciendo un rendimiento superior para ciertos casos de uso.

Espero eso ayude...

PD: JDK Fork Join Pool implementa ExecutorService y funciona como un grupo de subprocesos "normal", la implementación es efectiva, utiliza la programación de subprocesos LIFO, sin embargo no hay control sobre el tamaño de cola interna, el tiempo de espera de jubilación ...


Nota: ahora prefiero y recomiendo mi otra respuesta .

Aquí hay una versión que me parece mucho más sencilla: aumentar el corePoolSize (hasta el límite de maximumPoolSize) siempre que se ejecute una nueva tarea, luego disminuir el corePoolSize (hasta el límite del "tamaño del grupo de núcleos" especificado por el usuario) siempre que tarea completa.

Para decirlo de otra manera, realice un seguimiento del número de tareas en ejecución o en cola, y asegúrese de que corePoolSize sea igual al número de tareas, siempre que se encuentre entre el "tamaño del grupo principal" especificado y el tamaño máximo de la cuenta.

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor { private int userSpecifiedCorePoolSize; private int taskCount; public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); userSpecifiedCorePoolSize = corePoolSize; } @Override public void execute(Runnable runnable) { synchronized (this) { taskCount++; setCorePoolSizeToTaskCountWithinBounds(); } super.execute(runnable); } @Override protected void afterExecute(Runnable runnable, Throwable throwable) { super.afterExecute(runnable, throwable); synchronized (this) { taskCount--; setCorePoolSizeToTaskCountWithinBounds(); } } private void setCorePoolSizeToTaskCountWithinBounds() { int threads = taskCount; if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize; if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize(); setCorePoolSize(threads); } }

Como está escrito, la clase no admite el cambio de corePoolSize o maximumPoolSize especificado por el usuario después de la construcción, y no admite la manipulación de la cola de trabajo directamente o mediante remove() o purge() .


Nota: ahora prefiero y recomiendo mi otra respuesta .

Tengo otra propuesta, siguiendo con la idea original de cambiar la cola para devolver falsa. En esta, todas las tareas pueden ingresar a la cola, pero cada vez que una tarea se pone en cola después de execute() , la seguimos con una tarea centinela no operativa que la cola rechaza, lo que hace que se engendre un nuevo hilo, que ejecutará el no-op seguido inmediatamente por algo de la cola.

Dado que los hilos de trabajo pueden sondear LinkedBlockingQueue para una nueva tarea, es posible que una tarea se ponga en cola incluso cuando hay un hilo disponible. Para evitar generar nuevos subprocesos incluso cuando hay subprocesos disponibles, necesitamos realizar un seguimiento de cuántos subprocesos están esperando nuevas tareas en la cola y solo generar un nuevo subproceso cuando hay más tareas en cola que esperar subprocesos.

final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } }; final AtomicInteger waitingThreads = new AtomicInteger(0); BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() { @Override public boolean offer(Runnable e) { // offer returning false will cause the executor to spawn a new thread if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get(); else return super.offer(e); } @Override public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException { try { waitingThreads.incrementAndGet(); return super.poll(timeout, unit); } finally { waitingThreads.decrementAndGet(); } } @Override public Runnable take() throws InterruptedException { try { waitingThreads.incrementAndGet(); return super.take(); } finally { waitingThreads.decrementAndGet(); } } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) { @Override public void execute(Runnable command) { super.execute(command); if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP); } }; threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (r == SENTINEL_NO_OP) return; else throw new RejectedExecutionException(); } });


Tenemos una subclase de ThreadPoolExecutor que toma un creationThreshold adicional y anula la execute .

public void execute(Runnable command) { super.execute(command); final int poolSize = getPoolSize(); if (poolSize < getMaximumPoolSize()) { if (getQueue().size() > creationThreshold) { synchronized (this) { setCorePoolSize(poolSize + 1); setCorePoolSize(poolSize); } } } }

tal vez eso ayude también, pero el suyo se ve más artístico, por supuesto ...


Ya tengo otras dos respuestas sobre esta pregunta, pero sospecho que esta es la mejor.

Se basa en la técnica de la respuesta actualmente aceptada , a saber:

  1. Reemplazar el método de la offer() a (a veces) devolver falso
  2. lo que hace que ThreadPoolExecutor un nuevo hilo o rechace la tarea, y
  3. establece el RejectedExecutionHandler para poner en cola la tarea en el momento del rechazo.

El problema es cuando offer() debería devolver falso. La respuesta actualmente aceptada devuelve falso cuando la cola tiene un par de tareas, pero como he señalado en mi comentario allí, esto causa efectos no deseados. Alternativamente, si siempre devuelve falso, seguirá generando nuevos subprocesos incluso cuando haya subprocesos esperando en la cola.

La solución es usar Java 7 LinkedTransferQueue y tener la llamada a tryTransfer() . Cuando hay un hilo de consumidor en espera, la tarea se pasará a ese hilo. De lo contrario, offer() devolverá false y ThreadPoolExecutor generará un nuevo hilo.

BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() { @Override public boolean offer(Runnable e) { return tryTransfer(e); } }; ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue); threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor.getQueue().put(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } });