tutorial suma streams procesamiento parte libreria funcional ejemplo datos con collection java lambda java-8 java-stream

java - suma - Copie una secuencia para evitar que "la transmisión ya haya sido operada o cerrada"



stream java 8 ejemplo (8)

Creo que su suposición sobre la eficiencia es un poco al revés. Obtiene esta enorme rentabilidad si solo va a utilizar los datos una vez, porque no tiene que almacenarlos, y las transmisiones le ofrecen potentes optimizaciones de "fusión de bucle" que le permiten transmitir toda la información de manera eficiente a través de la canalización.

Si desea volver a utilizar los mismos datos, entonces, por definición, debe generarlos dos veces (de manera determinista) o almacenarlos. Si ya está en una colección, genial; luego iterarlo dos veces es barato.

Experimentamos en el diseño con "streams bifurcados". Lo que encontramos fue que apoyar esto tenía costos reales; abrumaba el caso común (uso una vez) a expensas del caso poco común. El gran problema fue lidiar con "lo que sucede cuando los dos conductos no consumen datos a la misma velocidad". Ahora has vuelto al almacenamiento en búfer de todos modos. Esta fue una característica que claramente no tuvo su peso.

Si desea operar los mismos datos repetidamente, almacénelos o estructure sus operaciones como Consumidores y haga lo siguiente:

stream()...stuff....forEach(e -> { consumerA(e); consumerB(e); });

También puede consultar la biblioteca RxJava, ya que su modelo de procesamiento se presta mejor a este tipo de "bifurcación de flujo".

Me gustaría duplicar una secuencia de Java 8 para poder manejarla dos veces. Puedo collect como una lista y obtener nuevas transmisiones de eso;

// doSomething() returns a stream List<A> thing = doSomething().collect(toList()); thing.stream()... // do stuff thing.stream()... // do other stuff

Pero creo que debería haber una manera más eficiente / elegante.

¿Hay alguna forma de copiar la transmisión sin convertirla en una colección?

De hecho, estoy trabajando con una secuencia de Either de los Either , así que quiero procesar la proyección de la izquierda de una manera antes de pasar a la proyección correcta y lidiar con eso de otra manera. Algo así (que, hasta ahora, me veo forzado a usar el truco de toList ).

List<Either<Pair<A, Throwable>, A>> results = doSomething().collect(toList()); Stream<Pair<A, Throwable>> failures = results.stream().flatMap(either -> either.left()); failures.forEach(failure -> ... ); Stream<A> successes = results.stream().flatMap(either -> either.right()); successes.forEach(success -> ... );


Hemos implementado un método duplicate() para las transmisiones en jOOλ , una biblioteca de código abierto que creamos para mejorar las pruebas de integración para jOOQ . Básicamente, puedes escribir:

Tuple2<Seq<A>, Seq<A>> duplicates = Seq.seq(doSomething()).duplicate();

Internamente, hay un búfer que almacena todos los valores que se han consumido de una secuencia, pero no de la otra. Probablemente sea lo más eficiente posible si tus dos flujos se consumen al mismo ritmo y si puedes vivir con la falta de seguridad de hilos .

Así es como funciona el algoritmo:

static <T> Tuple2<Seq<T>, Seq<T>> duplicate(Stream<T> stream) { final List<T> gap = new LinkedList<>(); final Iterator<T> it = stream.iterator(); @SuppressWarnings("unchecked") final Iterator<T>[] ahead = new Iterator[] { null }; class Duplicate implements Iterator<T> { @Override public boolean hasNext() { if (ahead[0] == null || ahead[0] == this) return it.hasNext(); return !gap.isEmpty(); } @Override public T next() { if (ahead[0] == null) ahead[0] = this; if (ahead[0] == this) { T value = it.next(); gap.offer(value); return value; } return gap.poll(); } } return tuple(seq(new Duplicate()), seq(new Duplicate())); }

Más código fuente aquí

Tuple2 es probablemente como su tipo de Pair , mientras que Seq es Stream con algunas mejoras.


