streams procesamiento parte datos con java java-8 limit java-stream skip

procesamiento de datos con streams de java se 8-parte 2



Java 8 Stream: diferencia entre limit() y skip() (5)

Es una blasfemia completa mirar las operaciones de vapor individualmente porque no es así como se evalúa una corriente.

Hablando del límite (3) , es una operación de cortocircuito, lo cual tiene sentido porque al pensar en ello, sea cual sea la operación antes y después del limit , tener un límite en una secuencia detendrá la iteración después de obtener n elementos hasta la operación de límite, pero Esto no significa que solo se procesarían n elementos de flujo. Tome esta operación de transmisión diferente como ejemplo

public class App { public static void main(String[] args) { Stream.of(1,2,3,4,5,6,7,8,9) .peek(x->System.out.print("/nA"+x)) .filter(x -> x%2==0) .limit(3) .peek(x->System.out.print("B"+x)) .forEach(x->System.out.print("C"+x)); } }

saldría

A1 A2B2C2 A3 A4B4C4 A5 A6B6C6

lo cual parece correcto, porque el límite está esperando que pasen 3 elementos de flujo a través de la cadena de operación, aunque se procesan 6 elementos de flujo.

Hablando de Stream s, cuando ejecuto este fragmento de código

public class Main { public static void main(String[] args) { Stream.of(1,2,3,4,5,6,7,8,9) .peek(x->System.out.print("/nA"+x)) .limit(3) .peek(x->System.out.print("B"+x)) .forEach(x->System.out.print("C"+x)); } }

Me sale esta salida

A1B1C1 A2B2C2 A3B3C3

porque limitar mi flujo a los primeros tres componentes obliga a las acciones A , B y C a ejecutarse solo tres veces.

Intentar realizar un cálculo análogo en los últimos tres elementos utilizando el método skip() muestra un comportamiento diferente: esto

public class Main { public static void main(String[] args) { Stream.of(1,2,3,4,5,6,7,8,9) .peek(x->System.out.print("/nA"+x)) .skip(6) .peek(x->System.out.print("B"+x)) .forEach(x->System.out.print("C"+x)); } }

salidas esto

A1 A2 A3 A4 A5 A6 A7B7C7 A8B8C8 A9B9C9

¿Por qué, en este caso, se ejecutan las acciones A1 a A6 ? Debe tener algo que ver con el hecho de que el límite es una operación intermedia con estado de cortocircuito , mientras que el salto no lo es, pero no entiendo las implicaciones prácticas de esta propiedad. ¿Es solo que "todas las acciones antes de saltar se ejecutan mientras que no todas las personas antes del límite lo son"?


La notación fluida de la tubería fluida es lo que está causando esta confusión. Piensa en ello de esta manera:

limit(3)

Todas las operaciones canalizadas se evalúan perezosamente, excepto forEach() , que es una operación terminal , que desencadena la "ejecución de la canalización" .

Cuando se ejecuta la canalización, las definiciones de flujo intermediario no harán suposiciones sobre lo que sucede "antes" o "después" . Todo lo que están haciendo es tomar una secuencia de entrada y transformarla en una secuencia de salida:

Stream<Integer> s1 = Stream.of(1,2,3,4,5,6,7,8,9); Stream<Integer> s2 = s1.peek(x->System.out.print("/nA"+x)); Stream<Integer> s3 = s2.limit(3); Stream<Integer> s4 = s3.peek(x->System.out.print("B"+x)); s4.forEach(x->System.out.print("C"+x));

  • s1 contiene 9 valores Integer diferentes.
  • s2 mira todos los valores que lo pasan y los imprime.
  • s3 pasa los primeros 3 valores a s4 y aborta la tubería después del tercer valor. S3 no produce más valores. Esto no significa que no haya más valores en la tubería. s2 aún produciría (e imprimiría) más valores, pero nadie solicita esos valores y, por lo tanto, la ejecución se detiene.
  • s4 vuelve a mirar todos los valores que lo pasan y los imprime.
  • forEach consume e imprime lo que s4 pasa.

