parallel - El bucle forEach paralelo de Java 8 anidado no funciona bien. ¿Se espera este comportamiento?
java 8 parallel stream thread pool size (3)
Después de poner en orden el código un poco. No veo los mismos resultados con la actualización 45 de Java 8. Sin duda, hay una sobrecarga, pero es muy pequeña en comparación con el período de tiempo del que habla.
Se espera la posibilidad de un interbloqueo ya que está consumiendo todos los subprocesos disponibles en la agrupación con el bucle externo, sin dejar hilos para ejecutar el bucle interno.
El siguiente programa imprime
isInnerStreamParallel: false, isCPUTimeBurned: false
java.util.concurrent.ForkJoinPool.common.parallelism = 8
Done in 33.1 seconds.
isInnerStreamParallel: false, isCPUTimeBurned: true
java.util.concurrent.ForkJoinPool.common.parallelism = 8
Done in 33.0 seconds.
isInnerStreamParallel: true, isCPUTimeBurned: false
java.util.concurrent.ForkJoinPool.common.parallelism = 8
Done in 32.5 seconds.
isInnerStreamParallel: true, isCPUTimeBurned: true
java.util.concurrent.ForkJoinPool.common.parallelism = 8
Done in 32.6 seconds.
El código
import java.util.stream.IntStream;
public class NestedParallelForEachTest {
// Setup: Inner loop task 0.01 sec in worse case. Outer loop task: 10 sec + inner loop. This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec.
static final int numberOfTasksInOuterLoop = 24; // In real applications this can be a large number (e.g. > 1000).
static final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
static final int concurrentExecutionsLimitForStreams = 8; // java.util.concurrent.ForkJoinPool.common.parallelism
public static void main(String[] args) {
testNestedLoops(false, false);
testNestedLoops(false, true);
testNestedLoops(true, false);
testNestedLoops(true, true);
}
public static void testNestedLoops(boolean isInnerStreamParallel, boolean isCPUTimeBurned) {
System.out.println("isInnerStreamParallel: " + isInnerStreamParallel + ", isCPUTimeBurned: " + isCPUTimeBurned);
long start = System.nanoTime();
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams));
System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));
// Outer loop
IntStream.range(0, numberOfTasksInOuterLoop).parallel().forEach(i -> {
// System.out.println(i + "/t" + Thread.currentThread());
if(i < 10) burnTime(10 * 1000, isCPUTimeBurned);
IntStream range = IntStream.range(0, numberOfTasksInInnerLoop);
if (isInnerStreamParallel) {
// Inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis
range = range.parallel();
} else {
// Inner loop as sequential
}
range.forEach(j -> burnTime(10, isCPUTimeBurned));
if(i >= 10) burnTime(10 * 1000, isCPUTimeBurned);
});
long end = System.nanoTime();
System.out.printf("Done in %.1f seconds.%n", (end - start) / 1e9);
}
static void burnTime(long millis, boolean isCPUTimeBurned) {
if (isCPUTimeBurned) {
long end = System.nanoTime() + millis * 1000000;
while (System.nanoTime() < end)
;
} else {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new AssertionError(e);
}
}
}
}
Nota: ya solucioné este problema en otra publicación SO: el uso de un semáforo dentro de una acción de flujo paralelo Java 8 anidada puede ser DEADLOCK. ¿Es esto un error? -, pero el título de este post sugiere que el problema está relacionado con el uso de un semáforo, lo que distrajo un poco la discusión. Estoy creando este para enfatizar que los bucles anidados pueden tener un problema de rendimiento, aunque ambos problemas probablemente tengan una causa común (y quizás porque me tomó mucho tiempo resolver este problema). (No lo veo como un duplicado, porque está acentuando otro síntoma, pero si lo elimina).
Problema: si anida dos Java 8 stream.parallel (). ForEach bucles y todas las tareas son independientes, sin estado, etc., excepto que se envían a la agrupación de FJ común, el anidamiento de un bucle paralelo dentro de un bucle paralelo funciona mucho peor que anidar un bucle secuencial dentro de un bucle paralelo. Peor aún: si la operación que contiene el bucle interno está sincronizada, obtendrá un DEADLOCK.
Demostración del problema de rendimiento.
Sin el ''sincronizado'' todavía puede observar un problema de rendimiento. Encontrará un código de demostración para esto en: http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachTest.java (vea JavaDoc para obtener una descripción más detallada).
Nuestra configuración aquí es la siguiente: Tenemos un stream.parallel (). ForEach () anidado.
- El bucle interno es independiente (sin estado, sin interferencias, etc., excepto por el uso de un conjunto común) y consume 1 segundo en total en el peor de los casos, es decir, si se procesa secuencialmente.
- La mitad de las tareas del bucle externo consumen 10 segundos antes de ese bucle.
- La mitad consume 10 segundos después de ese bucle.
- Por lo tanto, cada hilo consume 11 segundos (el peor de los casos) en total. * Tenemos un booleano que permite cambiar el bucle interno de paralelo () a secuencial ().
Ahora: al enviar 24 tareas de bucle externo a una agrupación con paralelismo 8 esperaríamos 24/8 * 11 = 33 segundos como máximo (en una máquina de 8 núcleos o mejor).
El resultado es:
- Con bucle secuencial interno: 33 segundos.
- Con bucle paralelo interno:> 80 segundos (tuve 92 segundos).
Pregunta: ¿Se puede confirmar este comportamiento? ¿Es esto algo que uno esperaría del marco? (Tengo un poco más de cuidado ahora con la afirmación de que se trata de un error, pero personalmente creo que se debe a un error en la implementación de ForkJoinTask. Observación: he publicado esto en concurrency-interest (consulte http://cs.oswego.edu/pipermail/concurrency-interest/2014-May/012652.html ), pero hasta ahora no obtuve la confirmación de allí).
Demostración del punto muerto
El siguiente código va a DEADLOCK
// Outer loop
IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
doWork();
synchronized(this) {
// Inner loop
IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
doWork();
});
}
});
donde numberOfTasksInOuterLoop = 24
, numberOfTasksInInnerLoop = 240
, outerLoopOverheadFactor = 10000
y doWork
es un quemador de CPU sin estado.
Encontrará un código de demostración completo en http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachAndSynchronization.java (consulte el documento JavaDoc para obtener una descripción más detallada).
¿Se espera este comportamiento? Tenga en cuenta que la documentación sobre flujos paralelos de Java no menciona ningún problema con el anidamiento o la sincronización. Además, no se menciona el hecho de que ambos utilicen un fork-join-pool común.
Actualizar
Otra prueba sobre el problema de rendimiento se puede encontrar en http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/NestedParallelForEachBenchmark.java : esta prueba se realiza sin ninguna operación de bloqueo .sueño y no sincronizado). Compilé algunos comentarios más aquí: http://christian-fries.de/blog/files/2014-nested-java-8-parallel-foreach.html
Actualización 2
Parece que este problema y el DEADLOCK más severo con semáforos se ha solucionado en Java8 u40.
El problema es que el procesamiento de flujo externo consume el paralelismo bastante limitado que ha configurado: si dice que quiere ocho hilos y procesa un flujo de más de ocho elementos con parallel()
, creará ocho hilos de trabajo y los dejará artículos de proceso.
Luego, dentro de su consumidor, está procesando otra secuencia usando parallel()
pero no quedan subprocesos de trabajo. Dado que los subprocesos de trabajo están bloqueados esperando el final del procesamiento de la secuencia interna, ForkJoinPool
tiene que crear nuevos subprocesos de trabajo que violan su paralelismo configurado. Me parece que no recicla estos hilos de extensión, sino que los deja morir justo después del procesamiento. Entonces, dentro de su procesamiento interno, se crean y eliminan nuevos hilos, lo que es una operación costosa.
Es posible que vea como un defecto que los subprocesos que inician no contribuyen al cálculo de un procesamiento de flujo paralelo, sino que solo esperan el resultado, pero incluso si eso se solucionó, todavía tiene un problema general que es difícil (o nunca) solucionar:
Cuando la proporción entre el número de subprocesos de trabajo y los elementos de la secuencia externa es baja, la implementación los utilizará todos para la secuencia externa, ya que no sabe que la corriente es una corriente externa. De modo que la ejecución de un flujo interno en paralelo solicita más subprocesos de trabajo que los disponibles. El uso del subproceso de la persona que llama para contribuir al cálculo podría solucionarlo de manera que el rendimiento sea igual al cálculo en serie, pero obtener una ventaja de la ejecución paralela aquí no funciona bien con el concepto de un número fijo de subprocesos de trabajo.
Tenga en cuenta que está arañando la superficie de este problema aquí, ya que tiene tiempos de procesamiento bastante equilibrados para los artículos. Si el procesamiento de ambos, elementos internos y externos, divergen (en comparación con los elementos en el mismo nivel), el problema será aún peor.
Actualización: al crear un perfil y mirar el código, parece que ForkJoinPool
intenta usar el subproceso en espera para "robar trabajo", pero utiliza un código diferente dependiendo del hecho de si el Thread
es un subproceso de trabajo o algún otro subproceso. Como resultado, un subproceso de trabajo está realmente esperando alrededor del 80% del tiempo y haciendo muy poco o ningún trabajo, mientras que otros subprocesos realmente contribuyen al cálculo ...
Actualización 2: para completar, aquí el enfoque de ejecución paralela simple como se describe en los comentarios. Dado que encola cada elemento, se espera que tenga una gran sobrecarga cuando el tiempo de ejecución para un solo elemento es bastante pequeño. Entonces, no es una solución sofisticada sino una demostración de que es posible manejar tareas de ejecución prolongada sin mucha magia ...
import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.*;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class NestedParallelForEachTest1 {
static final boolean isInnerStreamParallel = true;
// Setup: Inner loop task 0.01 sec in worse case. Outer loop task: 10 sec + inner loop. This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec.
static final int numberOfTasksInOuterLoop = 24; // In real applications this can be a large number (e.g. > 1000).
static final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
static final int concurrentExecutionsLimitForStreams = 8;
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println(System.getProperty("java.version")+" "+System.getProperty("java.home"));
new NestedParallelForEachTest1().testNestedLoops();
E.shutdown();
}
final static ThreadPoolExecutor E = new ThreadPoolExecutor(
concurrentExecutionsLimitForStreams, concurrentExecutionsLimitForStreams,
2, TimeUnit.MINUTES, new SynchronousQueue<>(), (r,e)->r.run() );
public static void parallelForEach(IntStream s, IntConsumer c) {
s.mapToObj(i->E.submit(()->c.accept(i))).collect(Collectors.toList())
.forEach(NestedParallelForEachTest1::waitOrHelp);
}
static void waitOrHelp(Future f) {
while(!f.isDone()) {
Runnable r=E.getQueue().poll();
if(r!=null) r.run();
}
try { f.get(); }
catch(InterruptedException ex) { throw new RuntimeException(ex); }
catch(ExecutionException eex) {
Throwable t=eex.getCause();
if(t instanceof RuntimeException) throw (RuntimeException)t;
if(t instanceof Error) throw (Error)t;
throw new UndeclaredThrowableException(t);
}
}
public void testNestedLoops(NestedParallelForEachTest1 this) {
long start = System.nanoTime();
// Outer loop
parallelForEach(IntStream.range(0,numberOfTasksInOuterLoop), i -> {
if(i < 10) sleep(10 * 1000);
if(isInnerStreamParallel) {
// Inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis
parallelForEach(IntStream.range(0,numberOfTasksInInnerLoop), j -> sleep(10));
}
else {
// Inner loop as sequential
IntStream.range(0,numberOfTasksInInnerLoop).sequential().forEach(j -> sleep(10));
}
if(i >= 10) sleep(10 * 1000);
});
long end = System.nanoTime();
System.out.println("Done in "+TimeUnit.NANOSECONDS.toSeconds(end-start)+" sec.");
}
static void sleep(int milli) {
try {
Thread.sleep(milli);
} catch (InterruptedException ex) {
throw new AssertionError(ex);
}
}
}
Puedo confirmar que esto sigue siendo un problema de rendimiento en 8u72, aunque ya no será un punto muerto. Las operaciones de terminales paralelas todavía se realizan con instancias de ForkJoinTask fuera de un contexto de ForkJoinPool , lo que significa que cada flujo paralelo aún comparte el grupo común .
Para demostrar un caso patológico simple:
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
public class ParallelPerf {
private static final Object LOCK = new Object();
private static void runInNewPool(Runnable task) {
ForkJoinPool pool = new ForkJoinPool();
try {
pool.submit(task).join();
} finally {
pool.shutdown();
}
}
private static <T> T runInNewPool(Callable<T> task) {
ForkJoinPool pool = new ForkJoinPool();
try {
return pool.submit(task).join();
} finally {
pool.shutdown();
}
}
private static void innerLoop() {
IntStream.range(0, 32).parallel().forEach(i -> {
// System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
public static void main(String[] args) {
System.out.println("==DEFAULT==");
long startTime = System.nanoTime();
IntStream.range(0, 32).parallel().forEach(i -> {
synchronized (LOCK) {
innerLoop();
}
// System.out.println(" outer: " + Thread.currentThread().getName());
});
System.out.println(System.nanoTime() - startTime);
System.out.println("==NEW POOLS==");
startTime = System.nanoTime();
IntStream.range(0, 32).parallel().forEach(i -> {
synchronized (LOCK) {
runInNewPool(() -> innerLoop());
}
// System.out.println(" outer: " + Thread.currentThread().getName());
});
System.out.println(System.nanoTime() - startTime);
}
}
La segunda ejecución pasa innerLoop
a runInNewPool
lugar de llamarlo directamente. En mi máquina (i7-4790, 8 subprocesos de CPU), obtengo una aceleración de 4x:
==DEFAULT==
4321223964
==NEW POOLS==
1015314802
Descomentar las otras declaraciones impresas hace que el problema sea obvio:
[...]
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-6
outer: ForkJoinPool.commonPool-worker-6
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-3
[...]
ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-3
outer: ForkJoinPool.commonPool-worker-3
ForkJoinPool.commonPool-worker-4
ForkJoinPool.commonPool-worker-4
[...]
Los subprocesos de trabajadores del grupo común se acumulan en el bloque sincronizado, con solo un subproceso que puede ingresar a la vez. Dado que la operación paralela interna utiliza el mismo grupo, y todos los otros subprocesos del grupo están esperando el bloqueo, obtenemos una ejecución de un solo hilo.
Y el resultado de usar instancias de ForkJoinPool separadas:
[...]
ForkJoinPool-1-worker-0
ForkJoinPool-1-worker-6
ForkJoinPool-1-worker-5
outer: ForkJoinPool.commonPool-worker-4
ForkJoinPool-2-worker-1
ForkJoinPool-2-worker-5
[...]
ForkJoinPool-2-worker-7
ForkJoinPool-2-worker-3
outer: ForkJoinPool.commonPool-worker-1
ForkJoinPool-3-worker-2
ForkJoinPool-3-worker-5
[...]
Todavía tenemos el bucle interno ejecutándose en un subproceso de trabajador a la vez, pero la operación paralela interna obtiene un grupo nuevo cada vez y puede utilizar todos sus subprocesos de trabajador.
Este es un ejemplo artificial, pero la eliminación de los bloques sincronizados todavía muestra una diferencia similar en la velocidad, ya que los bucles interno y externo siguen compitiendo por los mismos subprocesos de trabajo. Las aplicaciones multiproceso deben tener cuidado al usar flujos paralelos en varios subprocesos, ya que esto podría provocar una ralentización aleatoria cuando se superponen.
Este es un problema con todas las operaciones del terminal, no solo para cada forEach
, ya que todas ejecutan tareas en el grupo común. Estoy utilizando los métodos runInNewPool
anteriores como solución alternativa, pero espero que esto se incorpore en la biblioteca estándar en algún momento.