Otra forma de manejar los elementos varias veces es usar Stream.peek(Consumer) :

doSomething().stream() .peek(either -> handleFailure(either.left())) .foreach(either -> handleSuccess(either.right()));

peek(Consumer) se puede encadenar tantas veces como sea necesario.

doSomething().stream() .peek(element -> handleFoo(element.foo())) .peek(element -> handleBar(element.bar())) .peek(element -> handleBaz(element.baz())) .foreach(element-> handleQux(element.qux()));


Para este problema en particular, puede usar también particionamiento. Algo como

// Partition Eighters into left and right List<Either<Pair<A, Throwable>, A>> results = doSomething(); Map<Boolean, Object> passingFailing = results.collect(Collectors.partitioningBy(s -> s.isLeft())); passingFailing.get(true) <- here will be all passing (left values) passingFailing.get(false) <- here will be all failing (right values)


Puede crear una secuencia de ejecutables (por ejemplo):

results.stream() .flatMap(either -> Stream.<Runnable> of( () -> failure(either.left()), () -> success(either.right()))) .forEach(Runnable::run);

Donde el failure y el success son las operaciones a aplicar. Sin embargo, esto creará bastantes objetos temporales y puede que no sea más eficiente que comenzar desde una colección y reproducirla / iterarla dos veces.


Use java.util.function.Supplier .

De http://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/ :

Reutilizando Streams

Las corrientes Java 8 no se pueden reutilizar. Tan pronto como llame a cualquier operación de terminal, la transmisión se cerrará:

Stream<String> stream = Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); stream.anyMatch(s -> true); // ok stream.noneMatch(s -> true); // exception

Llamar a noneMatch después de anyMatch en la misma secuencia da como resultado la siguiente excepción:

java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229) at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459) at com.winterbe.java8.Streams5.test7(Streams5.java:38) at com.winterbe.java8.Streams5.main(Streams5.java:28)

Para superar esta limitación, tenemos que crear una nueva cadena de flujo para cada operación de terminal que queremos ejecutar, por ejemplo, podríamos crear un proveedor de flujo para construir una nueva corriente con todas las operaciones intermedias ya configuradas:

Supplier<Stream<String>> streamSupplier = () -> Stream.of("d2", "a2", "b1", "b3", "c") .filter(s -> s.startsWith("a")); streamSupplier.get().anyMatch(s -> true); // ok streamSupplier.get().noneMatch(s -> true); // ok

Cada llamada a get() construye una nueva secuencia en la que estamos guardados para llamar a la operación de terminal deseada.


Use el proveedor para producir la secuencia para cada operación de terminación.

Supplier <Stream<Integer>> streamSupplier=()->list.stream();

Siempre que necesite una transmisión de esa colección, use streamSupplier.get() para obtener una nueva transmisión.

Ejemplos:

  1. streamSupplier.get().anyMatch(predicate);
  2. streamSupplier.get().allMatch(predicate2);

cyclops-react , una biblioteca a la que contribuyo, tiene un método estático que te permitirá duplicar un Stream (y devuelve una tupla jOOλ de Streams).

Stream<Integer> stream = Stream.of(1,2,3); Tuple2<Stream<Integer>,Stream<Integer>> streams = StreamUtils.duplicate(stream);

Consulte los comentarios, existe una penalización de rendimiento que se incurrirá al usar duplicados en un Stream existente. Una alternativa más eficaz sería usar Streamable:

También hay una clase Streamable (perezosa) que se puede construir a partir de un Stream, Iterable o Array y se puede reproducir varias veces.

Streamable<Integer> streamable = Streamable.of(1,2,3); streamable.stream().forEach(System.out::println); streamable.stream().forEach(System.out::println);

AsStreamable.synchronizedFromStream (stream): se puede usar para crear un Streamable que poblará de forma perezosa su colección de respaldo, de forma tal que se pueda compartir entre subprocesos. Streamable.fromStream (transmisión) no incurrirá en ninguna sobrecarga de sincronización.