practice - ¿Es una buena manera de usar java.util.concurrent.FutureTask?
java concurrency in practice (6)
En primer lugar, debo decir que soy bastante nuevo en la API java.util.concurrent, así que tal vez lo que estoy haciendo es completamente incorrecto.
¿Qué quiero hacer?
Tengo una aplicación Java que básicamente ejecuta 2 procesos separados (llamados myFirstProcess , mySecondProcess ), pero estos procesos deben ejecutarse al mismo tiempo.
Entonces, traté de hacer eso:
public void startMyApplication() {
ExecutorService executor = Executors.newFixedThreadPool(2);
FutureTask<Object> futureOne = new FutureTask<Object>(myFirstProcess);
FutureTask<Object> futureTwo = new FutureTask<Object>(mySecondProcess);
executor.execute(futureOne);
executor.execute(futureTwo);
while (!(futureOne.isDone() && futureTwo.isDone())) {
try {
// I wait until both processes are finished.
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.info("Processing finished");
executor.shutdown();
// Do some processing on results
...
}
myFirstProcess y mySecondProcess son clases que implementan Callable<Object>
, y donde todo su procesamiento se realiza en el método call ().
Está funcionando bastante bien, pero no estoy seguro de que sea la forma correcta de hacerlo. ¿Es una buena manera de hacer lo que quiero? Si no, ¿puede darme algunos consejos para mejorar mi código (y aún así mantenerlo lo más simple posible).
Si su futureTasks es más de 2, considere [ListenableFuture][1]
.
Cuando varias operaciones comiencen tan pronto como se inicie otra operación, " despliegue en abanico ", ListenableFuture simplemente funciona: desencadena todas las devoluciones de llamada solicitadas. Con un poco más de trabajo, podemos " fan-in " o activar un ListenableFuture para calcularlo tan pronto como otros futuros hayan terminado.
Es posible que desee utilizar un CyclicBarrier si está interesado en iniciar los hilos al mismo tiempo, o esperar a que terminen y luego realizar un procesamiento adicional. Ver el javadoc para más información.
Sería mejor utilizar el método get()
.
futureOne.get();
futureTwo.get();
Ambos esperan la notificación de que el subproceso terminó de procesarse, esto le ahorra el ocupado-espera-con-temporizador que está utilizando ahora que no es eficiente ni elegante.
Como get(long timeout, TimeUnit unit)
adicional, tiene el API get(long timeout, TimeUnit unit)
que le permite definir un tiempo máximo para que el hilo duerma y esperar una respuesta, y de lo contrario continúa ejecutándose.
Vea la API de Java para más información.
Los usos de FutureTask
anteriores son tolerables, pero definitivamente no son idiomáticos. En realidad está envolviendo un FutureTask
adicional alrededor del que envió al ExecutorService
. Your FutureTask
es tratado como Runnable
por ExecutorService
. Internamente, envuelve su FutureTask
-as- Runnable
en un nuevo FutureTask
y se lo devuelve como Future<?>
.
En su lugar, debe enviar sus Callable<Object>
a un CompletionService
. Puede soltar dos Callable
s en el submit(Callable<V>)
, luego dar la vuelta y llamar a CompletionService#take()
dos veces (una vez por cada Callable
). Esas llamadas se bloquearán hasta que una y luego las otras tareas enviadas se completen.
Dado que ya tienes un Executor
en la mano, construye un nuevo ExecutorCompletionService
alrededor y deja tus tareas ahí. No gire y espere a la espera; CompletionService#take()
se bloqueará hasta que se complete una de sus tareas (ya sea que se termine de ejecutar o cancelar) o se interrumpa el hilo que está esperando en take()
.
La solución de Yuval está bien. Como alternativa, también puedes hacer esto:
ExecutorService executor = Executors.newFixedThreadPool();
FutureTask<Object> futureOne = new FutureTask<Object>(myFirstProcess);
FutureTask<Object> futureTwo = new FutureTask<Object>(mySecondProcess);
executor.execute(futureOne);
executor.execute(futureTwo);
executor.shutdown();
try {
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
// interrupted
}
¿Cuál es la ventaja de este enfoque? En realidad, no hay mucha diferencia, excepto que de esta forma se evita que el ejecutor acepte más tareas (puede hacerlo a la inversa). Aunque tiendo a preferir este idioma a ese.
Además, si get () arroja una excepción, puede terminar en una parte de su código que asume que ambas tareas están hechas, lo que podría ser malo.
Puede usar el método invokeall (Colelction ....)
package concurrent.threadPool;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class InvokeAll {
public static void main(String[] args) throws Exception {
ExecutorService service = Executors.newFixedThreadPool(5);
List<Future<java.lang.String>> futureList = service.invokeAll(Arrays.asList(new Task1<String>(),new Task2<String>()));
System.out.println(futureList.get(1).get());
System.out.println(futureList.get(0).get());
}
private static class Task1<String> implements Callable<String>{
@Override
public String call() throws Exception {
Thread.sleep(1000 * 10);
return (String) "1000 * 5";
}
}
private static class Task2<String> implements Callable<String>{
@Override
public String call() throws Exception {
Thread.sleep(1000 * 2);
int i=3;
if(i==3)
throw new RuntimeException("Its Wrong");
return (String) "1000 * 2";
}
}
}