hilos ejemplo java multithreading concurrency parallel-processing executorservice

java - ejemplo - ¿Cómo esperar a que todos los subprocesos terminen, utilizando ExecutorService?



executorservice java ejemplo (22)

Acabo de escribir un programa de muestra que resuelve tu problema. No se dio una implementación concisa, así que agregaré una. Si bien puede usar executor.shutdown() y executor.awaitTermination() , no es la mejor práctica, ya que el tiempo empleado por diferentes subprocesos sería impredecible.

ExecutorService es = Executors.newCachedThreadPool(); List<Callable<Integer>> tasks = new ArrayList<>(); for (int j = 1; j <= 10; j++) { tasks.add(new Callable<Integer>() { @Override public Integer call() throws Exception { int sum = 0; System.out.println("Starting Thread " + Thread.currentThread().getId()); for (int i = 0; i < 1000000; i++) { sum += i; } System.out.println("Stopping Thread " + Thread.currentThread().getId()); return sum; } }); } try { List<Future<Integer>> futures = es.invokeAll(tasks); int flag = 0; for (Future<Integer> f : futures) { Integer res = f.get(); System.out.println("Sum: " + res); if (!f.isDone()) flag = 1; } if (flag == 0) System.out.println("SUCCESS"); else System.out.println("FAILED"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }

Necesito ejecutar una cantidad de tareas 4 a la vez, algo como esto:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } //...wait for completion somehow

¿Cómo puedo recibir una notificación una vez que todos estén completos? Por ahora no puedo pensar en nada mejor que configurar un contador de tareas global y disminuirlo al final de cada tarea, luego monitorear en bucle infinito este contador para que se convierta en 0; u obtener una lista de futuros y en infinito monitor de monitorización se realizó para todos ellos. ¿Cuáles son las mejores soluciones que no involucran bucles infinitos?

Gracias.


Aquí hay dos opciones, solo un poco confundir cuál es la mejor para ir.

Opción 1:

ExecutorService es = Executors.newFixedThreadPool(4); List<Runnable> tasks = getTasks(); CompletableFuture<?>[] futures = tasks.stream() .map(task -> CompletableFuture.runAsync(task, es)) .toArray(CompletableFuture[]::new); CompletableFuture.allOf(futures).join(); es.shutdown();

Opcion 2:

