parallel into instream ejemplo divided java java-8 java-stream

into - stream parallel java 8



Procesamiento condicional de secuencias de Java 8 (7)

Me interesa separar un flujo en dos o más subflujos y procesar los elementos de diferentes maneras. Por ejemplo, un archivo de texto (grande) puede contener líneas de tipo A y líneas de tipo B, en cuyo caso me gustaría hacer algo como:

File.lines(path) .filter(line -> isTypeA(line)) .forEachTrue(line -> processTypeA(line)) .forEachFalse(line -> processTypeB(line))

Lo anterior es mi intento de abstraer la situación. En realidad, tengo un archivo de texto muy grande donde cada línea está probando contra una expresión regular; Si la línea pasa, entonces se procesa, mientras que si se rechaza, quiero actualizar un contador. Este procesamiento adicional en las cadenas rechazadas es la razón por la que no uso simplemente el filter .

¿Hay alguna forma razonable de hacer esto con las secuencias, o tendré que recurrir a los bucles? (Me gustaría que esto también se ejecute en paralelo, así que las secuencias son mi primera opción).


Aquí hay un enfoque (que ignora las precauciones sobre forzar el procesamiento condicional en una secuencia) que envuelve un predicado y un consumidor en un solo predicado con efecto secundario:

public static class StreamProc { public static <T> Predicate<T> process( Predicate<T> condition, Consumer<T> operation ) { Predicate<T> p = t -> { operation.accept(t); return false; }; return (t) -> condition.test(t) ? p.test(t) : true; } }

Luego filtra la corriente:

someStream .filter( StreamProc.process( cond1, op1 ) ) .filter( StreamProc.process( cond2, op2 ) ) ... .collect( ... )

Los elementos que quedan en la corriente aún no se han procesado.

Por ejemplo, un recorrido típico del sistema de archivos con iteración externa parece

File[] files = dir.listFiles(); for ( File f : files ) { if ( f.isDirectory() ) { this.processDir( f ); } else if ( f.isFile() ) { this.processFile( f ); } else { this.processErr( f ); } }

Con streams e iteración interna esto se convierte en

Arrays.stream( dir.listFiles() ) .filter( StreamProc.process( f -> f.isDirectory(), this::processDir ) ) .filter( StreamProc.process( f -> f.isFile(), this::processFile ) ) .forEach( f -> this::processErr );

Me gustaría que Stream implementara el método de proceso directamente. Entonces podríamos tener

Arrays.stream( dir.listFiles() ) .process( f -> f.isDirectory(), this::processDir ) ) .process( f -> f.isFile(), this::processFile ) ) .forEach( f -> this::processErr );

¿Pensamientos?


Bueno, simplemente puedes hacer

Counter counter = new Counter(); File.lines(path) .forEach(line -> { if (isTypeA(line)) { processTypeA(line); } else { counter.increment(); } });

No es muy funcional, pero lo hace de manera similar a su ejemplo. Por supuesto, si son paralelos, tanto Counter.increment() como processTypeA() deben ser seguros para subprocesos.


La forma en que manejaría esto es no dividir esto en absoluto, sino escribir

Files.lines(path) .map(line -> { if (condition(line)) { return doThingA(line); } else { return doThingB(line); } })...

Los detalles varían según lo que quieras hacer exactamente y cómo planeas hacerlo.


Las transmisiones Java 8 no fueron diseñadas para soportar este tipo de operación. Desde el jdk :

Un flujo debe operarse en (invocar una operación de flujo intermedio o terminal) solo una vez. Esto descarta, por ejemplo, flujos "bifurcados", donde la misma fuente alimenta dos o más tuberías, o múltiples recorridos del mismo flujo.

Si puede almacenarlo en la memoria, puede usar Collectors.partitioningBy si solo tiene dos tipos e ir con un Map<Boolean, List> . De lo contrario, utilice Collectors.groupingBy .


Parece que en realidad usted desea procesar cada línea, pero procesarla de manera diferente según alguna condición (tipo).

Creo que esta es la forma más o menos funcional de implementarlo sería:

public static void main(String[] args) { Arrays.stream(new int[] {1,2,3,4}).map(i -> processor(i).get()).forEach(System.out::println); } static Supplier<Integer> processor(int i) { return tellType(i) ? () -> processTypeA(i) : () -> processTypeB(i); } static boolean tellType(int i) { return i % 2 == 0; } static int processTypeA(int i) { return i * 100; } static int processTypeB(int i) { return i * 10; }


