tutorial streams procesamiento parte libreria expresiones ejemplo datos con java parallel-processing java-8 java-stream

libreria - procesamiento de datos con streams de java se 8-parte 2



Java 8 Stream api: ¿por qué la distinción entre el modo de ejecución secuencial y paralelo? (3)

Parece que solo debería poder declarar un flujo, y la opción de ejecución secuencial / paralela debería manejarse automágicamente en una capa más abajo, ya sea por código de biblioteca o por la propia JVM como una función de los núcleos disponibles en tiempo de ejecución, el tamaño del problema, etc.

La realidad es que a) los streams son una biblioteca, y no tienen magia JVM especial, yb) no se puede diseñar una biblioteca lo suficientemente inteligente como para descubrir automágicamente cuál es la decisión correcta en este caso particular. No hay una forma sensata de calcular cuán costosa será una función en particular sin ejecutarla, incluso si pudiera introspectar su implementación, lo que no puede hacer, y ahora está introduciendo un punto de referencia en cada operación de transmisión, tratando de descubrir si paralelizar valdrá la pena el costo de la sobrecarga de paralelismo. Eso no es práctico, especialmente dado que tampoco se sabe de antemano cuán mala es la sobrecarga de paralelismo.

Siempre se prefiere una transmisión en paralelo, dada la cantidad adecuada de núcleos y el tamaño del problema para justificar la sobrecarga, debido a las ganancias de rendimiento.

No siempre, en la práctica. Algunas tareas son tan pequeñas que no vale la pena paralelizarlas, y el paralelismo siempre tiene un poco de sobrecarga. (Y francamente, la mayoría de los programadores tienden a sobreestimar la utilidad del paralelismo, golpeándolo en todas partes cuando realmente está perjudicando el rendimiento).

Básicamente, es un problema bastante difícil que básicamente tienes que llevarlo al programador.

Desde el Stream javadoc :

Las tuberías de flujo pueden ejecutarse de forma secuencial o en paralelo. Este modo de ejecución es una propiedad de la transmisión. Los flujos se crean con una opción inicial de ejecución secuencial o paralela.

Mis suposiciones:

  1. No hay diferencia funcional entre las secuencias secuenciales / paralelas. La salida nunca se ve afectada por el modo de ejecución.
  2. Siempre se prefiere una transmisión en paralelo, dada la cantidad adecuada de núcleos y el tamaño del problema para justificar la sobrecarga, debido a las ganancias de rendimiento.
  3. Queremos escribir código una vez y ejecutar en cualquier lugar sin tener que preocuparnos por el hardware (esto es Java, después de todo).

Suponiendo que estas suposiciones son válidas (nada malo con un poco de meta-suposición), ¿cuál es el valor de tener el modo de ejecución expuesto en la API?

Parece que solo debería poder declarar un Stream , y la opción de ejecución secuencial / paralela debería manejarse automágicamente en una capa más abajo, ya sea por código de biblioteca o por la propia JVM como una función de los núcleos disponibles en tiempo de ejecución, el tamaño del problema, etc.

Claro, suponiendo que las transmisiones paralelas también funcionen en una sola máquina central, tal vez solo el uso de una transmisión paralela lo logre. Pero esto es realmente feo: ¿por qué tener referencias explícitas a las transmisiones paralelas en mi código cuando es la opción predeterminada?

Incluso si hay un escenario en el que deliberadamente desea codificar el uso de una secuencia secuencial, ¿por qué no hay solo una interfaz secundaria SequentialStream para ese fin, en lugar de contaminar Stream con un cambio de modo de ejecución?


No hay diferencia funcional entre las secuencias secuenciales / paralelas. La salida nunca se ve afectada por el modo de ejecución.

Hay una diferencia entre la ejecución de secuencias secuenciales / paralelas. En el código siguiente, TEST_2 resultados muestra que la ejecución de subprocesos paralelos es mucho más rápida que la secuencia.

Siempre se prefiere una transmisión en paralelo, dada la cantidad adecuada de núcleos y el tamaño del problema para justificar la sobrecarga, debido a las ganancias de rendimiento.

Realmente no. si la tarea no es digna (tareas simples) para ser ejecutada en hilos paralelos, entonces simplemente estamos agregando sobrecarga a nuestro código. TEST_1 resultados muestra esto. También tenga en cuenta que si todos los subprocesos de trabajo están ocupados en una tarea de ejecución paralela; entonces otra operación de transmisión paralela en otro lugar de su código estará esperando eso.

Queremos escribir código una vez y ejecutar en cualquier lugar sin tener que preocuparnos por el hardware (esto es Java, después de todo).

Como solo el programador sabe; ¿Vale la pena ejecutar esta tarea en paralelo / secuencia independientemente de las CPU? Entonces Java API expuso ambas opciones al desarrollador.

