streams procesamiento print parte libreria ejercicios ejemplos datos con java set java-8 java-stream cartesian-product

print - procesamiento de datos con streams de java se 8-parte 2



Producto cartesiano de streams en Java 8 como stream(usando solo streams) (2)

Me gustaría crear un método que cree una secuencia de elementos que sean productos cartesianos de múltiples flujos determinados (agregados al mismo tipo al final por un operador binario). Tenga en cuenta que tanto los argumentos como los resultados son secuencias, no colecciones.

Por ejemplo, para dos flujos de {A, B} y {X, Y} me gustaría que produzca un flujo de valores {AX, AY, BX, BY} (la concatenación simple se usa para agregar las cadenas). Hasta ahora, he ideado este código:

private static <T> Stream<T> cartesian(BinaryOperator<T> aggregator, Stream<T>... streams) { Stream<T> result = null; for (Stream<T> stream : streams) { if (result == null) { result = stream; } else { result = result.flatMap(m -> stream.map(n -> aggregator.apply(m, n))); } } return result; }

Este es mi caso de uso deseado:

Stream<String> result = cartesian( (a, b) -> a + b, Stream.of("A", "B"), Stream.of("X", "Y") ); System.out.println(result.collect(Collectors.toList()));

Resultado esperado: AX, AY, BX, BY .

Otro ejemplo:

Stream<String> result = cartesian( (a, b) -> a + b, Stream.of("A", "B"), Stream.of("K", "L"), Stream.of("X", "Y") );

Resultado esperado: AKX, AKY, ALX, ALY, BKX, BKY, BLX, BLY .

Sin embargo, si ejecuto el código, obtengo este error:

IllegalStateException: la transmisión ya se ha operado o cerrado

¿Dónde se consume la corriente? Por flatMap ? ¿Se puede arreglar fácilmente?


stream se consume en la operación flatMap en la segunda iteración. Por lo tanto, debe crear una nueva secuencia cada vez que map su resultado. Por lo tanto, debe recopilar la stream por adelantado para obtener una nueva transmisión en cada iteración.

private static <T> Stream<T> cartesian(BiFunction<T, T, T> aggregator, Stream<T>... streams) { Stream<T> result = null; for (Stream<T> stream : streams) { if (result == null) { result = stream; } else { Collection<T> s = stream.collect(Collectors.toList()); result = result.flatMap(m -> s.stream().map(n -> aggregator.apply(m, n))); } } return result; }

O incluso más corto:

private static <T> Stream<T> cartesian(BiFunction<T, T, T> aggregator, Stream<T>... streams) { return Arrays.stream(streams).reduce((r, s) -> { List<T> collect = s.collect(Collectors.toList()); return r.flatMap(m -> collect.stream().map(n -> aggregator.apply(m, n))); }).orElse(Stream.empty()); }


Pasar las transmisiones en su ejemplo nunca es mejor que pasar las listas:

private static <T> Stream<T> cartesian(BinaryOperator<T> aggregator, List<T>... lists) { ... }

Y úsalo así:

Stream<String> result = cartesian( (a, b) -> a + b, Arrays.asList("A", "B"), Arrays.asList("K", "L"), Arrays.asList("X", "Y") );

En ambos casos, usted crea una matriz implícita de varargs y la usa como fuente de datos, por lo tanto, la pereza es imaginaria. Sus datos se almacenan realmente en las matrices.

En la mayoría de los casos, el flujo de productos cartesianos resultante es mucho más largo que las entradas, por lo que no hay prácticamente ninguna razón para hacer que las entradas sean flojas. Por ejemplo, al tener cinco listas de cinco elementos (25 en total), tendrá la secuencia resultante de 3125 elementos. Entonces, almacenar 25 elementos en la memoria no es un gran problema. De hecho, en la mayoría de los casos prácticos ya están almacenados en la memoria.

Para generar la secuencia de productos cartesianos, necesita "rebobinar" constantemente todas las secuencias (excepto la primera). Para rebobinar, las transmisiones deberían poder recuperar los datos originales una y otra vez, ya sea almacenándolos en el búfer (lo que no le gusta) o tomándolos nuevamente de la fuente (recopilación, matriz, archivo, red, números aleatorios, etc.). ) y realizar una y otra vez todas las operaciones intermedias. Si las operaciones de origen e intermedias son lentas, la solución diferida puede ser mucho más lenta que la solución de almacenamiento en búfer. Si su fuente no puede volver a generar los datos (por ejemplo, un generador de números aleatorios que no puede producir los mismos números que produjo anteriormente), su solución será incorrecta.

Sin embargo, la solución totalmente perezosa es posible. Solo use las transmisiones, pero transmita los proveedores:

private static <T> Stream<T> cartesian(BinaryOperator<T> aggregator, Supplier<Stream<T>>... streams) { return Arrays.stream(streams) .reduce((s1, s2) -> () -> s1.get().flatMap(t1 -> s2.get().map(t2 -> aggregator.apply(t1, t2)))) .orElse(Stream::empty).get(); }

La solución es interesante ya que creamos y reducimos el flujo de proveedores para obtener el proveedor resultante y finalmente llamarlo. Uso:

Stream<String> result = cartesian( (a, b) -> a + b, () -> Stream.of("A", "B"), () -> Stream.of("K", "L"), () -> Stream.of("X", "Y") ); result.forEach(System.out::println);