thread hilos example concurrent java multithreading concurrency executorservice

example - pool de hilos java



Java ExecutorService: awaitTermination de todas las tareas creadas recursivamente (10)

Utilizo un ExecutorService para ejecutar una tarea. Esta tarea puede crear recursivamente otras tareas que se envían al mismo ExecutorService y esas tareas secundarias también pueden hacer eso.

Ahora tengo el problema de que quiero esperar hasta que se hayan completado todas las tareas (es decir, todas las tareas hayan finalizado y no hayan enviado nuevas) antes de continuar.

No puedo llamar a ExecutorService.shutdown() en el hilo principal porque esto impide que el ExecutorService acepte nuevas tareas.

Y Calling ExecutorService.awaitTermination() parece no hacer nada si no se ha llamado al shutdown .

Así que estoy un poco atrapado aquí. No puede ser tan difícil para ExecutorService ver que todos los trabajadores están inactivos, ¿o sí? La única solución poco elegante que pude encontrar es usar directamente un ThreadPoolExecutor y consultar su getPoolSize() vez en cuando. ¿Realmente no hay mejor manera de hacer eso?


La única solución poco elegante que pude encontrar es usar directamente un ThreadPoolExecutor y consultar su getPoolSize () de vez en cuando. ¿Realmente no hay mejor manera de hacer eso?

Debe utilizar los métodos shutdown() , awaitTermination () and shutdownNow() en una secuencia adecuada.

shutdown() : inicia un cierre ordenado en el que se ejecutan las tareas enviadas anteriormente, pero no se aceptarán nuevas tareas.

awaitTermination() : Bloquea hasta que todas las tareas hayan completado la ejecución después de una solicitud de cierre, o se agote el tiempo de espera, o se interrumpa el hilo actual, lo que ocurra primero.

shutdownNow() : intenta detener todas las tareas que se ejecutan activamente, detiene el procesamiento de las tareas en espera y devuelve una lista de las tareas que estaban esperando su ejecución.

Forma recomendada desde la página de documentación de ExecutorService de ExecutorService :

void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); }

Puede reemplazar la condición con la condición while en caso de una larga duración en la finalización de las tareas de la siguiente manera:

Cambio

if (!pool.awaitTermination(60, TimeUnit.SECONDS))

A

while(!pool.awaitTermination(60, TimeUnit.SECONDS)) { Thread.sleep(60000); }

Puede consultar otras alternativas (excepto join() , que se pueden usar con subprocesos independientes) en:

Espera hasta que todos los hilos terminen su trabajo en java.


(mea culpa: es un ''bit'' pasado mi hora de dormir;) pero aquí hay un primer intento de un cierre dinámico):

package oss.alphazero.sto4958330; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class DynamicCountDownLatch { @SuppressWarnings("serial") private static final class Sync extends AbstractQueuedSynchronizer { private final CountDownLatch toplatch; public Sync() { setState(0); this.toplatch = new CountDownLatch(1); } @Override protected int tryAcquireShared(int acquires){ try { toplatch.await(); } catch (InterruptedException e) { throw new RuntimeException("Interrupted", e); } return getState() == 0 ? 1 : -1; } public boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } public boolean tryExtendState(int acquires) { for (;;) { int s = getState(); int exts = s+1; if (compareAndSetState(s, exts)) { toplatch.countDown(); return exts > 0; } } } } private final Sync sync; public DynamicCountDownLatch(){ this.sync = new Sync(); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void countDown() { sync.releaseShared(1); } public void join() { sync.tryExtendState(1); } }

Este pestillo introduce un nuevo método join () para la API CountDownLatch existente (clonada), que las tareas utilizan para señalar su entrada en el grupo de tareas más grande.

El pestillo se pasa de la tarea principal a la tarea secundaria. Cada tarea, según el patrón de Suraj, primero ''unirá ()'' el pestillo, hará su tarea (), y luego cuenta abajo ().

