ejemplo - El uso de un semáforo dentro de una acción de secuencia paralela Java 8 anidada puede DEADLOCK. ¿Es esto un error?
parallel stream vs threadpool (3)
Después de investigar un poco el código fuente de ForkJoinPool y ForkJoinTask, asumo que encontré una respuesta:
Es un error (en mi opinión), y el error está en doInvoke()
de ForkJoinTask
. El problema en realidad está relacionado con el anidamiento de los dos bucles y, presumiblemente, no con el uso del Semáforo, sin embargo, se necesita el Semáforo (o el bloqueo al final del bucle externo) para que el problema se haga evidente y resulte en un interbloqueo (pero puedo imaginar que hay otros problemas implicados en este error; consulte Nested Java 8 paralelo para Cada bucle tiene un bajo rendimiento. ¿Se espera este comportamiento? ).
La implementación del método doInvoke()
currently tiene el siguiente aspecto:
/**
* Implementation for invoke, quietlyInvoke.
*
* @return status upon completion
*/
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
externalAwaitDone();
}
(y tal vez también en doJoin
que parece similar). En la linea
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
se prueba si Thread.currentThread () es una instancia de ForkJoinWorkerThread. El motivo de esta prueba es para comprobar si ForkJoinTask se está ejecutando en un subproceso de trabajo del grupo o del subproceso principal. Creo que esta línea está bien para un paralelo no anidado, donde permite distinguir si las tareas actuales se ejecutan en el subproceso principal o en un grupo de trabajadores. Sin embargo, para las tareas del bucle interno, esta prueba es problemática: Llamemos al subproceso que ejecuta el paralelo (). ForEeach el subproceso del creador . Para el bucle externo, el hilo creador es el hilo principal y no es una instanceof ForkJoinWorkerThread
. Sin embargo, para los bucles internos que se ejecutan desde un ForkJoinWorkerThread
el subproceso creador también es una instanceof ForkJoinWorkerThread
. Por lo tanto, en esta situación, la prueba ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
SIEMPRE ES VERDADERA!
Por lo tanto, siempre llamamos pool.awaitJoint(wt.workQueue)
.
Ahora, tenga en cuenta que llamamos awaitJoint
en la cola de trabajo COMPLETA de ese hilo (creo que este es un defecto adicional). Parece como si no solo estuviéramos uniendo las tareas de los bucles internos, sino también las tareas del bucle externo y UNAMOS TODAS AQUELLAS tareas. Desafortunadamente, la tarea externa contiene ese Semáforo.
Para comprobar que el error está relacionado con esto, podemos verificar una solución muy simple. Creo un t = new Thread()
que ejecuta el bucle interno, luego t.start(); t.join();
t.start(); t.join();
. Tenga en cuenta que esto no introducirá ningún paralelismo adicional (me estoy uniendo de inmediato). Sin embargo, cambiará el resultado de la prueba de instanceof ForkJoinWorkerThread
para el hilo creador. (Tenga en cuenta que la tarea aún se enviará al grupo común). Si se crea ese hilo contenedor, el problema ya no ocurre, al menos en mi situación de prueba actual.
Publiqué una demostración completa en http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/ForkJoinPoolTest.java
En este código de prueba la combinación
final boolean isUseSemaphore = true;
final boolean isUseInnerStream = true;
final boolean isWrappedInnerLoopThread = false;
resultará en un punto muerto, mientras que la combinación
final boolean isUseSemaphore = true;
final boolean isUseInnerStream = true;
final boolean isWrappedInnerLoopThread = true;
(y en realidad todas las demás combinaciones) no lo harán.
Actualización: dado que muchos están señalando que el uso del Semáforo es peligroso, intenté crear una demostración del problema sin Semáforo. Ahora, no hay más interbloqueos, sino un problema de rendimiento inesperado, en mi opinión. Creé una nueva publicación para eso en Nested Java 8 paralelismo para cada bucle con bajo rendimiento. ¿Se espera este comportamiento? . El código de demostración está aquí: svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/…
Considere la siguiente situación: estamos utilizando una secuencia paralela de Java 8 para realizar un bucle forEach paralelo, por ejemplo,
IntStream.range(0,20).parallel().forEach(i -> { /* work done here */})
El número de subprocesos paralelos está controlado por la propiedad del sistema "java.util.concurrent.ForkJoinPool.common.parallelism" y, por lo general, es igual al número de procesadores.
Ahora suponga que nos gusta limitar el número de ejecuciones paralelas para un trabajo específico, por ejemplo, porque esa parte requiere mucha memoria y la restricción de memoria implica un límite de ejecuciones paralelas.
Una forma obvia y elegante de limitar las ejecuciones paralelas es usar un Semáforo (sugerido ), por ejemplo, el siguiente fragmento de código limita el número de ejecuciones paralelas a 5:
final Semaphore concurrentExecutions = new Semaphore(5);
IntStream.range(0,20).parallel().forEach(i -> {
concurrentExecutions.acquireUninterruptibly();
try {
/* WORK DONE HERE */
}
finally {
concurrentExecutions.release();
}
});
¡Esto funciona bien!
Sin embargo: el uso de cualquier otra transmisión paralela dentro del trabajador (en /* WORK DONE HERE */
) puede resultar en un interbloqueo .
Para mí este es un comportamiento inesperado.
Explicación: dado que las secuencias Java utilizan un grupo de ForkJoin, el forEach interno se bifurca, y la unión parece estar esperando para siempre. Sin embargo, este comportamiento sigue siendo inesperado. Tenga en cuenta que las transmisiones paralelas funcionan incluso si configura "java.util.concurrent.ForkJoinPool.common.parallelism"
en 1.
Tenga en cuenta también que puede no ser transparente si hay un paralelo interno para cada uno.
Pregunta: ¿Está este comportamiento de acuerdo con la especificación de Java 8 (en ese caso, implicaría que el uso de Semáforos en trabajadores de flujos paralelos está prohibido) o es un error?
Para mayor comodidad: a continuación se muestra un caso de prueba completo. Cualquier combinación de los dos booleanos funciona, excepto "verdadero, verdadero", lo que resulta en un punto muerto.
Aclaración: para aclarar el punto, permítanme enfatizar un aspecto: el punto muerto no se produce al acquire
el semáforo. Tenga en cuenta que el código consiste en
- adquirir semáforo
- ejecutar un código
- liberar el semáforo
y el punto muerto se produce en 2. si ese fragmento de código está utilizando OTRO flujo paralelo. Entonces el punto muerto se produce dentro de esa secuencia OTRA. Como consecuencia, parece que no está permitido usar flujos paralelos anidados y operaciones de bloqueo (como un semáforo) juntos.
Tenga en cuenta que está documentado que las transmisiones paralelas utilizan ForkJoinPool y que ForkJoinPool y Semaphore pertenecen al mismo paquete: java.util.concurrent
(por lo que se podría esperar que interactúen bien).
/*
* (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: [email protected].
*
* Created on 03.05.2014
*/
package net.finmath.experiments.concurrency;
import java.util.concurrent.Semaphore;
import java.util.stream.IntStream;
/**
* This is a test of Java 8 parallel streams.
*
* The idea behind this code is that the Semaphore concurrentExecutions
* should limit the parallel executions of the outer forEach (which is an
* <code>IntStream.range(0,numberOfTasks).parallel().forEach</code> (for example:
* the parallel executions of the outer forEach should be limited due to a
* memory constrain).
*
* Inside the execution block of the outer forEach we use another parallel stream
* to create an inner forEach. The number of concurrent
* executions of the inner forEach is not limited by us (it is however limited by a
* system property "java.util.concurrent.ForkJoinPool.common.parallelism").
*
* Problem: If the semaphore is used AND the inner forEach is active, then
* the execution will be DEADLOCKED.
*
* Note: A practical application is the implementation of the parallel
* LevenbergMarquardt optimizer in
* {@link http://finmath.net/java/finmath-lib/apidocs/net/finmath/optimizer/LevenbergMarquardt.html}
* In one application the number of tasks in the outer and inner loop is very large (>1000)
* and due to memory limitation the outer loop should be limited to a small (5) number
* of concurrent executions.
*
* @author Christian Fries
*/
public class ForkJoinPoolTest {
public static void main(String[] args) {
// Any combination of the booleans works, except (true,true)
final boolean isUseSemaphore = true;
final boolean isUseInnerStream = true;
final int numberOfTasksInOuterLoop = 20; // In real applications this can be a large number (e.g. > 1000).
final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
final int concurrentExecusionsLimitInOuterLoop = 5;
final int concurrentExecutionsLimitForStreams = 10;
final Semaphore concurrentExecutions = new Semaphore(concurrentExecusionsLimitInOuterLoop);
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams));
System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));
IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
if(isUseSemaphore) {
concurrentExecutions.acquireUninterruptibly();
}
try {
System.out.println(i + "/t" + concurrentExecutions.availablePermits() + "/t" + Thread.currentThread());
if(isUseInnerStream) {
runCodeWhichUsesParallelStream(numberOfTasksInInnerLoop);
}
else {
try {
Thread.sleep(10*numberOfTasksInInnerLoop);
} catch (Exception e) {
}
}
}
finally {
if(isUseSemaphore) {
concurrentExecutions.release();
}
}
});
System.out.println("D O N E");
}
/**
* Runs code in a parallel forEach using streams.
*
* @param numberOfTasksInInnerLoop Number of tasks to execute.
*/
private static void runCodeWhichUsesParallelStream(int numberOfTasksInInnerLoop) {
IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
try {
Thread.sleep(10);
} catch (Exception e) {
}
});
}
}
Ejecuté su prueba en un generador de perfiles (VisualVM) y estoy de acuerdo: los hilos están esperando el semáforo y en aWaitJoin () en el F / J Pool.
Este marco tiene serios problemas en relación con join (). Llevo cuatro años escribiendo una crítica sobre este marco. El problema de unión básica comienza aquí.
aWaitJoin () tiene problemas similares. Usted puede leer el código usted mismo. Cuando el marco de trabajo llega al final del trabajo, emite un wait (). Todo se reduce a que este marco no tiene manera de hacer un cambio de contexto.
Hay una forma de obtener este marco para crear subprocesos de compensación para los subprocesos que están bloqueados. Debe implementar la interfaz ForkJoinPool.ManagedBlocker. Cómo puedes hacer esto, no tengo ni idea. Estás ejecutando una API básica con streams. No estás implementando la API de Streams y escribiendo tu propio código.
Me atengo a mi comentario, arriba: Una vez que pasas el paralelismo al API, renuncias a tu capacidad de controlar el funcionamiento interno de ese mecanismo paralelo. No hay ningún error con la API (aparte de que utiliza un marco defectuoso para operaciones paralelas). El problema es que los semáforos o cualquier otro método para controlar el paralelismo dentro de la API son ideas peligrosas.
Siempre que esté descomponiendo un problema en tareas, donde esas tareas puedan ser bloqueadas en otras tareas, e intente ejecutarlas en un grupo de subprocesos finitos, corre el riesgo de un punto muerto inducido por el grupo . Ver Java Concurrencia en la Práctica 8.1.
Esto es, sin duda, un error en su código. Está llenando el grupo de FJ con tareas que van a bloquear esperando los resultados de otras tareas en el mismo grupo. A veces, tienes suerte y las cosas se las arreglan para no interrumpir el bloqueo (como si no todos los errores de orden de bloqueo resultaran en un interbloqueo todo el tiempo), pero fundamentalmente estás patinando en un hielo muy fino aquí.