java - await - CompletableFuture: ¿Esperando el primero que normalmente regresa?
java 8 promise (4)
Bueno, ese es un método que debería ser apoyado por el marco. Primero, pensé que CompletionStage.applyToEither hace algo similar, pero resulta que no lo hace. Entonces se me ocurrió esta solución:
public static <U> CompletionStage<U> firstCompleted(Collection<CompletionStage<U>> stages) {
final int count = stages.size();
if (count <= 0) {
throw new IllegalArgumentException("stages must not be empty");
}
final AtomicInteger settled = new AtomicInteger();
final CompletableFuture<U> future = new CompletableFuture<U>();
BiConsumer<U, Throwable> consumer = (val, exc) -> {
if (exc == null) {
future.complete(val);
} else {
if (settled.incrementAndGet() >= count) {
// Complete with the last exception. You can aggregate all the exceptions if you wish.
future.completeExceptionally(exc);
}
}
};
for (CompletionStage<U> item : stages) {
item.whenComplete(consumer);
}
return future;
}
Para verlo en acción, aquí hay algunos usos:
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
public class Main {
public static <U> CompletionStage<U> firstCompleted(Collection<CompletionStage<U>> stages) {
final int count = stages.size();
if (count <= 0) {
throw new IllegalArgumentException("stages must not be empty");
}
final AtomicInteger settled = new AtomicInteger();
final CompletableFuture<U> future = new CompletableFuture<U>();
BiConsumer<U, Throwable> consumer = (val, exc) -> {
if (exc == null) {
future.complete(val);
} else {
if (settled.incrementAndGet() >= count) {
// Complete with the last exception. You can aggregate all the exceptions if you wish.
future.completeExceptionally(exc);
}
}
};
for (CompletionStage<U> item : stages) {
item.whenComplete(consumer);
}
return future;
}
private static final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
public static <U> CompletionStage<U> delayed(final U value, long delay) {
CompletableFuture<U> future = new CompletableFuture<U>();
worker.schedule(() -> {
future.complete(value);
}, delay, TimeUnit.MILLISECONDS);
return future;
}
public static <U> CompletionStage<U> delayedExceptionally(final Throwable value, long delay) {
CompletableFuture<U> future = new CompletableFuture<U>();
worker.schedule(() -> {
future.completeExceptionally(value);
}, delay, TimeUnit.MILLISECONDS);
return future;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("Started...");
/*
// Looks like applyToEither doesn''t work as expected
CompletableFuture<Integer> a = CompletableFuture.completedFuture(99);
CompletableFuture<Integer> b = Main.<Integer>completedExceptionally(new Exception("Exc")).toCompletableFuture();
System.out.println(b.applyToEither(a, x -> x).get()); // throws Exc
*/
try {
List<CompletionStage<Integer>> futures = new ArrayList<>();
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #1"), 100));
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #2"), 200));
futures.add(delayed(1, 1000));
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #4"), 400));
futures.add(delayed(2, 500));
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception #5"), 600));
Integer value = firstCompleted(futures).toCompletableFuture().get();
System.out.println("Completed normally: " + value);
} catch (Exception ex) {
System.out.println("Completed exceptionally");
ex.printStackTrace();
}
try {
List<CompletionStage<Integer>> futures = new ArrayList<>();
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#1"), 400));
futures.add(Main.<Integer>delayedExceptionally(new Exception("Exception B#2"), 200));
Integer value = firstCompleted(futures).toCompletableFuture().get();
System.out.println("Completed normally: " + value);
} catch (Exception ex) {
System.out.println("Completed exceptionally");
ex.printStackTrace();
}
System.out.println("End...");
}
}
Tengo algunos CompletableFuture
s y quiero ejecutarlos en paralelo, esperando el primero que devuelve normalmente .
Sé que puedo usar CompletableFuture.anyOf
para esperar a que el primero regrese, pero esto volverá normal o excepcionalmente . Quiero ignorar las excepciones.
List<CompletableFuture<?>> futures = names.stream().map(
(String name) ->
CompletableFuture.supplyAsync(
() ->
// this calling may throw exceptions.
new Task(name).run()
)
).collect(Collectors.toList());
//FIXME Can not ignore exceptionally returned takes.
Future any = CompletableFuture.anyOf(futures.toArray(new CompletableFuture<?>[]{}));
try {
logger.info(any.get().toString());
} catch (Exception e) {
e.printStackTrace();
}
Puedes usar el siguiente método de ayuda:
public static <T>
CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {
CompletableFuture<T> f=new CompletableFuture<>();
Consumer<T> complete=f::complete;
l.forEach(s -> s.thenAccept(complete));
return f;
}
que puede usar de esta manera, para demostrar que ignorará las excepciones anteriores pero devolverá el primer valor proporcionado:
List<CompletableFuture<String>> futures = Arrays.asList(
CompletableFuture.supplyAsync(
() -> { throw new RuntimeException("failing immediately"); }
),
CompletableFuture.supplyAsync(
() -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
return "with 5s delay";
}),
CompletableFuture.supplyAsync(
() -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
return "with 10s delay";
})
);
CompletableFuture<String> c = anyOf(futures);
logger.info(c.join());
Una desventaja de esta solución es que nunca se completará si todos los futuros se completan excepcionalmente. Un poco más involucrado es una solución que proporcionará el primer valor si hay un cálculo exitoso, pero falla excepcionalmente si no hay un cálculo exitoso.
public static <T>
CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {
CompletableFuture<T> f=new CompletableFuture<>();
Consumer<T> complete=f::complete;
CompletableFuture.allOf(
l.stream().map(s -> s.thenAccept(complete)).toArray(CompletableFuture<?>[]::new)
).exceptionally(ex -> { f.completeExceptionally(ex); return null; });
return f;
}
Utiliza el hecho de que el controlador excepcional de allOf
solo se invoca después de que todos los futuros se hayan completado (excepcionalmente o no) y que un futuro se pueda completar solo una vez (dejando cosas especiales como obtrude…
aparte). Cuando se ejecuta el controlador excepcionalmente, cualquier intento de completar el futuro con un resultado se ha realizado, si hubo uno, por lo que el intento de completarlo excepcionalmente solo tiene éxito, si no hubo una finalización exitosa previa.
Se puede usar exactamente de la misma manera que la primera solución y solo muestra un comportamiento diferente si todos los cálculos fallan, por ejemplo:
List<CompletableFuture<String>> futures = Arrays.asList(
CompletableFuture.supplyAsync(
() -> { throw new RuntimeException("failing immediately"); }
),
CompletableFuture.supplyAsync(
// delayed to demonstrate that the solution will wait for all completions
// to ensure it doesn''t miss a possible successful computation
() -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
throw new RuntimeException("failing later"); }
)
);
CompletableFuture<String> c = anyOf(futures);
try { logger.info(c.join()); }
catch(CompletionException ex) { logger.severe(ex.toString()); }
El ejemplo anterior utiliza un retraso que demuestra que la solución esperará a que se complete cuando no haya éxito, mientras que este ejemplo en ideone demostrará cómo un éxito posterior convertirá el resultado en un éxito. Tenga en cuenta que debido al almacenamiento en caché de los resultados de Ideones, es posible que no note la demora.
Tenga en cuenta que en el caso de que todos los futuros fracasen, no hay garantía sobre cuál de las excepciones se informará. Dado que espera todas las finalizaciones en el caso erróneo, cualquiera podría llegar al resultado final.
Teniendo en cuenta que:
Uno de los fundamentos de la filosofía de Java es prevenir o desalentar las malas prácticas de programación.
(Hasta qué punto ha tenido éxito al hacerlo es el tema de otro debate; el punto sigue siendo que este ha sido sin duda uno de los objetivos principales de la lengua).
Ignorar las excepciones es una muy mala práctica.
Una excepción siempre debe ser devuelta a la capa superior, o manejada, o al menos informada. Específicamente, una excepción nunca debe ser tragada en silencio.
Los errores deben informarse lo antes posible.
por ejemplo, vea los problemas por los que atraviesa el tiempo de ejecución para proporcionar iteradores rápidos de falla que generan una excepción ConcurrentModificationException si la colección se modifica mientras se repite la iteración.
Ignorar un
CompletableFuture
excepcionalmente completado significa que a) no está informando un error lo antes posible, y b) es probable que planee no reportarlo en absoluto.La imposibilidad de simplemente esperar a la primera finalización no excepcional y, en su lugar, ser molestado por completaciones excepcionales no impone ninguna carga significativa, ya que siempre puede eliminar el elemento excepcionalmente completado de la lista, (aunque al mismo tiempo no se olvida de reportar el fallo, ¿verdad? ) y repetir la espera.
Por lo tanto, no me sorprendería si la característica buscada se pierde intencionalmente en Java, y estaría dispuesto a argumentar que está justamente ausente.
(Lo siento Sotirios, no hay respuesta canónica.)
esto funcionara? Devuelve una secuencia de todos los futuros que se completaron normalmente y devuelve uno de ellos.
futures.stream()
.filter(f -> {
try{
f.get();
return true;
}catch(ExecutionException | InterruptedException e){
return false;
}
})
.findAny();