Para abordar las situaciones en las que el subproceso principal inicia el grupo de tareas y luego espera inmediatamente (), antes de que cualquiera de los subprocesos de la tarea haya tenido la oportunidad de unirse (), se utiliza topLatch la clase Sync interna. Este es un pestillo que se contará en cada combinación (); solo la primera cuenta regresiva es por supuesto significativa, ya que todas las subsecuentes son nops.

La implementación inicial anterior introduce una especie de arruga semántica ya que se supone que el tryAcquiredShared (int) no está lanzando una InterruptedException, pero entonces necesitamos lidiar con la interrupción de la espera en el topLatch.

¿Es esto una mejora sobre la propia solución de OP usando contadores atómicos? Yo diría que probablemente no IFF, él insiste en usar Ejecutores, pero es, creo, un enfoque alternativo igualmente válido que usa el AQS en ese caso, y también se puede usar con hilos genéricos.

Destruye a tus compañeros hackers.


Debo decir que las soluciones descritas anteriormente del problema con la tarea de llamada recursiva y esperar las tareas del suborden final no me satisfacen. Mi solución está inspirada en la documentación original de Oracle: CountDownLatch y, por ejemplo, recursos humanos CountDownLatch .

El primer hilo común en proceso en el caso de la clase HRManagerCompact tiene el pestillo de espera para dos subprocesos de hija, que tiene pestillos de espera para los subsiguientes subprocesos de 2 hijas ... etc.

Por supuesto, el latch se puede establecer en un valor diferente a 2 (en el constructor de CountDownLatch), así como el número de objetos ejecutables en la iteración, es decir, ArrayList, pero debe corresponder (el número de cuentas regresivas debe ser igual al parámetro en el constructor CountDownLatch).

Tenga cuidado, el número de cierres aumenta exponencialmente según la condición de restricción: ''level.get () <2'', así como el número de objetos. 1, 2, 4, 8, 16 ... y pestillos 0, 1, 2, 4 ... Como puede ver, para cuatro niveles (level.get () <4) habrá 15 hilos en espera y 7 pestillos en el tiempo, cuando el pico 16 hilos se están ejecutando.

