tutorial streams procesamiento parte libreria ejemplo datos con collection java concurrency parallel-processing java-8 java-stream

libreria - procesamiento de datos con streams de java se 8-parte 2



Grupo de subprocesos personalizados en Java 8 stream paralelo (11)

Como alternativa al truco de activar el cálculo paralelo dentro de su propia forkJoinPool, también puede pasar ese grupo al método CompletableFuture.supplyAsync como en:

ForkJoinPool forkJoinPool = new ForkJoinPool(2); CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() -> //parallel task here, for example range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()), forkJoinPool );

¿Es posible especificar un grupo de subprocesos personalizados para el flujo paralelo de Java 8? No lo encuentro por ningún lado.

Imagina que tengo una aplicación de servidor y me gustaría usar flujos paralelos. Pero la aplicación es grande y de múltiples subprocesos, así que quiero compartimentarla. No quiero una tarea de ejecución lenta en un módulo de las tareas de bloque de aplicación de otro módulo.

Si no puedo usar grupos de subprocesos diferentes para módulos diferentes, significa que no puedo usar flujos paralelos de forma segura en la mayoría de las situaciones del mundo real.

Prueba el siguiente ejemplo. Hay algunas tareas intensivas de CPU ejecutadas en subprocesos separados. Las tareas aprovechan los flujos paralelos. La primera tarea se interrumpe, por lo que cada paso toma 1 segundo (simulado por el modo de espera de subprocesos). El problema es que otros hilos se atascan y esperan a que finalice la tarea rota. Este es un ejemplo artificial, pero imagine una aplicación servlet y alguien que envía una tarea de larga ejecución al grupo de unión de bifurcaciones compartidas.

public class ParallelTest { public static void main(String[] args) throws InterruptedException { ExecutorService es = Executors.newCachedThreadPool(); es.execute(() -> runTask(1000)); //incorrect task es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.execute(() -> runTask(0)); es.shutdown(); es.awaitTermination(60, TimeUnit.SECONDS); } private static void runTask(int delay) { range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max() .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max)); } public static boolean isPrime(long n) { return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0); } }



En realidad, hay un truco sobre cómo ejecutar una operación paralela en un grupo específico de fork-join. Si lo ejecuta como una tarea en un grupo de fork-join, permanece allí y no usa el común.

ForkJoinPool forkJoinPool = new ForkJoinPool(2); forkJoinPool.submit(() -> //parallel task here, for example IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()) ).get();

El truco se basa en ForkJoinTask.fork que especifica: "Organiza la ejecución asíncrona de esta tarea en el grupo en el que se ejecuta la tarea actual, si corresponde, o el uso de ForkJoinPool.commonPool () si no es inForkJoinPool ()"


Hasta ahora, utilicé las soluciones descritas en las respuestas de esta pregunta. Ahora, se me ocurrió una pequeña biblioteca llamada Parallel Stream Support para eso:

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS); ParallelIntStreamSupport.range(1, 1_000_000, pool) .filter(PrimesPrint::isPrime) .collect(toList())

Pero como @PabloMatiasGomez señaló en los comentarios, hay inconvenientes con respecto al mecanismo de división de flujos paralelos que depende en gran medida del tamaño de la reserva común. Ver flujo paralelo desde un HashSet no se ejecuta en paralelo .

Estoy usando esta solución solo para tener grupos separados para diferentes tipos de trabajo, pero no puedo establecer el tamaño del grupo común en 1, incluso si no lo uso.


Las transmisiones paralelas usan el ForkJoinPool.commonPool predeterminado que, de manera predeterminada, tiene menos subprocesos que procesadores , como lo devolvió Runtime.getRuntime().availableProcessors() (Esto significa que las transmisiones paralelas usan todos sus procesadores porque también usan el subproceso principal ):

Para aplicaciones que requieren grupos separados o personalizados, un ForkJoinPool puede construirse con un nivel de paralelismo objetivo dado; por defecto, igual al número de procesadores disponibles.

Esto también significa que si ha anidado secuencias paralelas o si varias secuencias paralelas se iniciaron simultáneamente, todas compartirán el mismo grupo. Ventaja: nunca usará más que el valor predeterminado (número de procesadores disponibles). Desventaja: es posible que no se asignen "todos los procesadores" a cada flujo paralelo que inicie (si tiene más de uno). (Aparentemente puedes usar un ManagedBlocker para sortear eso).

Para cambiar la forma en que se ejecutan los flujos paralelos, puede:

