completionstage completablefuture await async java concurrency java-8 completable-future

completablefuture - java 8 promise



Listar la secuencia<Future> to Future<List> (8)

Además de la biblioteca de Spotify Futures, puede probar mi código ubicado aquí: https://github.com/vsilaev/java-async-await/blob/master/net.tascalate.async.examples/src/main/java/net/tascalate/concurrent/CompletionStages.java (tiene dependencias de otras clases en el mismo paquete)

Implementa una lógica para devolver "al menos N de M" CompletionStage-s con una política de cuántos errores está permitido tolerar. Existen métodos convenientes para todos / todos los casos, más una política de cancelación para los futuros restantes, más el código trata con CompletionStage-s (interfaz) en lugar de CompletableFuture (clase concreta).

Estoy tratando de convertir List<CompletableFuture<X>> a CompletableFuture<List<T>> . Esto es bastante útil como cuando tiene muchas tareas asincrónicas y necesita obtener resultados de todas ellas.

Si alguno de ellos falla, el futuro final falla. Así es como lo he implementado:

public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) { if(com.isEmpty()){ throw new IllegalArgumentException(); } Stream<? extends CompletableFuture<T>> stream = com.stream(); CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>()); return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> { x.add(y); return x; },exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> { ls1.addAll(ls2); return ls1; },exec)); }

Para ejecutarlo:

ExecutorService executorService = Executors.newCachedThreadPool(); Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> { try { Thread.sleep((long) (Math.random() * 10)); } catch (InterruptedException e) { e.printStackTrace(); } return x; }, executorService)); CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);

Si alguno de ellos falla, entonces falla. Da salida como se esperaba incluso si hay un millón de futuros. El problema que tengo es: digamos que si hay más de 5000 futuros y si alguno de ellos falla, obtengo un StackOverflowError :

Excepción en el thread "pool-1-thread-2611" java.lang.StackOverflowError en java.util.concurrent.CompletableFuture.internalComplete (CompletableFuture.java:210) en java.util.concurrent.CompletableFuture $ ThenCompose.run (CompletableFuture.java : 1487) en java.util.concurrent.CompletableFuture.postComplete (CompletableFuture.java:193) en java.util.concurrent.CompletableFuture.internalComplete (CompletableFuture.java:210) en java.util.concurrent.CompletableFuture $ ThenCompose.run CompletableFuture.java:1487)

¿Qué estoy haciendo mal?

Nota: El futuro devuelto anterior falla correctamente cuando falla alguno de los futuros. La respuesta aceptada también debe tomar este punto.


Como Misha ha señalado , estás usando en exceso …Async Operaciones …Async . Además, está componiendo una compleja cadena de operaciones que modela una dependencia que no refleja la lógica de su programa:

  • crea un trabajo x que depende del primer y segundo trabajo de su lista
  • crea un trabajo x + 1 que depende del trabajo xy del tercer trabajo de su lista
  • crea un trabajo x + 2 que depende del trabajo x + 1 y el cuarto trabajo de su lista
  • ...
  • crea un trabajo x + 5000 que depende del trabajo x + 4999 y el último trabajo de su lista

Luego, al cancelar (explícitamente o debido a una excepción), este trabajo compuesto recursivamente podría realizarse recursivamente y podría fallar con un Error . Eso depende de la implementación.

Como ya lo mostró Misha , hay un método, allOf , que le permite modelar su intención original, para definir un trabajo que depende de todos los trabajos de su lista.

Sin embargo, vale la pena señalar que incluso eso no es necesario. Dado que está utilizando un ejecutor de grupo de subprocesos ilimitado, simplemente puede publicar un trabajo asincrónico recogiendo los resultados en una lista y listo. Esperar a que se complete está implícito al pedir el resultado de cada trabajo de todos modos.