Si bien no se recomiendan los efectos secundarios en los parámetros de comportamiento, no están prohibidos, siempre que no haya interferencia, por lo que la solución más simple, aunque no más limpia, es contar directamente en el filtro:

AtomicInteger rejected=new AtomicInteger(); Files.lines(path) .filter(line -> { boolean accepted=isTypeA(line); if(!accepted) rejected.incrementAndGet(); return accepted; }) // chain processing of matched lines

Mientras esté procesando todos los elementos, el resultado será consistente. Solo si está utilizando una operación de terminal de cortocircuito (en una secuencia paralela), el resultado será impredecible.

La actualización de una variable atómica puede no ser la solución más eficiente, pero en el contexto del procesamiento de líneas desde un archivo, la sobrecarga probablemente sea insignificante.

Si desea una solución limpia y paralela, un enfoque general es implementar un Collector que pueda combinar el procesamiento de dos operaciones de recopilación basadas en una condición. Esto requiere que pueda expresar la operación posterior como un recopilador, pero la mayoría de las operaciones de transmisión pueden expresarse como un recopilador (y la tendencia es hacia la posibilidad de expresar todas las operaciones de esa manera, es decir, Java 9 agregará el filtering falta actualmente). flatMapping .

Necesitará un tipo de par para mantener dos resultados, por lo que suponiendo un boceto como

class Pair<A,B> { final A a; final B b; Pair(A a, B b) { this.a=a; this.b=b; } }

La implementación del colector de combinación se verá como

public static <T, A1, A2, R1, R2> Collector<T, ?, Pair<R1,R2>> conditional( Predicate<? super T> predicate, Collector<T, A1, R1> whenTrue, Collector<T, A2, R2> whenFalse) { Supplier<A1> s1=whenTrue.supplier(); Supplier<A2> s2=whenFalse.supplier(); BiConsumer<A1, T> a1=whenTrue.accumulator(); BiConsumer<A2, T> a2=whenFalse.accumulator(); BinaryOperator<A1> c1=whenTrue.combiner(); BinaryOperator<A2> c2=whenFalse.combiner(); Function<A1,R1> f1=whenTrue.finisher(); Function<A2,R2> f2=whenFalse.finisher(); return Collector.of( ()->new Pair<>(s1.get(), s2.get()), (p,t)->{ if(predicate.test(t)) a1.accept(p.a, t); else a2.accept(p.b, t); }, (p1,p2)->new Pair<>(c1.apply(p1.a, p2.a), c2.apply(p1.b, p2.b)), p -> new Pair<>(f1.apply(p.a), f2.apply(p.b))); }

y se puede usar, por ejemplo, para recopilar elementos coincidentes en una lista y contar los no coincidentes, como esto:

Pair<List<String>, Long> p = Files.lines(path) .collect(conditional(line -> isTypeA(line), Collectors.toList(), Collectors.counting())); List<String> matching=p.a; long nonMatching=p.b;

El colector es amigable para el paralelo y permite recolectores delegados arbitrariamente complejos, pero tenga en cuenta que con la implementación actual, la secuencia devuelta por Files.lines puede no funcionar tan bien con el procesamiento en paralelo, en comparación con "Lector # líneas () paraleliza debido a un lote no configurable. La política de tamaño en su separador ” . Las mejoras están programadas para el lanzamiento de Java 9.


Simplemente prueba cada elemento, y actúa en consecuencia.

lines.forEach(line -> { if (isTypeA(line)) processTypeA(line); else processTypeB(line); });

Este comportamiento podría estar oculto en un método auxiliar:

public static <T> Consumer<T> branch(Predicate<? super T> test, Consumer<? super T> t, Consumer<? super T> f) { return o -> { if (test.test(o)) t.accept(o); else f.accept(o); }; }

Entonces el uso se vería así:

lines.forEach(branch(this::isTypeA, this::processTypeA, this::processTypeB));

Nota tangencial

El método Files.lines() no cierra el archivo subyacente, por lo que debe usarlo así:

try (Stream<String> lines = Files.lines(path, encoding)) { lines.forEach(...); }

Las variables de tipo Stream ponen un poco de bandera roja, así que prefiero administrar un BufferedReader directamente:

try (BufferedReader lines = Files.newBufferedReader(path, encoding)) { lines.lines().forEach(...); }