package processes.countdownlatch.hr; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** Recursively latching running classes to wait for the peak threads * * @author hariprasad */ public class HRManagerCompact extends Thread { final int N = 2; // number of daughter''s tasks for latch CountDownLatch countDownLatch; CountDownLatch originCountDownLatch; AtomicInteger level = new AtomicInteger(0); AtomicLong order = new AtomicLong(0); // id latched thread waiting for HRManagerCompact techLead1 = null; HRManagerCompact techLead2 = null; HRManagerCompact techLead3 = null; // constructor public HRManagerCompact(CountDownLatch countDownLatch, String name, AtomicInteger level, AtomicLong order){ super(name); this.originCountDownLatch=countDownLatch; this.level = level; this.order = order; } private void doIt() { countDownLatch = new CountDownLatch(N); AtomicInteger leveli = new AtomicInteger(level.get() + 1); AtomicLong orderi = new AtomicLong(Thread.currentThread().getId()); techLead1 = new HRManagerCompact(countDownLatch, "first", leveli, orderi); techLead2 = new HRManagerCompact(countDownLatch, "second", leveli, orderi); //techLead3 = new HRManagerCompact(countDownLatch, "third", leveli); techLead1.start(); techLead2.start(); //techLead3.start(); try { synchronized (Thread.currentThread()) { // to prevent print and latch in the same thread System.out.println("*** HR Manager waiting for recruitment to complete... " + level + ", " + order + ", " + orderi); countDownLatch.await(); // wait actual thread } System.out.println("*** Distribute Offer Letter, it means finished. " + level + ", " + order + ", " + orderi); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + ": working... " + level + ", " + order + ", " + Thread.currentThread().getId()); Thread.sleep(10*level.intValue()); if (level.get() < 2) doIt(); Thread.yield(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } /*catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); }*/ // TODO Auto-generated method stub System.out.println("--- " +Thread.currentThread().getName() + ": recruted " + level + ", " + order + ", " + Thread.currentThread().getId()); originCountDownLatch.countDown(); // count down } public static void main(String args[]){ AtomicInteger levelzero = new AtomicInteger(0); HRManagerCompact hr = new HRManagerCompact(null, "zero", levelzero, new AtomicLong(levelzero.longValue())); hr.doIt(); } }

Posible salida comentada (con alguna probabilidad):

first: working... 1, 1, 10 // thread 1, first daughter''s task (10) second: working... 1, 1, 11 // thread 1, second daughter''s task (11) first: working... 2, 10, 12 // thread 10, first daughter''s task (12) first: working... 2, 11, 14 // thread 11, first daughter''s task (14) second: working... 2, 11, 15 // thread 11, second daughter''s task (15) second: working... 2, 10, 13 // thread 10, second daughter''s task (13) --- first: recruted 2, 10, 12 // finished 12 --- first: recruted 2, 11, 14 // finished 14 --- second: recruted 2, 10, 13 // finished 13 (now can be opened latch 10) --- second: recruted 2, 11, 15 // finished 15 (now can be opened latch 11) *** HR Manager waiting for recruitment to complete... 0, 0, 1 *** HR Manager waiting for recruitment to complete... 1, 1, 10 *** Distribute Offer Letter, it means finished. 1, 1, 10 // latch on 10 opened --- first: recruted 1, 1, 10 // finished 10 *** HR Manager waiting for recruitment to complete... 1, 1, 11 *** Distribute Offer Letter, it means finished. 1, 1, 11 // latch on 11 opened --- second: recruted 1, 1, 11 // finished 11 (now can be opened latch 1) *** Distribute Offer Letter, it means finished. 0, 0, 1 // latch on 1 opened


Este es realmente un candidato ideal para un Phaser. Java 7 está saliendo con esta nueva clase. Es un CountdonwLatch / CyclicBarrier flexible. Puede obtener una versión estable en JSR 166 Interest Site .

La forma en que es un CountdownLatch / CyclicBarrier más flexible es porque no solo es compatible con un número desconocido de partes (hilos) sino que también es reutilizable (es ahí donde entra la parte de fase)

Para cada tarea que envíes, te registrarás; cuando la hayas completado, llegarás. Esto se puede hacer recursivamente.

Phaser phaser = new Phaser(); ExecutorService e = // Runnable recursiveRunnable = new Runnable(){ public void run(){ //do work recursively if you have to if(shouldBeRecursive){ phaser.register(); e.submit(recursiveRunnable); } phaser.arrive(); } } public void doWork(){ int phase = phaser.getPhase(); phaser.register(); e.submit(recursiveRunnable); phaser.awaitAdvance(phase); }

Edit: Gracias @depthofreality por señalar la condición de carrera en mi ejemplo anterior. Lo estoy actualizando para que el hilo de ejecución solo espere el avance de la fase actual, ya que bloquea para que se complete la función recursiva.

El número de fase no se disparará hasta que el número arrive s == register s. Dado que antes de cada invocación recursiva invoca register , se producirá un incremento de fase cuando se completen todas las invocaciones.


Puede crear su propio grupo de subprocesos que amplíe ThreadPoolExecutor . Desea saber cuándo se ha enviado una tarea y cuándo se completa.

public class MyThreadPoolExecutor extends ThreadPoolExecutor { private int counter = 0; public MyThreadPoolExecutor() { super(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); } @Override public synchronized void execute(Runnable command) { counter++; super.execute(command); } @Override protected synchronized void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); counter--; notifyAll(); } public synchronized void waitForExecuted() throws InterruptedException { while (counter == 0) wait(); } }


Si desea usar clases JSR166y, por ejemplo, Phaser o Fork / Join, cualquiera de las cuales podría funcionar para usted, siempre puede descargar el backport de Java 6 desde: http://gee.cs.oswego.edu/dl/concurrency-interest/ y usar eso como base en lugar de escribir una solución completamente casera. Luego, cuando salga el 7, simplemente puede eliminar la dependencia en el puerto trasero y cambiar algunos nombres de paquetes.