Piensa en ello de esta manera. Toda la corriente es completamente perezosa. Solo la operación del terminal extrae activamente nuevos valores de la tubería. Después de haber extraído 3 valores de s4 <- s3 <- s2 <- s1 , s3 ya no producirá nuevos valores y ya no extraerá ningún valor de s2 <- s1 . Si bien s1 -> s2 aún podría producir 4-9 , esos valores nunca se extraen de la tubería y, por lo tanto, nunca se imprimen con s2 .

skip(6)

Con skip() sucede lo mismo:

Stream<Integer> s1 = Stream.of(1,2,3,4,5,6,7,8,9); Stream<Integer> s2 = s1.peek(x->System.out.print("/nA"+x)); Stream<Integer> s3 = s2.skip(6); Stream<Integer> s4 = s3.peek(x->System.out.print("B"+x)); s4.forEach(x->System.out.print("C"+x));

  • s1 contiene 9 valores Integer diferentes.
  • s2 mira todos los valores que lo pasan y los imprime.
  • s3 consume los primeros 6 valores, "omitiéndolos" , lo que significa que los primeros 6 valores no se pasan a s4 , solo los valores posteriores.
  • s4 vuelve a mirar todos los valores que lo pasan y los imprime.
  • forEach consume e imprime lo que s4 pasa.

Lo importante aquí es que s2 no es consciente de que la tubería restante omite ningún valor. s2 mira a todos los valores independientemente de lo que ocurra después.

Otro ejemplo:

Considere esta canalización, que aparece en esta publicación de blog.

IntStream.iterate(0, i -> ( i + 1 ) % 2) .distinct() .limit(10) .forEach(System.out::println);

Cuando ejecutas lo anterior, el programa nunca se detendrá. ¿Por qué? Porque:

IntStream i1 = IntStream.iterate(0, i -> ( i + 1 ) % 2); IntStream i2 = i1.distinct(); IntStream i3 = i2.limit(10); i3.forEach(System.out::println);

Lo que significa:

  • i1 genera una cantidad infinita de valores alternos: 0 , 1 , 0 , 1 , 0 , 1 , ...
  • i2 consume todos los valores que se han encontrado antes, pasando solo valores "nuevos" , es decir, hay un total de 2 valores que salen de i2 .
  • i3 pasa 10 valores, luego se detiene.

Este algoritmo nunca se detendrá, porque i3 espera a que i2 produzca 8 valores más después de 0 y 1 , pero esos valores nunca aparecen, mientras que i1 nunca deja de alimentar valores a i2 .

No importa que en algún momento de la tubería, se hayan producido más de 10 valores. Lo único que importa es que i3 nunca ha visto esos 10 valores.

Para responder tu pregunta:

¿Es solo que "todas las acciones antes de saltar se ejecutan mientras que no todas las personas antes del límite lo son"?

No Todas las operaciones antes de skip() o limit() se ejecutan. En ambas ejecuciones, obtienes A1 - A3 . Pero el limit() puede provocar un cortocircuito en la tubería, abortando el consumo de valor una vez que se ha producido el evento de interés (se alcanza el límite).


Lo que tienes aquí son dos tuberías de flujo.

Estas tuberías de flujo consisten en una fuente, varias operaciones intermedias y una operación terminal.

Pero las operaciones intermedias son flojas. Esto significa que no sucede nada a menos que una operación posterior requiera un artículo. Cuando lo hace, la operación intermedia hace todo lo necesario para producir el artículo requerido, y luego espera nuevamente hasta que se solicite otro artículo, y así sucesivamente.

Las operaciones de la terminal son generalmente "ansiosas". Es decir, solicitan todos los elementos de la secuencia que se necesitan para completar.

Entonces, realmente debería pensar en la canalización como forEach pidiendo a la secuencia detrás de él para el siguiente elemento, y esa secuencia pregunta a la secuencia detrás de ella, y así sucesivamente, hasta el origen.

Con eso en mente, veamos qué tenemos con su primer canal:

Stream.of(1,2,3,4,5,6,7,8,9) .peek(x->System.out.print("/nA"+x)) .limit(3) .peek(x->System.out.print("B"+x)) .forEach(x->System.out.print("C"+x));

