example - java swing que es
Esperando en una lista de Future (7)
Tengo un método que devuelve una List
de futuros
List<Future<O>> futures = getFutures();
Ahora quiero esperar hasta que todos los futuros finalicen el procesamiento correctamente o cualquiera de las tareas cuyo resultado sea devuelto por un futuro genere una excepción. Incluso si una tarea arroja una excepción, no tiene sentido esperar a los otros futuros.
El enfoque simple sería
wait() {
For(Future f : futures) {
try {
f.get();
} catch(Exception e) {
//TODO catch specific exception
// this future threw exception , means somone could not do its task
return;
}
}
}
Pero el problema aquí es si, por ejemplo, el cuarto futuro arroja una excepción, entonces esperaré innecesariamente para que los primeros 3 futuros estén disponibles.
¿Cómo resolver esto? ¿Contestará la ayuda del pestillo de alguna manera? No puedo usar Future isDone
porque el documento de Java dice
boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
CompletionService tomará Callables con el método .submit () y podrá recuperar los futuros calculados con el método .take ().
Una cosa que no debes olvidar es terminar el ExecutorService llamando al método .shutdown (). Además, solo puede llamar a este método cuando haya guardado una referencia al servicio del ejecutor, así que asegúrese de guardar uno.
Código de ejemplo:
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
CompletionService<YourCallableImplementor> completionService =
new ExecutorCompletionService<YourCallableImplementor>(service);
ArrayList<Future<YourCallableImplementor>> futures = new ArrayList<Future<YourCallableImplementor>>();
for (String computeMe : elementsToCompute) {
futures.add(completionService.submit(new YourCallableImplementor(computeMe)));
}
//now retrieve the futures after computation (auto wait for it)
int received = 0;
while(received < elementsToCompute.size()) {
Future<YourCallableImplementor> resultFuture = completionService.take();
YourCallableImplementor result = resultFuture.get();
received ++;
}
//important: shutdown your ExecutorService
service.shutdown();
Puede usar un docs.oracle.com/javase/7/docs/api/java/util/concurrent/… . La documentación incluso tiene un ejemplo para su caso de uso exacto:
Supongamos, en cambio, que desea utilizar el primer resultado no nulo del conjunto de tareas, ignorando las excepciones y cancelando todas las demás tareas cuando la primera esté lista:
void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
int n = solvers.size();
List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
Result result = null;
try {
for (Callable<Result> s : solvers)
futures.add(ecs.submit(s));
for (int i = 0; i < n; ++i) {
try {
Result r = ecs.take().get();
if (r != null) {
result = r;
break;
}
} catch (ExecutionException ignore) {
}
}
} finally {
for (Future<Result> f : futures)
f.cancel(true);
}
if (result != null)
use(result);
}
Lo importante a notar aquí es que ecs.take () obtendrá la primera tarea completada , no solo la primera enviada. Por lo tanto, debe obtenerlos en el orden de finalización de la ejecución (o lanzar una excepción).
Puede utilizar un CompletionService para recibir los futuros tan pronto como estén listos y si uno de ellos lanza una excepción, cancele el procesamiento. Algo como esto:
Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService =
new ExecutorCompletionService<SomeResult>(executor);
//4 tasks
for(int i = 0; i < 4; i++) {
completionService.submit(new Callable<SomeResult>() {
public SomeResult call() {
...
return result;
}
});
}
int received = 0;
boolean erros = false;
while(received < 4 && !errors) {
Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
try {
SomeResult result = resultFuture.get();
received ++;
... // do something with the result
}
catch(Exception e) {
//log
errors = true;
}
}
Creo que puede mejorar aún más para cancelar cualquier tarea que aún se esté ejecutando si uno de ellos arroja un error.
EDITAR: He encontrado un ejemplo más completo aquí: http://blog.teamlazerbeez.com/2009/04/29/java-completionservice/
Si está utilizando Java 8 , puede hacerlo de manera más fácil con CompletableFuture y CompletableFuture.allOf , que aplica la devolución de llamada solo después de que se hayan completado todos los CompletaFuturos proporcionados.
// Waits for all futures to complete and returns a list of results.
// If a future completes exceptionally then the resulting future will too.
public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);
return CompletableFuture.allOf(cfs)
.thenApply(() -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
Si está utilizando Java 8 y no desea manipular CompletableFuture
s, he escrito una herramienta para recuperar los resultados de una List<Future<T>>
utilizando la transmisión. La clave es que tienes prohibido map(Future::get)
mientras tira.
public final class Futures
{
private Futures()
{}
public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
{
return new FutureCollector<>();
}
private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
{
private final List<Throwable> exceptions = new LinkedList<>();
@Override
public Supplier<Collection<T>> supplier()
{
return LinkedList::new;
}
@Override
public BiConsumer<Collection<T>, Future<T>> accumulator()
{
return (r, f) -> {
try
{
r.add(f.get());
}
catch (InterruptedException e)
{}
catch (ExecutionException e)
{
exceptions.add(e.getCause());
}
};
}
@Override
public BinaryOperator<Collection<T>> combiner()
{
return (l1, l2) -> {
l1.addAll(l2);
return l1;
};
}
@Override
public Function<Collection<T>, List<T>> finisher()
{
return l -> {
List<T> ret = new ArrayList<>(l);
if (!exceptions.isEmpty())
throw new AggregateException(exceptions, ret);
return ret;
};
}
@Override
public Set<java.util.stream.Collector.Characteristics> characteristics()
{
return java.util.Collections.emptySet();
}
}
Esto necesita una AggregateException
que funcione como C # ''s
public class AggregateException extends RuntimeException
{
/**
*
*/
private static final long serialVersionUID = -4477649337710077094L;
private final List<Throwable> causes;
private List<?> successfulElements;
public AggregateException(List<Throwable> causes, List<?> l)
{
this.causes = causes;
successfulElements = l;
}
public AggregateException(List<Throwable> causes)
{
this.causes = causes;
}
@Override
public synchronized Throwable getCause()
{
return this;
}
public List<Throwable> getCauses()
{
return causes;
}
public List<?> getSuccessfulElements()
{
return successfulElements;
}
public void setSuccessfulElements(List<?> successfulElements)
{
this.successfulElements = successfulElements;
}
}
Este componente actúa exactamente como la Task.WaitAll C #. Task.WaitAll . Estoy trabajando en una variante que hace lo mismo que CompletableFuture.allOf
(equivalente a Task.WhenAll
)
La razón por la que hice esto es que estoy usando Spring''s ListenableFuture
y no quiero CompletableFuture
a CompletableFuture
pesar de que es una forma más estándar
Utilice CompletableFuture
en Java 8
// Kick of multiple, asynchronous lookups
CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");
// Wait until they are all done
CompletableFuture.allOf(page1,page2,page3).join();
logger.info("--> " + page1.get());
tal vez esto ayude (nada se reemplazaría con hilo sin procesar, ¡sí!) Sugiero ejecutar cada tipo Future
con un hilo separado (van en paralelo), luego, cuando uno de los errores tiene, simplemente señala al administrador (clase Handler
).
class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
thisThread=Thread.currentThread();
List<Future<Object>> futures = getFutures();
trds=new Thread[futures.size()];
for (int i = 0; i < trds.length; i++) {
RunTask rt=new RunTask(futures.get(i), this);
trds[i]=new Thread(rt);
}
synchronized (this) {
for(Thread tx:trds){
tx.start();
}
}
for(Thread tx:trds){
try {tx.join();
} catch (InterruptedException e) {
System.out.println("Job failed!");break;
}
}if(!failed){System.out.println("Job Done");}
}
private List<Future<Object>> getFutures() {
return null;
}
public synchronized void cancelOther(){if(failed){return;}
failed=true;
for(Thread tx:trds){
tx.stop();//Deprecated but works here like a boss
}thisThread.interrupt();
}
//...
}
class RunTask implements Runnable{
private Future f;private Handler h;
public RunTask(Future f,Handler h){this.f=f;this.h=h;}
public void run(){
try{
f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation)
}catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();}
catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");}
}
}
Debo decir que el código anterior sería un error (no se verificó), pero espero poder explicar la solución. por favor, inténtalo.