import java.util.ArrayList; import java.util.List; /* * Performance test over internal(parallel/sequential) and external iterations. * https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html * * * Parallel computing involves dividing a problem into subproblems, * solving those problems simultaneously (in parallel, with each subproblem running in a separate thread), * and then combining the results of the solutions to the subproblems. Java SE provides the fork/join framework, * which enables you to more easily implement parallel computing in your applications. However, with this framework, * you must specify how the problems are subdivided (partitioned). * With aggregate operations, the Java runtime performs this partitioning and combining of solutions for you. * * Limit the parallelism that the ForkJoinPool offers you. You can do it yourself by supplying the -Djava.util.concurrent.ForkJoinPool.common.parallelism=1, * so that the pool size is limited to one and no gain from parallelization * * @see ForkJoinPool * https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html * * ForkJoinPool, that pool creates a fixed number of threads (default: number of cores) and * will never create more threads (unless the application indicates a need for those by using managedBlock). * * http://.com/questions/10797568/what-determines-the-number-of-threads-a-java-forkjoinpool-creates * */ public class IterationThroughStream { private static boolean found = false; private static List<Integer> smallListOfNumbers = null; public static void main(String[] args) throws InterruptedException { // TEST_1 List<String> bigListOfStrings = new ArrayList<String>(); for(Long i = 1l; i <= 1000000l; i++) { bigListOfStrings.add("Counter no: "+ i); } System.out.println("Test Start"); System.out.println("-----------"); long startExternalIteration = System.currentTimeMillis(); externalIteration(bigListOfStrings); long endExternalIteration = System.currentTimeMillis(); System.out.println("Time taken for externalIteration(bigListOfStrings) is :" + (endExternalIteration - startExternalIteration) + " , and the result found: "+ found); long startInternalIteration = System.currentTimeMillis(); internalIteration(bigListOfStrings); long endInternalIteration = System.currentTimeMillis(); System.out.println("Time taken for internalIteration(bigListOfStrings) is :" + (endInternalIteration - startInternalIteration) + " , and the result found: "+ found); // TEST_2 smallListOfNumbers = new ArrayList<Integer>(); for(int i = 1; i <= 10; i++) { smallListOfNumbers.add(i); } long startExternalIteration1 = System.currentTimeMillis(); externalIterationOnSleep(smallListOfNumbers); long endExternalIteration1 = System.currentTimeMillis(); System.out.println("Time taken for externalIterationOnSleep(smallListOfNumbers) is :" + (endExternalIteration1 - startExternalIteration1)); long startInternalIteration1 = System.currentTimeMillis(); internalIterationOnSleep(smallListOfNumbers); long endInternalIteration1 = System.currentTimeMillis(); System.out.println("Time taken for internalIterationOnSleep(smallListOfNumbers) is :" + (endInternalIteration1 - startInternalIteration1)); // TEST_3 Thread t1 = new Thread(IterationThroughStream :: internalIterationOnThread); Thread t2 = new Thread(IterationThroughStream :: internalIterationOnThread); Thread t3 = new Thread(IterationThroughStream :: internalIterationOnThread); Thread t4 = new Thread(IterationThroughStream :: internalIterationOnThread); t1.start(); t2.start(); t3.start(); t4.start(); Thread.sleep(30000); } private static boolean externalIteration(List<String> bigListOfStrings) { found = false; for(String s : bigListOfStrings) { if(s.equals("Counter no: 1000000")) { found = true; } } return found; } private static boolean internalIteration(List<String> bigListOfStrings) { found = false; bigListOfStrings.parallelStream().forEach( (String s) -> { if(s.equals("Counter no: 1000000")){ //Have a breakpoint to look how many threads are spawned. found = true; } } ); return found; } private static boolean externalIterationOnSleep(List<Integer> smallListOfNumbers) { found = false; for(Integer s : smallListOfNumbers) { try { Thread.sleep(100); } catch (Exception e) { e.printStackTrace(); } } return found; } private static boolean internalIterationOnSleep(List<Integer> smallListOfNumbers) { found = false; smallListOfNumbers.parallelStream().forEach( //Removing parallelStream() will behave as single threaded (sequential access). (Integer s) -> { try { Thread.sleep(100); //Have a breakpoint to look how many threads are spawned. } catch (Exception e) { e.printStackTrace(); } } ); return found; } public static void internalIterationOnThread() { smallListOfNumbers.parallelStream().forEach( (Integer s) -> { try { /* * DANGEROUS * This will tell you that if all the 7 FJP(Fork join pool) worker threads are blocked for one single thread (e.g. t1), * then other normal three(t2 - t4) thread wont execute, will wait for FJP worker threads. */ Thread.sleep(100); //Have a breakpoint here. } catch (Exception e) { e.printStackTrace(); } } ); } }


Hay un caso interesante en esta pregunta que muestra que a veces el flujo paralelo puede ser más lento en órdenes de magnitud. En ese ejemplo particular, la versión paralela se ejecuta durante diez minutos mientras que la secuencia toma varios segundos.