streams procesamiento parte parallel operaciones metodos libreria funcionales funcional ejemplo datos con collection java multithreading concurrency parallel-processing java-stream

procesamiento - Agregar elementos a Java 8 Streams paralelos sobre la marcha



procesamiento de datos con streams de java se 8-parte 2 (2)

El objetivo es procesar un flujo continuo de elementos con la ayuda de las secuencias Java 8. Por lo tanto, los elementos se agregan a la fuente de datos de una secuencia paralela mientras se procesa esa secuencia.

El Javadoc de Streams describe las siguientes propiedades en la sección "No interferencia":

Para la mayoría de las fuentes de datos, prevenir la interferencia significa asegurarse de que la fuente de datos no se modifique durante la ejecución de la canalización de la secuencia. La excepción notable a esto son las secuencias cuyas fuentes son colecciones concurrentes, que están diseñadas específicamente para manejar modificaciones concurrentes. Las fuentes de transmisión concurrentes son aquellas cuyo Spliterator informa la característica CONCURRENT.

Esa es la razón por la cual se utiliza una ConcurrentLinkedQueue en nuestros intentos, que se cumple para

new ConcurrentLinkedQueue<Integer>().spliterator().hasCharacteristics(Spliterator.CONCURRENT)

No se dice explícitamente que la fuente de datos no se debe modificar cuando se usa en transmisiones paralelas.

En nuestro ejemplo para cada uno de los elementos en la secuencia, el valor de contador incrementado se agrega a la cola, que es la fuente de datos de la secuencia, hasta que el contador es mayor que N. Con la llamada queue.stream () todo funciona bien mientras secuencial ejecución:

import static org.junit.Assert.assertEquals; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; public class StreamTest { public static void main(String[] args) { final int N = 10000; assertEquals(N, testSequential(N)); } public static int testSequential(int N) { final AtomicInteger counter = new AtomicInteger(0); final AtomicInteger check = new AtomicInteger(0); final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>(); for (int i = 0; i < N / 10; ++i) { queue.add(counter.incrementAndGet()); } Stream<Integer> stream = queue.stream(); stream.forEach(i -> { System.out.println(i); int j = counter.incrementAndGet(); check.incrementAndGet(); if (j <= N) { queue.add(j); } }); stream.close(); return check.get(); } }

Como segundo intento, la secuencia es paralela y arroja un java.lang.AssertionError porque la verificación es menor que N y no se procesaron todos los elementos de la cola. Es posible que la secuencia haya finalizado la ejecución anticipadamente porque la cola puede haberse quedado vacía en algún momento.

import static org.junit.Assert.assertEquals; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; public class StreamTest { public static void main(String[] args) { final int N = 10000; assertEquals(N, testParallel1(N)); } public static int testParallel1(int N) { final AtomicInteger counter = new AtomicInteger(0); final AtomicInteger check = new AtomicInteger(0); final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>(); for (int i = 0; i < N / 10; ++i) { queue.add(counter.incrementAndGet()); } Stream<Integer> stream = queue.parallelStream(); stream.forEach(i -> { System.out.println(i); int j = counter.incrementAndGet(); check.incrementAndGet(); if (j <= N) { queue.add(j); } }); stream.close(); return check.get(); } }

El siguiente intento fue señalar el hilo principal, una vez que el flujo continuo ''realmente'' finalizó (la cola está vacía) y luego cerrar el objeto de flujo. Aquí el problema es que el objeto de transmisión parece leer elementos de la cola solo una vez o al menos no continuamente y nunca llega al final "real" de la secuencia.

import static org.junit.Assert.assertEquals; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; public class StreamTest { public static void main(String[] args) { final int N = 10000; try { assertEquals(N, testParallel2(N)); } catch (InterruptedException e) { e.printStackTrace(); } } public static int testParallel2(int N) throws InterruptedException { final Lock lock = new ReentrantLock(); final Condition cond = lock.newCondition(); final AtomicInteger counter = new AtomicInteger(0); final AtomicInteger check = new AtomicInteger(0); final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>(); for (int i = 0; i < N / 10; ++i) { queue.add(counter.incrementAndGet()); } Stream<Integer> stream = queue.parallelStream(); stream.forEach(i -> { System.out.println(i); int j = counter.incrementAndGet(); lock.lock(); check.incrementAndGet(); if (j <= N) { queue.add(j); } else { cond.signal(); } lock.unlock(); }); lock.lock(); while (check.get() < N) { cond.await(); } lock.unlock(); stream.close(); return check.get(); } }

Las preguntas que surgen son:

  • ¿Hicimos algo mal?
  • ¿Es un uso no especificado o incluso incorrecto de Stream API?
  • ¿Cómo podemos lograr el comportamiento deseado de otra manera?

Stream puede generarse de forma continua o desde una colección que se modifica, y no está diseñado para ejecutarse continuamente. Está diseñado para procesar los elementos disponibles cuando la transmisión se inicia y se devuelve una vez que se han procesado. Tan pronto como se llega al final, se detiene.

¿Cómo podemos lograr el comportamiento deseado de otra manera?

Debe usar un enfoque diferente. Utilizaría un ExecutorService donde pase la tarea de envío que desea realizar.

Una alternativa sería usar un flujo continuo que bloquea cuando no hay resultados disponibles. Nota: esto bloqueará el Common ForkJoinPool utilizado por transmisión en paralelo y ningún otro código puede usarlo.


Existe una diferencia significativa entre "modificar la fuente del Stream no lo rompe" y las modificaciones de su supuesto "se reflejarán en la operación actual del Stream ".

La propiedad CONCURRENT implica que la modificación de la fuente está permitida , es decir, que nunca lanzará una ConcurrentModificationException , pero no implica que pueda confiar en un comportamiento específico con respecto a si estos cambios se reflejan o no.

La documentación de la bandera de CONCURRENT dice:

La mayoría de las colecciones simultáneas mantienen una política de coherencia que garantiza la precisión con respecto a los elementos presentes en el punto de construcción del Spliterator, pero posiblemente no reflejen adiciones o eliminaciones posteriores.

Este comportamiento de Stream es coherente con el comportamiento ya conocido de ConcurrentLinkedQueue :

Los iteradores son débilmente consistentes y devuelven elementos que reflejan el estado de la cola en algún momento o desde la creación del iterador. No lanzan ConcurrentModificationException , y pueden proceder al mismo tiempo que otras operaciones. Los elementos contenidos en la cola desde la creación del iterador se devolverán exactamente una vez.

Es difícil decir cómo "lograr el comportamiento deseado de lo contrario", ya que no describió el "comportamiento deseado" en ninguna forma que no sea el código, que puede ser simplemente reemplazado por

public static int testSequential(int N) { return N; } public static int testParallel1(int N) { return N; }

ya que es el único efecto observable ... Considere redefinir su problema ...