(Revelación completa: hemos estado usando LinkedTransferQueue en prod durante un tiempo. Sin problemas)


Si inicialmente se desconoce la cantidad de tareas en el árbol de tareas recursivas, quizás la forma más fácil sea implementar su propia primitiva de sincronización, una especie de "semáforo inverso", y compartirla entre sus tareas. Antes de enviar cada tarea, incrementa un valor, cuando la tarea se completa, disminuye ese valor y espera hasta que el valor sea 0.

Implementarlo como una primitiva separada explícitamente llamada de tareas desacopla esta lógica de la implementación del grupo de subprocesos y le permite enviar varios árboles independientes de tareas recursivas en el mismo grupo.

Algo como esto:

public class InverseSemaphore { private int value = 0; private Object lock = new Object(); public void beforeSubmit() { synchronized(lock) { value++; } } public void taskCompleted() { synchronized(lock) { value--; if (value == 0) lock.notifyAll(); } } public void awaitCompletion() throws InterruptedException { synchronized(lock) { while (value > 0) lock.wait(); } } }

Tenga en cuenta que se debe llamar a taskCompleted() dentro de un bloque finally , para hacerlo inmune a posibles excepciones.

También tenga en cuenta que beforeSubmit() debe ser llamado por el hilo de envío antes de enviar la tarea, no por la tarea en sí, para evitar la posible "falsa finalización" cuando se completan tareas antiguas y aún no se inician nuevas.

EDITAR: Problema importante con el patrón de uso reparado.


Use un Future para sus tareas (en lugar de enviar Runnable ''s), una devolución de llamada actualiza su estado cuando se completa, por lo que puede usar Future.isDone para rastrear el estado de todas sus tareas.


Utilice CountDownLatch . Pase el objeto CountDownLatch a cada una de sus tareas y codifique sus tareas como a continuación.

public void doTask() { // do your task latch.countDown(); }

Considerando que el hilo que debe esperar debe ejecutar el siguiente código:

public void doWait() { latch.await(); }

Pero, por supuesto, esto supone que ya conoce la cantidad de tareas secundarias para poder iniciar el conteo del pestillo.


Wow, ustedes son rápidos :)

Gracias por todas las sugerencias. Los futuros no se integran fácilmente con mi modelo porque no sé cuántos runnables están programados de antemano. Así que si mantengo viva una tarea principal solo para esperar a que finalicen las tareas secundarias recursivas tengo mucha basura por ahí.

Resolví mi problema usando la sugerencia de AtomicInteger. Básicamente, he subclasificado ThreadPoolExecutor e incremento el contador en llamadas a execute () y decremento en llamadas a afterExecute (). Cuando el contador obtiene 0 llamo a shutdown (). Esto parece funcionar para mis problemas, no estoy seguro de que sea una buena manera de hacerlo. Especialmente, supongo que solo usas execute () para agregar Runnables.

Como nodo lateral: primero traté de registrar después de Ejecutar () el número de Runnables en la cola y el número de trabajadores que están activos y el apagado cuando esos son 0; pero eso no funcionó porque no todos los Runnables aparecieron en la cola y getActiveCount () tampoco hizo lo que yo esperaba.

De todos modos, aquí está mi solución: (si alguien encuentra problemas serios con esto, hágamelo saber :)

public class MyThreadPoolExecutor extends ThreadPoolExecutor { private final AtomicInteger executing = new AtomicInteger(0); public MyThreadPoolExecutor(int coorPoolSize, int maxPoolSize, long keepAliveTime, TimeUnit seconds, BlockingQueue<Runnable> queue) { super(coorPoolSize, maxPoolSize, keepAliveTime, seconds, queue); } @Override public void execute(Runnable command) { //intercepting beforeExecute is too late! //execute() is called in the parent thread before it terminates executing.incrementAndGet(); super.execute(command); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); int count = executing.decrementAndGet(); if(count == 0) { this.shutdown(); } } }