Entonces, forEach está pidiendo el primer artículo. Eso significa que el peek "B" necesita un elemento, y solicita el flujo de salida de limit para él, lo que significa que el limit tendrá que pedir el peek "A", que va a la fuente. Se da un elemento, y sube hasta forEach , y obtienes tu primera línea:

A1B1C1

forEach pide otro artículo, luego otro. Y cada vez, la solicitud se propaga por la secuencia y se realiza. Pero cuando forEach pregunta por el cuarto elemento, cuando la solicitud llega al limit , sabe que ya ha dado todos los elementos que tiene permitido dar.

Por lo tanto, no le está pidiendo a la mirada "A" otro artículo. Indica inmediatamente que sus elementos están agotados y, por lo tanto, no se realizan más acciones y para cada forEach termina.

¿Qué sucede en la segunda tubería?

Stream.of(1,2,3,4,5,6,7,8,9) .peek(x->System.out.print("/nA"+x)) .skip(6) .peek(x->System.out.print("B"+x)) .forEach(x->System.out.print("C"+x));

Nuevamente, forEach está pidiendo el primer artículo. Esto se propaga de nuevo. Pero cuando llega al skip , sabe que tiene que pedir 6 artículos de su flujo ascendente antes de que pueda pasar uno río abajo. Por lo tanto, realiza una solicitud en sentido ascendente desde el peek "A", la consume sin pasarla en sentido descendente, realiza otra solicitud, etc. Entonces, el vistazo "A" obtiene 6 solicitudes de un artículo y produce 6 impresiones, pero estos artículos no se transmiten.

A1 A2 A3 A4 A5 A6

En la séptima solicitud realizada por skip , el elemento se pasa al vistazo "B" y de allí a forEach , por lo que se realiza la impresión completa:

A7B7C7

Entonces es como antes. El skip ahora, cada vez que reciba una solicitud, solicitará un artículo en sentido ascendente y lo pasará en sentido descendente, ya que "sabe" que ya ha realizado su trabajo de omisión. Entonces, el resto de las impresiones pasan por toda la tubería, hasta que se agota la fuente.


Tal vez este pequeño diagrama ayude a obtener una "sensación" natural de cómo se procesa la transmisión.

La primera línea =>8=>=7= ... === representa la secuencia. Los elementos 1..8 fluyen de izquierda a derecha. Hay tres "ventanas":

  1. En la primera ventana ( peek A ) ves todo
  2. En la segunda ventana ( skip 6 o limit 3 ) se realiza un tipo de filtrado. El primer o el último elemento se "eliminan", es decir, no se transfieren para su posterior procesamiento.
  3. En la tercera ventana, solo verá los elementos que se pasaron

┌────────────────────────────────────────────────────────────────────────────┐ │ │ │▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸ ▸▸▸▸▸▸▸▸▸▸▸ ▸▸▸▸▸▸▸▸▸▸ ▸▸▸▸▸▸▸▸▸ │ │ 8 7 6 5 4 3 2 1 │ │▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸▸ ▲ ▸▸▸▸▸▸▸▸▸▸▸ ▲ ▸▸▸▸▸▸▸▸▸▸ ▲ ▸▸▸▸▸▸▸▸▸ │ │ │ │ │ │ │ │ skip 6 │ │ │ peek A limit 3 peek B │ └────────────────────────────────────────────────────────────────────────────┘

Probablemente no todo (tal vez ni siquiera nada) en esta explicación sea técnicamente completamente correcto. Pero cuando lo veo así, es bastante claro para mí qué elementos alcanzan cuál de las instrucciones concatenadas.


Todos los flujos se basan en spliteradores, que tienen básicamente dos operaciones: avanzar (avanzar un elemento, similar al iterador) y dividir (dividirse en una posición arbitraria, que es adecuada para el procesamiento en paralelo). Puede dejar de tomar elementos de entrada en cualquier momento que desee (lo que se hace por limit ), pero no puede simplemente saltar a la posición arbitraria (no existe tal operación en la interfaz Spliterator ). Por lo tanto, la operación de skip necesita leer los primeros elementos de la fuente solo para ignorarlos. Tenga en cuenta que en algunos casos puede realizar un salto real:

List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9); list.stream().skip(3)... // will read 1,2,3, but ignore them list.subList(3, list.size()).stream()... // will actually jump over the first three elements