  • envíe la ejecución de la transmisión en paralelo a su propio ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get(); o
  • puede cambiar el tamaño de la agrupación común usando las propiedades del sistema: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") para un paralelismo de destino de 20 hilos.

Ejemplo de este último en mi máquina que tiene 8 procesadores. Si ejecuto el siguiente programa:

long start = System.currentTimeMillis(); IntStream s = IntStream.range(0, 20); //System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20"); s.parallel().forEach(i -> { try { Thread.sleep(100); } catch (Exception ignore) {} System.out.print((System.currentTimeMillis() - start) + " "); });

La salida es:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

Así que puedes ver que la secuencia paralela procesa 8 elementos a la vez, es decir, utiliza 8 secuencias. Sin embargo, si elimino el comentario de la línea comentada, la salida es:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216

Esta vez, la secuencia paralela ha utilizado 20 subprocesos y los 20 elementos de la secuencia se han procesado simultáneamente.


Para medir el número real de subprocesos usados, puede verificar Thread.activeCount() :

Runnable r = () -> IntStream .range(-42, +42) .parallel() .map(i -> Thread.activeCount()) .max() .ifPresent(System.out::println); ForkJoinPool.commonPool().submit(r).join(); new ForkJoinPool(42).submit(r).join();

Esto puede producir en una CPU de 4 núcleos una salida como:

5 // common pool 23 // custom pool

Sin .parallel() da:

3 // common pool 4 // custom pool


Probé el ForkJoinPool personalizado de la siguiente manera para ajustar el tamaño de la piscina:

private static Set<String> ThreadNameSet = new HashSet<>(); private static Callable<Long> getSum() { List<Long> aList = LongStream.rangeClosed(0, 10_000_000).boxed().collect(Collectors.toList()); return () -> aList.parallelStream() .peek((i) -> { String threadName = Thread.currentThread().getName(); ThreadNameSet.add(threadName); }) .reduce(0L, Long::sum); } private static void testForkJoinPool() { final int parallelism = 10; ForkJoinPool forkJoinPool = null; Long result = 0L; try { forkJoinPool = new ForkJoinPool(parallelism); result = forkJoinPool.submit(getSum()).get(); //this makes it an overall blocking call } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } finally { if (forkJoinPool != null) { forkJoinPool.shutdown(); //always remember to shutdown the pool } } out.println(result); out.println(ThreadNameSet); }

Aquí está el resultado que indica que el grupo está utilizando más subprocesos que el predeterminado 4 .

50000005000000 [ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-6, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-10, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-13, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-2]

Pero en realidad hay un bicho raro , cuando intenté lograr el mismo resultado utilizando ThreadPoolExecutor siguiente manera:

BlockingDeque blockingDeque = new LinkedBlockingDeque(1000); ThreadPoolExecutor fixedSizePool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, blockingDeque, new MyThreadFactory("my-thread"));

pero yo fallé

Solo iniciará el paralelismo en un nuevo hilo y luego todo lo demás será igual, lo que nuevamente demuestra que el parallelStream utilizará el ForkJoinPool para iniciar sus hilos secundarios.


Si no le importa usar una biblioteca de terceros, con cyclops-react puede mezclar Streams secuenciales y paralelos dentro de la misma canalización y proporcionar herramientas de ForkJoin personalizadas. Por ejemplo

ReactiveSeq.range(1, 1_000_000) .foldParallel(new ForkJoinPool(10), s->s.filter(i->true) .peek(i->System.out.println("Thread " + Thread.currentThread().getId())) .max(Comparator.naturalOrder()));

O si deseamos continuar procesando dentro de un Stream secuencial.

ReactiveSeq.range(1, 1_000_000) .parallel(new ForkJoinPool(10), s->s.filter(i->true) .peek(i->System.out.println("Thread " + Thread.currentThread().getId()))) .map(this::processSequentially) .forEach(System.out::println);

[Revelación Soy el desarrollador líder de cyclops-react]


Si no necesita un ThreadPool personalizado pero prefiere limitar el número de tareas simultáneas, puede usar:

List<Path> paths = List.of("/path/file1.csv", "/path/file2.csv", "/path/file3.csv").stream().map(e -> Paths.get(e)).collect(toList()); List<List<Path>> partitions = Lists.partition(paths, 4); // Guava method partitions.forEach(group -> group.parallelStream().forEach(csvFilePath -> { // do your processing }));

(La pregunta duplicada que solicita esto está bloqueada, así que por favor, pónganme aquí)


Ve a buscar AbacusUtil . El número de hilo puede especificarse para transmisión paralela. Aquí está el código de ejemplo:

LongStream.range(4, 1_000_000).parallel(threadNum)...

Divulgación: Soy el desarrollador de AbacusUtil.


Nota: Parece que hay una solución implementada en JDK 10 que garantiza que el Grupo de subprocesos personalizados utiliza el número esperado de subprocesos.

La ejecución de secuencias paralelas dentro de un ForkJoinPool personalizado debe obedecer el paralelismo https://bugs.openjdk.java.net/browse/JDK-8190974