java multithreading concurrency completion-service

java - ¿Cuándo debería usar un CompletionService en lugar de ExecutorService?



multithreading concurrency (8)

Básicamente, utiliza un CompletionService si desea ejecutar múltiples tareas en paralelo y luego trabajar con ellas en su orden de finalización. Entonces, si ejecuto 5 trabajos, el CompletionService me dará el primero que termine. El ejemplo en el que solo hay una tarea no confiere ningún valor extra a un Executor aparte de la capacidad de presentar un Callable .

Acabo de encontrar CompletionService en esta publicación de blog . Sin embargo, esto realmente no muestra las ventajas de CompletionService sobre un ExecutorService estándar. El mismo código se puede escribir con cualquiera. Entonces, ¿cuándo es útil un CompletionService?

¿Puedes dar una muestra de código corto para que sea claro como el cristal? Por ejemplo, este ejemplo de código solo muestra dónde no se necesita un CompletionService (= equivalente a ExecutorService)

ExecutorService taskExecutor = Executors.newCachedThreadPool(); // CompletionService<Long> taskCompletionService = // new ExecutorCompletionService<Long>(taskExecutor); Callable<Long> callable = new Callable<Long>() { @Override public Long call() throws Exception { return 1L; } }; Future<Long> future = // taskCompletionService.submit(callable); taskExecutor.submit(callable); while (!future.isDone()) { // Do some work... System.out.println("Working on something..."); } try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }


Con ExecutorService , una vez que haya enviado las tareas para ejecutar, necesita codificar manualmente para obtener de manera eficiente los resultados de las tareas completadas.

Con CompletionService , esto está prácticamente automatizado. La diferencia no es muy evidente en el código que ha presentado porque está enviando solo una tarea. Sin embargo, imagine que tiene una lista de tareas para enviar. En el siguiente ejemplo, se envían múltiples tareas al CompletionService. Luego, en lugar de tratar de averiguar qué tarea se ha completado (para obtener los resultados), solo le pide a la instancia CompletionService que devuelva los resultados a medida que estén disponibles.

public class CompletionServiceTest { class CalcResult { long result ; CalcResult(long l) { result = l; } } class CallableTask implements Callable<CalcResult> { String taskName ; long input1 ; int input2 ; CallableTask(String name , long v1 , int v2 ) { taskName = name; input1 = v1; input2 = v2 ; } public CalcResult call() throws Exception { System.out.println(" Task " + taskName + " Started -----"); for(int i=0;i<input2 ;i++) { try { Thread.sleep(200); } catch (InterruptedException e) { System.out.println(" Task " + taskName + " Interrupted !! "); e.printStackTrace(); } input1 += i; } System.out.println(" Task " + taskName + " Completed @@@@@@"); return new CalcResult(input1) ; } } public void test(){ ExecutorService taskExecutor = Executors.newFixedThreadPool(3); CompletionService<CalcResult> taskCompletionService = new ExecutorCompletionService<CalcResult>(taskExecutor); int submittedTasks = 5; for (int i=0;i< submittedTasks;i++) { taskCompletionService.submit(new CallableTask ( String.valueOf(i), (i * 10), ((i * 10) + 10 ) )); System.out.println("Task " + String.valueOf(i) + "subitted"); } for (int tasksHandled=0;tasksHandled<submittedTasks;tasksHandled++) { try { System.out.println("trying to take from Completion service"); Future<CalcResult> result = taskCompletionService.take(); System.out.println("result for a task availble in queue.Trying to get()"); // above call blocks till atleast one task is completed and results availble for it // but we dont have to worry which one // process the result here by doing result.get() CalcResult l = result.get(); System.out.println("Task " + String.valueOf(tasksHandled) + "Completed - results obtained : " + String.valueOf(l.result)); } catch (InterruptedException e) { // Something went wrong with a task submitted System.out.println("Error Interrupted exception"); e.printStackTrace(); } catch (ExecutionException e) { // Something went wrong with the result e.printStackTrace(); System.out.println("Error get() threw exception"); } } } }


Creo que el javadoc responde mejor a la pregunta de cuándo el CompletionService es útil de una manera que no es ExecutorService .

Un servicio que desacopla la producción de nuevas tareas asíncronas del consumo de los resultados de las tareas completadas.

Básicamente, esta interfaz permite que un programa tenga productores que creen y presenten tareas (e incluso examinen los resultados de esas presentaciones) sin conocer a ningún otro consumidor los resultados de esas tareas. Mientras tanto, los consumidores que conocen el CompletionService pueden poll o take resultados sin conocer a los productores que envían las tareas.

Para el registro, y podría estar equivocado porque es bastante tarde, pero estoy bastante seguro de que el código de muestra en esa publicación de blog provoca una pérdida de memoria. Sin un consumidor activo que saque los resultados de la cola interna de ExecutorCompletionService , no estoy seguro de cómo el blogger esperaba que la cola se agotara.


En primer lugar, si no queremos perder el tiempo del procesador, no usaremos

while (!future.isDone()) { // Do some work... }

Debemos usar

service.shutdown(); service.awaitTermination(14, TimeUnit.DAYS);

Lo malo de este código es que cerrará ExecutorService . Si queremos continuar trabajando con él (es decir, tenemos alguna creación de tareas recursivas), tenemos dos alternativas: invokeAll o ExecutorService .

invokeAll esperará hasta que se completen todas las tareas. ExecutorService nos otorga la capacidad de tomar o sondear resultados uno por uno.

Y, finalmente, ejemplo recursivo:

ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER); ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(executorService); while (Tasks.size() > 0) { for (final Task task : Tasks) { completionService.submit(new Callable<String>() { @Override public String call() throws Exception { return DoTask(task); } }); } try { int taskNum = Tasks.size(); Tasks.clear(); for (int i = 0; i < taskNum; ++i) { Result result = completionService.take().get(); if (result != null) Tasks.add(result.toTask()); } } catch (InterruptedException e) { // error :( } catch (ExecutionException e) { // error :( } }


Omitiendo muchos detalles:

  • ExecutorService = cola entrante + hilos de trabajo
  • CompletionService = cola entrante + hilos de trabajo + cola de salida

Si el productor de la tarea no está interesado en los resultados y es responsabilidad de otro componente procesar los resultados de la tarea asíncrona ejecutada por el servicio del ejecutor, entonces debe usar CompletionService. Le ayuda a separar el procesador de resultados de tareas del productor de tareas. Ver ejemplo http://www.zoftino.com/java-concurrency-executors-framework-tutorial


Supongamos que tiene 5 tareas de ejecución larga (tarea invocable) y que ha enviado esas tareas al servicio de ejecución. Ahora imagina que no quieres esperar a que compitan las 5 tareas, sino que quieres hacer algún tipo de procesamiento en estas tareas si las completas. Ahora esto se puede hacer escribiendo lógica de sondeo en objetos futuros o usando esta API.