streams procesamiento print parte libreria examples ejemplo datos con collection java asynchronous java-8 java-stream

procesamiento - stream java 8 ejemplo



¿Es posible utilizar Java 8 Streams API para el procesamiento asíncrono? (4)

Como @KarolKrol alude a usted, puede hacerlo con una secuencia de CompletableFuture .

Hay una biblioteca que se basa en las corrientes JDK8 para facilitar el trabajo con flujos de CompletableFuture llamados cyclops-react .

Para componer tus streams, puedes usar la API ike API prometedora de cyclops-reaction o puedes usar Stages simple-reaction.

He estado jugando con CompletionStage / CompletableFuture en Java 8 para hacer un procesamiento asíncrono, que funciona bastante bien. Sin embargo, a veces quiero que una etapa realice un procesamiento asincrónico de un iterador / secuencia de elementos, y no parece haber una manera de hacerlo.

Específicamente, Stream.forEach () tiene la semántica de que después de la llamada se han procesado todos los elementos. Me gustaría lo mismo, pero con un CompletionStage en su lugar, por ejemplo:

CompletionStage<Void> done = stream.forEach(...); done.thenRun(...);

Si Stream está respaldado por un resultado de transmisión asíncrona, sería mucho mejor que esperar a que se complete en el código anterior.

¿Es posible construir esto con la API Java 8 actual de alguna manera? ¿Soluciones provisionales?


Es posible producir una secuencia, asignar cada elemento a CompletionStage y recopilar los resultados usando CompletionStage.thenCombine() , pero el código resultante no será más legible que el uso simple de este modo.

CompletionStage<Collection<Result>> collectionStage = CompletableFuture.completedFuture( new LinkedList<>() ); for (Request request : requests) { CompletionStage<Result> resultStage = performRequest(request); collectionStage = collectionStage.thenCombine( resultStage, (collection, result) -> { collection.add(result); return collection; } ); } return collectionStage;

Este ejemplo puede transformarse fácilmente en funcional para que cada uno no pierda legibilidad. Pero el uso de reduce o collect requiere un código adicional no tan fino.

Actualización: CompletableFuture.allOf y CompletableFuture.join proporcionan otra forma más legible de transformar la recopilación de resultados futuros para la futura recopilación de resultados.


Hasta donde yo sé, la API de secuencias no admite el procesamiento de eventos asíncronos. Parece que quieres algo como Reactive Extensions para .NET, y hay un puerto de Java llamado RxJava , creado por Netflix.

RxJava admite muchas de las mismas operaciones de alto nivel que las secuencias Java 8 (como mapa y filtro) y es asincrónico.

Actualización : ahora hay una iniciativa de reactive-streams.org en proceso, y parece que JDK 9 incluirá soporte para al menos una parte de la misma a través de la clase Flow .


cyclops-react (soy el autor de esta biblioteca), proporciona una clase de StreamUtils para procesar flujos. Una de las funciones que proporciona es futureOperations, que proporciona acceso a las operaciones estándar del terminal Stream (y algunas más) con un giro: el flujo se ejecuta de forma asincrónica y el resultado se devuelve dentro de un CompletableFuture. .p.ej

Stream<Integer> stream = Stream.of(1,2,3,4,5,6) .map(i->i+2); CompletableFuture<List<Integer>> asyncResult = StreamUtils.futureOperations(stream, Executors.newFixedThreadPool(1)) .collect(Collectors.toList());

También hay una clase de convience ReactiveSeq que envuelve Stream y proporciona la misma funcionalidad, con una API fluida

CompletableFuture<List<Integer>> asyncResult = ReactiveSeq.of(1,2,3,4,5,6) .map(i->i+2) .futureOperations( Executors.newFixedThreadPool(1)) .collect(Collectors.toList());

Como Adam ha señalado que cyclops-react reaction FutureStreams está diseñado para procesar datos de forma asincrónica (al mezclar Futures y Streams juntos), es especialmente adecuado para operaciones de subprocesos múltiples que implican bloqueo de E / S (como leer archivos, realizar llamadas a bases de datos, hacer llamadas de descanso, etc.).