streams procesamiento parte ejemplo datos con available java java-8 java-stream

procesamiento - Java 8 Streams: cuenta todos los elementos que entran en la operación del terminal



streams api is available in which package of java 8 (2)

Me pregunto si hay un enfoque más agradable (o simplemente otro) para obtener el recuento de todos los elementos que entran en el funcionamiento de terminal de una secuencia en lugar de lo siguiente:

Stream<T> stream = ... // given as parameter AtomicLong count = new AtomicLong(); stream.filter(...).map(...) .peek(t -> count.incrementAndGet())

donde count.get() me da el recuento real de los elementos procesados ​​en esa etapa.

.forEach deliberadamente la operación de la terminal, ya que podría cambiar entre .forEach , .reduce o .collect . Ya sé .count , pero parece funcionar bien solo si cambio un .forEach con un .map y uso el .count como operación de terminal. Pero me parece como si .map fuera mal utilizado.

Lo que realmente no me gusta con la solución anterior: si se agrega un filtro después de él, solo cuenta los elementos en esa etapa específica, pero no los que entran en la operación del terminal.

El otro enfoque que me viene a la mente es collect los valores filtrados y asignados en una lista y operar en eso y simplemente llamar a list.size() para obtener el conteo. Sin embargo, esto no funcionará, si la recopilación de la transmisión llevaría a un error, mientras que con la solución anterior podría tener un recuento de todos los elementos procesados ​​hasta el momento, si existe un try/catch adecuado. Sin embargo, eso no es un requisito difícil.


La mejor idea posible es utilizar una asignación en sí misma y, al mismo tiempo, contar la invocación de la rutina de asignación.

steam.map(object -> {counter.incrementAndGet(); return object;});

Dado que este lambda se puede reutilizar y puede reemplazar cualquier lambda con un objeto, puede crear un objeto de contador como este:

class StreamCounter<T> implements Function<? super T,? extends T> { int counter = 0; public T apply(T object) { counter++; return object;} public int get() { return counter;} }

Así que usando:

StreamCounter<String> myCounter = new ...; stream.map(myCounter)... int count = myCounter.get();

Como nuevamente, la invocación del mapa es solo otro punto de reutilización, el método del mapa puede proporcionarse extendiendo el flujo y envolviendo el flujo ordinario.

De esta manera puedes crear algo como:

AtomicLong myValue = new AtomicLong(); ... convert(stream).measure(myValue).map(...).measure(mySecondValue).filter(...).measure(myThirdValue).toList(...);

De esta manera, simplemente puede tener su propio contenedor de Stream que envuelve de forma transparente cada stream en su propia versión (que no es una sobrecarga de rendimiento o memoria) y mide la cardinalidad de cualquier punto de medida.

Esto se hace a menudo cuando se analiza la complejidad de los algoritmos al crear soluciones de mapa / reducción. Extienda su implementación de flujo al no tomar una instancia larga atómica para el conteo, sino solo el nombre del punto de medición que su implementación de flujo puede contener un número ilimitado de puntos de medición mientras proporciona una manera flexible de imprimir un informe.

Dicha implementación puede recordar la secuencia concreta de los métodos de flujo junto con la posición de cada punto de medición y brinda resultados como:

list -> (32k)map -> (32k)filter -> (5k)map -> avg().

Dicha implementación de flujo se escribe una vez, se puede usar para pruebas, pero también para informes.

La implementación en una implementación diaria brinda la posibilidad de recopilar estadísticas para ciertos procesos y permitir una optimización dinámica mediante el uso de una permutación diferente de las operaciones. Esto sería, por ejemplo, un optimizador de consultas.

Entonces, en su caso, lo mejor sería reutilizar primero un StreamCounter y, dependiendo de la frecuencia de uso, la cantidad de contadores y la afinidad por el principio DRY, eventualmente implementarán una solución más sofisticada más adelante.

PS: StreamCounter usa un valor int y no es seguro para subprocesos, por lo que en una configuración de flujo paralelo, se reemplazaría el int con una instancia de AtomicInteger .


Parece que ya tiene la solución más limpia a través de un peek antes de la operación de terminal IMO. La única razón por la que podría pensar que esto es necesario es para fines de depuración, y si ese es el caso, peek fue diseñado para eso. Envolver el flujo para eso y proporcionar implementaciones separadas es demasiado, además de la enorme cantidad de tiempo y soporte posterior para todo lo que se agrega a los Streams .

¿Para la parte de qué si hay otro filtro agregado? Bueno, proporcione un comentario de código (muchos de nosotros lo hacemos) y algunos casos de prueba que de otro modo fallarían, por ejemplo.

Sólo mi 0.02 $