ExecutorService executorService = Executors.newCachedThreadPool(); List<CompletableFuture<Integer>> que = IntStream.range(0, 100000) .mapToObj(x -> CompletableFuture.supplyAsync(() -> { LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10))); return x; }, executorService)).collect(Collectors.toList()); CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync( () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()), executorService);

El uso de métodos para componer operaciones dependientes es importante, cuando el número de subprocesos es limitado y los trabajos pueden generar trabajos asincrónicos adicionales, para evitar que trabajos en espera roben subprocesos de trabajos que deben completarse primero, pero tampoco es el caso aquí.

En este caso específico, un trabajo simplemente iterando sobre este gran número de trabajos de requisitos previos y esperar si es necesario puede ser más eficiente que modelar este gran número de dependencias y hacer que cada trabajo notifique al trabajo dependiente sobre la finalización.


Para agregar a la respuesta aceptada por @Misha, se puede ampliar aún más como recopilador:

public static <T> Collector<CompletableFuture<T>, ?, CompletableFuture<List<T>>> sequenceCollector() { return Collectors.collectingAndThen(Collectors.toList(), com -> sequence(com)); }

Ahora usted puede:

Stream<CompletableFuture<Integer>> stream = Stream.of( CompletableFuture.completedFuture(1), CompletableFuture.completedFuture(2), CompletableFuture.completedFuture(3) ); CompletableFuture<List<Integer>> ans = stream.collect(sequenceCollector());


Puede obtener la biblioteca CompletableFutures de Spotify y usar el método allAsList . Creo que está inspirado en el método Futures.allAsList de Guava.

public static <T> CompletableFuture<List<T>> allAsList( List<? extends CompletionStage<? extends T>> stages) {

Y aquí hay una implementación simple si no desea utilizar una biblioteca:

public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) { return CompletableFuture.allOf( futures.toArray(new CompletableFuture[futures.size()]) ).thenApply(ignored -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList()) ); }


Una operación de secuencia de ejemplo usando thenCombine en CompletableFuture

public<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com){ CompletableFuture<List<T>> identity = CompletableFuture.completedFuture(new ArrayList<T>()); BiFunction<CompletableFuture<List<T>>,CompletableFuture<T>,CompletableFuture<List<T>>> combineToList = (acc,next) -> acc.thenCombine(next,(a,b) -> { a.add(b); return a;}); BinaryOperator<CompletableFuture<List<T>>> combineLists = (a,b)-> a.thenCombine(b,(l1,l2)-> { l1.addAll(l2); return l1;}) ; return com.stream() .reduce(identity, combineToList, combineLists); } }

Si no le importa usar bibliotecas de terceros, cyclops-react (soy el autor) tiene un conjunto de métodos de utilidad para CompletableFutures (y Opcionales, Streams, etc.)

List<CompletableFuture<String>> listOfFutures; CompletableFuture<ListX<String>> sequence =CompletableFutures.sequence(listOfFutures);


Utilice CompletableFuture.allOf(...) :

static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) { return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[com.size()])) .thenApply(v -> com.stream() .map(CompletableFuture::join) .collect(toList()) ); }

Algunos comentarios sobre su implementación:

Su uso de .thenComposeAsync , .thenApplyAsync y .thenCombineAsync probablemente no esté haciendo lo que espera. Estos ...Async métodos ...Async ejecutan la función que se les proporciona en un hilo separado. Entonces, en su caso, está causando que se agregue el nuevo elemento a la lista para ejecutarse en el ejecutor proporcionado. No es necesario meter operaciones livianas en un ejecutor de subprocesos en caché. No use los thenXXXXAsync sin una buena razón.

Además, reduce no debe usarse para acumularse en contenedores mutables. Aunque podría funcionar correctamente cuando la secuencia es secuencial, fallará si la secuencia se hace paralela. Para realizar una reducción mutable, use .collect en .collect lugar.

Si desea completar todo el cálculo excepcionalmente inmediatamente después del primer error, haga lo siguiente en su método de sequence :

CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[com.size()])) .thenApply(v -> com.stream() .map(CompletableFuture::join) .collect(toList()) ); com.forEach(f -> f.whenComplete((t, ex) -> { if (ex != null) { result.completeExceptionally(ex); } })); return result;

Si, además, desea cancelar las operaciones restantes en el primer fallo, agregue exec.shutdownNow(); justo después de result.completeExceptionally(ex); . Esto, por supuesto, supone que exec solo existe para este cálculo. Si no es así, tendrá que recorrer y cancelar cada Future restante individualmente.



Descargo de responsabilidad: Esto no responderá completamente la pregunta inicial. Carecerá la parte de "falla todo si uno falla". Sin embargo, no puedo responder la pregunta real, más genérica, porque se cerró como un duplicado de esta: Java 8 CompletableFuture.allOf (...) con Colección o Lista . Entonces responderé aquí:

¿Cómo convertir List<CompletableFuture<V>> a CompletableFuture<List<V>> utilizando la API de flujo de Java 8?

Resumen: use lo siguiente:

private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) { CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>()); BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) -> futureValue.thenCombine(futureList, (value, list) -> { List<V> newList = new ArrayList<>(list.size() + 1); newList.addAll(list); newList.add(value); return newList; }); BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> { List<V> newList = new ArrayList<>(list1.size() + list2.size()); newList.addAll(list1); newList.addAll(list2); return newList; }); return listOfFutures.stream().reduce(identity, accumulator, combiner); }

Ejemplo de uso:

List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads) .mapToObj(i -> loadData(i, executor)).collect(toList()); CompletableFuture<List<String>> futureList = sequence(listOfFutures);

Ejemplo completo:

import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; import java.util.function.BiFunction; import java.util.function.BinaryOperator; import java.util.stream.IntStream; import static java.util.stream.Collectors.toList; public class ListOfFuturesToFutureOfList { public static void main(String[] args) { ListOfFuturesToFutureOfList test = new ListOfFuturesToFutureOfList(); test.load(10); } public void load(int numThreads) { final ExecutorService executor = Executors.newFixedThreadPool(numThreads); List<CompletableFuture<String>> listOfFutures = IntStream.range(0, numThreads) .mapToObj(i -> loadData(i, executor)).collect(toList()); CompletableFuture<List<String>> futureList = sequence(listOfFutures); System.out.println("Future complete before blocking? " + futureList.isDone()); // this will block until all futures are completed List<String> data = futureList.join(); System.out.println("Loaded data: " + data); System.out.println("Future complete after blocking? " + futureList.isDone()); executor.shutdown(); } public CompletableFuture<String> loadData(int dataPoint, Executor executor) { return CompletableFuture.supplyAsync(() -> { ThreadLocalRandom rnd = ThreadLocalRandom.current(); System.out.println("Starting to load test data " + dataPoint); try { Thread.sleep(500 + rnd.nextInt(1500)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Successfully loaded test data " + dataPoint); return "data " + dataPoint; }, executor); } private <V> CompletableFuture<List<V>> sequence(List<CompletableFuture<V>> listOfFutures) { CompletableFuture<List<V>> identity = CompletableFuture.completedFuture(new ArrayList<>()); BiFunction<CompletableFuture<List<V>>, CompletableFuture<V>, CompletableFuture<List<V>>> accumulator = (futureList, futureValue) -> futureValue.thenCombine(futureList, (value, list) -> { List<V> newList = new ArrayList<>(list.size() + 1); newList.addAll(list); newList.add(value); return newList; }); BinaryOperator<CompletableFuture<List<V>>> combiner = (futureList1, futureList2) -> futureList1.thenCombine(futureList2, (list1, list2) -> { List<V> newList = new ArrayList<>(list1.size() + list2.size()); newList.addAll(list1); newList.addAll(list2); return newList; }); return listOfFutures.stream().reduce(identity, accumulator, combiner); } }