ExecutorService es = Executors.newFixedThreadPool(4); List< Future<?>> futures = new ArrayList<>(); for(Runnable task : taskList) { futures.add(es.submit(task)); } for(Future<?> future : futures) { try { future.get(); }catch(Exception e){ // do logging and nothing else } } es.shutdown();

Aquí poniendo future.get (); En el intento de captura es buena idea ¿verdad?


Así que publico mi respuesta de la pregunta vinculada aquí, en caso de que alguien quiera una forma más sencilla de hacer esto

ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture[] futures = new CompletableFuture[10]; int i = 0; while (...) { futures[i++] = CompletableFuture.runAsync(runner, executor); } CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.


Básicamente, en un ExecutorService usted llama a shutdown() y luego awaitTermination() :

ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } taskExecutor.shutdown(); try { taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { ... }


En Java8 puedes hacerlo con CompletableFuture :

ExecutorService es = Executors.newFixedThreadPool(4); List<Runnable> tasks = getTasks(); CompletableFuture<?>[] futures = tasks.stream() .map(task -> CompletableFuture.runAsync(task, es)) .toArray(CompletableFuture[]::new); CompletableFuture.allOf(futures).join(); es.shutdown();


Esta es mi solución, basada en el consejo "AdamSkywalker", y funciona.

package frss.main; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestHilos { void procesar() { ExecutorService es = Executors.newFixedThreadPool(4); List<Runnable> tasks = getTasks(); CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new); CompletableFuture.allOf(futures).join(); es.shutdown(); System.out.println("FIN DEL PROCESO DE HILOS"); } private List<Runnable> getTasks() { List<Runnable> tasks = new ArrayList<Runnable>(); Hilo01 task1 = new Hilo01(); tasks.add(task1); Hilo02 task2 = new Hilo02(); tasks.add(task2); return tasks; } private class Hilo01 extends Thread { @Override public void run() { System.out.println("HILO 1"); } } private class Hilo02 extends Thread { @Override public void run() { try { sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("HILO 2"); } } public static void main(String[] args) { TestHilos test = new TestHilos(); test.procesar(); } }


Esto podría ayudar

Log.i(LOG_TAG, "shutting down executor..."); executor.shutdown(); while (true) { try { Log.i(LOG_TAG, "Waiting for executor to terminate..."); if (executor.isTerminated()) break; if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { break; } } catch (InterruptedException ignored) {} }


Hay un método en el ejecutor getActiveCount() que da el recuento de subprocesos activos.

Después de abarcar el hilo, podemos verificar si el valor de activeCount() es 0 . Una vez que el valor es cero, significa que no hay subprocesos activos actualmente en ejecución, lo que significa que la tarea ha finalizado:

while (true) { if (executor.getActiveCount() == 0) { //ur own piece of code break; } }


Java 8: podemos usar la API de transmisión para procesar la transmisión. Por favor, vea el fragmento a continuación.

final List<Runnable> tasks = ...; //or any other functional interface tasks.stream().parallel().forEach(Runnable::run) // Uses default pool //alternatively to specify parallelism new ForkJoinPool(15).submit( () -> tasks.stream().parallel().forEach(Runnable::run) ).get();



Podría usar su propia subclase de ExecutorCompletionService para ajustar taskExecutor , y su propia implementación de BlockingQueue para informarse cuando cada tarea se complete y realizar la devolución de llamada u otra acción que desee cuando el número de tareas completadas alcance su objetivo deseado.


Podrías envolver tus tareas en otro ejecutable, que enviará notificaciones:

taskExecutor.execute(new Runnable() { public void run() { taskStartedNotification(); new MyTask().run(); taskFinishedNotification(); } });


Podrías usar este código:

public class MyTask implements Runnable { private CountDownLatch countDownLatch; public MyTask(CountDownLatch countDownLatch { this.countDownLatch = countDownLatch; } @Override public void run() { try { //Do somethings // this.countDownLatch.countDown();//important } catch (InterruptedException ex) { Thread.currentThread().interrupt(); } } } CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS); ExecutorService taskExecutor = Executors.newFixedThreadPool(4); for (int i = 0; i < NUMBER_OF_TASKS; i++){ taskExecutor.execute(new MyTask(countDownLatch)); } countDownLatch.await(); System.out.println("Finish tasks");


Puedes llamar a waitTillDone () en esta clase Runner :

Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool while(...) { runner.run(new MyTask()); // here you submit your task } runner.waitTillDone(); // and this blocks until all tasks are finished (or failed) runner.shutdown(); // once you done you can shutdown the runner

Puede reutilizar esta clase y llamar a waitTillDone () tantas veces como desee antes de llamar a shutdown (), además de que su código es extremadamente simple . Además , no tienes que saber la cantidad de tareas por adelantado.

Para usarlo, simplemente agregue esta dependencia de gradle / maven compile ''com.github.matejtymes:javafixes:1.1.1'' a su proyecto.

Más detalles se pueden encontrar aquí:

https://github.com/MatejTymes/JavaFixes

http://matejtymes.blogspot.com/2016/04/executor-that-notifies-you-when-task.html


Puedes usar listas de futuros, también:

List<Future> futures = new ArrayList<Future>(); // now add to it: futures.add(executorInstance.submit(new Callable<Void>() { public Void call() throws IOException { // do something return null; } }));

luego, cuando desea unirse a todos ellos, es esencialmente el equivalente a unirse a cada uno (con el beneficio adicional de que vuelve a generar excepciones de subprocesos secundarios a la principal):

for(Future f: this.futures) { f.get(); }

Básicamente, el truco consiste en llamar a .get () en cada uno de los Futuros a la vez, en lugar de hacer un bucle infinito, la llamada isDone () en (todos o cada uno). Por lo tanto, tiene la garantía de "seguir adelante" a través de este bloque tan pronto como finalice el último hilo. La advertencia es que, dado que la llamada .get () vuelve a generar excepciones, si uno de los subprocesos muere, podría surgir de esto posiblemente antes de que los otros subprocesos finalicen [para evitarlo, puede agregar una catch ExecutionException alrededor del obtener llamada]. La otra advertencia es que mantiene una referencia a todos los subprocesos, por lo que si tienen variables locales de subprocesos, no se recopilarán hasta después de que haya pasado este bloque (aunque podría evitarlo, si se convirtió en un problema, eliminando El futuro está fuera del ArrayList). Si quisiera saber qué Futuro "termina primero", podría usar algo como https://.com/a/31885029/32453


Sólo mis dos centavos. Para superar el requisito de CountDownLatch para saber la cantidad de tareas de antemano, puede hacerlo a la antigua usando un Semaphore simple.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4); int numberOfTasks=0; Semaphore s=new Semaphore(0); while(...) { taskExecutor.execute(new MyTask()); numberOfTasks++; } try { s.aquire(numberOfTasks); ...

En su tarea, simplemente llame a s.release() como lo haría latch.countDown();


Siga uno de los siguientes enfoques.

  1. Iterar a través de todas las tareas Future , devuelto desde el submit en ExecutorService y verificar el estado con el bloqueo de la llamada get() en el objeto Future como lo sugiere Kiran
  2. Utilice invokeAll() en ExecutorService
  3. CountDownLatch
  4. ForkJoinPool o Executors.html#newWorkStealingPool
  5. Use shutdown, awaitTermination, shutdownNow APIs de ThreadPoolExecutor en la secuencia correcta

Preguntas de SE relacionadas:

¿Cómo se usa CountDownLatch en Java Multithreading?

Cómo apagar adecuadamente el servicio de ejecución de Java


Solo para proporcionar más alternativas aquí diferentes para usar pestillos / barreras. También puede obtener los resultados parciales hasta que todos ellos terminen usando CompletionService .

Desde la concurrencia de Java en la práctica: "Si tiene un lote de cálculos para enviar a un Ejecutor y desea recuperar sus resultados a medida que estén disponibles, puede conservar el Futuro asociado con cada tarea y repetidamente sondear la finalización llamando a get con un tiempo de espera de cero. Esto es posible, pero tedioso . Afortunadamente hay una mejor manera : un servicio de finalización ".

Aquí la implementación

public class TaskSubmiter { private final ExecutorService executor; TaskSubmiter(ExecutorService executor) { this.executor = executor; } void doSomethingLarge(AnySourceClass source) { final List<InterestedResult> info = doPartialAsyncProcess(source); CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor); for (final InterestedResult interestedResultItem : info) completionService.submit(new Callable<PartialResult>() { public PartialResult call() { return InterestedResult.doAnOperationToGetPartialResult(); } }); try { for (int t = 0, n = info.size(); t < n; t++) { Future<PartialResult> f = completionService.take(); PartialResult PartialResult = f.get(); processThisSegment(PartialResult); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { throw somethinghrowable(e.getCause()); } } }


Un poco tarde para el juego, pero para completar ...

En lugar de "esperar" a que todas las tareas terminen, puedes pensar en términos del principio de Hollywood: "no me llames, te llamaré", cuando termine. Creo que el código resultante es más elegante ...

La guayaba ofrece algunas herramientas interesantes para lograr esto.

Un ejemplo ::

Envuelva un ExecutorService en un ListeningExecutorService ::

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

Presentar una colección de callables para su ejecución:

for (Callable<Integer> callable : callables) { ListenableFuture<Integer> lf = service.submit(callable); // listenableFutures is a collection listenableFutures.add(lf) });

Ahora la parte esencial:

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

Adjunte una devolución de llamada a ListenableFuture, que puede utilizar para recibir una notificación cuando se completen todos los futuros:

Futures.addCallback(lf, new FutureCallback<List<Integer>>() { @Override public void onSuccess(List<Integer> result) { log.info("@@ finished processing {} elements", Iterables.size(result)); // do something with all the results } @Override public void onFailure(Throwable t) { log.info("@@ failed because of :: {}", t); } });

Esto también ofrece la ventaja de que puede recopilar todos los resultados en un solo lugar una vez que finalice el procesamiento ...

Mas informacion here


Utilice un CountDownLatch :

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks); ExecutorService taskExecutor = Executors.newFixedThreadPool(4); while(...) { taskExecutor.execute(new MyTask()); } try { latch.await(); } catch (InterruptedException E) { // handle }

y dentro de su tarea (encierre en prueba / finalmente)

latch.countDown();


debe usar el método executorService.shutdown() y executorService.awaitTermination .

Un ejemplo de la siguiente manera:

public class ScheduledThreadPoolExample { public static void main(String[] args) throws InterruptedException { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); executorService.scheduleAtFixedRate(() -> System.out.println("process task."), 0, 1, TimeUnit.SECONDS); TimeUnit.SECONDS.sleep(10); executorService.shutdown(); executorService.awaitTermination(1, TimeUnit.DAYS); } }


ExecutorService.invokeAll() hace por usted.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4); List<Callable<?>> tasks; // your tasks // invokeAll() returns when all tasks are complete List<Future<?>> futures = taskExecutor.invokeAll(tasks);