forkjoin - fork join java docs
¿Qué determina la cantidad de hilos que crea Java ForkJoinPool? (4)
De los comentarios fuente:
Compensación: a menos que ya haya suficientes subprocesos en vivo, el método tryPreBlock () puede crear o reactivar un subproceso adicional para compensar los bloqueadores bloqueados hasta que se desbloqueen.
Creo que lo que está sucediendo es que no está terminando ninguna de las tareas muy rápidamente, y dado que no hay subprocesos de trabajo disponibles cuando envía una nueva tarea, se crea un nuevo subproceso.
Por lo que yo había entendido ForkJoinPool
, ese grupo crea un número fijo de subprocesos (por defecto: número de núcleos) y nunca creará más subprocesos (a menos que la aplicación indique una necesidad de ellos mediante el uso de managedBlock
).
Sin embargo, al usar ForkJoinPool.getPoolSize()
descubrí que en un programa que crea 30,000 tareas ( RecursiveAction
), ForkJoinPool
ejecuta esas tareas usando 700 hilos en promedio (hilos contados cada vez que se crea una tarea). Las tareas no hacen I / O, sino computación pura; la única sincronización entre tareas es llamar a ForkJoinTask.join()
y acceder a AtomicBoolean
s, es decir, no hay operaciones de bloqueo de hilos.
Dado que join()
no bloquea el hilo de llamada tal como lo entiendo, no hay ninguna razón por la que un hilo en el grupo deba bloquearse, y así (lo había supuesto) no debería haber ningún motivo para crear más hilos (lo cual obviamente sucediendo sin embargo).
Entonces, ¿por qué ForkJoinPool
crea tantos hilos? ¿Qué factores determinan la cantidad de hilos creados?
Tenía la esperanza de que esta pregunta se pudiera responder sin publicar código, pero aquí se puede solicitar. Este código es un extracto de un programa de cuatro veces el tamaño, reducido a las partes esenciales; no compila como es. Si lo desea, puedo, por supuesto, publicar el programa completo también.
El programa busca en un laberinto un camino desde un punto de inicio dado hasta un punto final dado utilizando la búsqueda de profundidad en primer lugar. Una solución está garantizada para existir. La lógica principal está en el método compute()
de SolverTask
: Una SolverTask
RecursiveAction
que comienza en un punto determinado y continúa con todos los puntos vecinos accesibles desde el punto actual. En lugar de crear una nueva SolverTask
en cada punto de bifurcación (que crearía demasiadas tareas), empuja a todos los vecinos, excepto uno, hacia una pila de retroceso para procesarlos más tarde y continúa con solo un vecino que no se haya enviado a la pila. Una vez que llega a un callejón sin salida de esa manera, el punto más recientemente empujado a la pila de retroceso se abre, y la búsqueda continúa desde allí (recortando la ruta construida desde el punto de partida de las tomas en consecuencia). Se crea una nueva tarea una vez que una tarea encuentra su pila de retroceso más grande que un cierto umbral; a partir de ese momento, la tarea, mientras sigue saliendo de su pila de retroceso hasta que se agote, no empujará ningún otro punto a su pila cuando llegue a un punto de bifurcación, sino que creará una nueva tarea para cada punto. Por lo tanto, el tamaño de las tareas se puede ajustar utilizando el umbral de límite de pila.
Los números que cité arriba ("30,000 tareas, 700 hilos en promedio") provienen de buscar en un laberinto de 5000x5000 celdas. Entonces, aquí está el código esencial:
class SolverTask extends RecursiveTask<ArrayDeque<Point>> {
// Once the backtrack stack has reached this size, the current task
// will never add another cell to it, but create a new task for each
// newly discovered branch:
private static final int MAX_BACKTRACK_CELLS = 100*1000;
/**
* @return Tries to compute a path through the maze from local start to end
* and returns that (or null if no such path found)
*/
@Override
public ArrayDeque<Point> compute() {
// Is this task still accepting new branches for processing on its own,
// or will it create new tasks to handle those?
boolean stillAcceptingNewBranches = true;
Point current = localStart;
ArrayDeque<Point> pathFromLocalStart = new ArrayDeque<Point>(); // Path from localStart to (including) current
ArrayDeque<PointAndDirection> backtrackStack = new ArrayDeque<PointAndDirection>();
// Used as a stack: Branches not yet taken; solver will backtrack to these branching points later
Direction[] allDirections = Direction.values();
while (!current.equals(end)) {
pathFromLocalStart.addLast(current);
// Collect current''s unvisited neighbors in random order:
ArrayDeque<PointAndDirection> neighborsToVisit = new ArrayDeque<PointAndDirection>(allDirections.length);
for (Direction directionToNeighbor: allDirections) {
Point neighbor = current.getNeighbor(directionToNeighbor);
// contains() and hasPassage() are read-only methods and thus need no synchronization
if (maze.contains(neighbor) && maze.hasPassage(current, neighbor) && maze.visit(neighbor))
neighborsToVisit.add(new PointAndDirection(neighbor, directionToNeighbor.opposite));
}
// Process unvisited neighbors
if (neighborsToVisit.size() == 1) {
// Current node is no branch: Continue with that neighbor
current = neighborsToVisit.getFirst().getPoint();
continue;
}
if (neighborsToVisit.size() >= 2) {
// Current node is a branch
if (stillAcceptingNewBranches) {
current = neighborsToVisit.removeLast().getPoint();
// Push all neighbors except one on the backtrack stack for later processing
for(PointAndDirection neighborAndDirection: neighborsToVisit)
backtrackStack.push(neighborAndDirection);
if (backtrackStack.size() > MAX_BACKTRACK_CELLS)
stillAcceptingNewBranches = false;
// Continue with the one neighbor that was not pushed onto the backtrack stack
continue;
} else {
// Current node is a branch point, but this task does not accept new branches any more:
// Create new task for each neighbor to visit and wait for the end of those tasks
SolverTask[] subTasks = new SolverTask[neighborsToVisit.size()];
int t = 0;
for(PointAndDirection neighborAndDirection: neighborsToVisit) {
SolverTask task = new SolverTask(neighborAndDirection.getPoint(), end, maze);
task.fork();
subTasks[t++] = task;
}
for (SolverTask task: subTasks) {
ArrayDeque<Point> subTaskResult = null;
try {
subTaskResult = task.join();
} catch (CancellationException e) {
// Nothing to do here: Another task has found the solution and cancelled all other tasks
}
catch (Exception e) {
e.printStackTrace();
}
if (subTaskResult != null) { // subtask found solution
pathFromLocalStart.addAll(subTaskResult);
// No need to wait for the other subtasks once a solution has been found
return pathFromLocalStart;
}
} // for subTasks
} // else (not accepting any more branches)
} // if (current node is a branch)
// Current node is dead end or all its neighbors lead to dead ends:
// Continue with a node from the backtracking stack, if any is left:
if (backtrackStack.isEmpty()) {
return null; // No more backtracking avaible: No solution exists => end of this task
}
// Backtrack: Continue with cell saved at latest branching point:
PointAndDirection pd = backtrackStack.pop();
current = pd.getPoint();
Point branchingPoint = current.getNeighbor(pd.getDirectionToBranchingPoint());
// DEBUG System.out.println("Backtracking to " + branchingPoint);
// Remove the dead end from the top of pathSoFar, i.e. all cells after branchingPoint:
while (!pathFromLocalStart.peekLast().equals(branchingPoint)) {
// DEBUG System.out.println(" Going back before " + pathSoFar.peekLast());
pathFromLocalStart.removeLast();
}
// continue while loop with newly popped current
} // while (current ...
if (!current.equals(end)) {
// this task was interrupted by another one that already found the solution
// and should end now therefore:
return null;
} else {
// Found the solution path:
pathFromLocalStart.addLast(current);
return pathFromLocalStart;
}
} // compute()
} // class SolverTask
@SuppressWarnings("serial")
public class ParallelMaze {
// for each cell in the maze: Has the solver visited it yet?
private final AtomicBoolean[][] visited;
/**
* Atomically marks this point as visited unless visited before
* @return whether the point was visited for the first time, i.e. whether it could be marked
*/
boolean visit(Point p) {
return visited[p.getX()][p.getY()].compareAndSet(false, true);
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
ParallelMaze maze = new ParallelMaze(width, height, new Point(width-1, 0), new Point(0, height-1));
// Start initial task
long startTime = System.currentTimeMillis();
// since SolverTask.compute() expects its starting point already visited,
// must do that explicitly for the global starting point:
maze.visit(maze.start);
maze.solution = pool.invoke(new SolverTask(maze.start, maze.end, maze));
// One solution is enough: Stop all tasks that are still running
pool.shutdownNow();
pool.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
long endTime = System.currentTimeMillis();
System.out.println("Computed solution of length " + maze.solution.size() + " to maze of size " +
width + "x" + height + " in " + ((float)(endTime - startTime))/1000 + "s.");
}
Hay preguntas relacionadas en :
Stands de ForkJoinPool durante invokeAll / join
ForkJoinPool parece perder un hilo
Hice una versión simplificada ejecutable de lo que está sucediendo (argumentos jvm que utilicé: -Xms256m -Xmx1024m -Xss8m):
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
public class Test1 {
private static ForkJoinPool pool = new ForkJoinPool(2);
private static class SomeAction extends RecursiveAction {
private int counter; //recursive counter
private int childrenCount=80;//amount of children to spawn
private int idx; // just for displaying
private SomeAction(int counter, int idx) {
this.counter = counter;
this.idx = idx;
}
@Override
protected void compute() {
System.out.println(
"counter=" + counter + "." + idx +
" activeThreads=" + pool.getActiveThreadCount() +
" runningThreads=" + pool.getRunningThreadCount() +
" poolSize=" + pool.getPoolSize() +
" queuedTasks=" + pool.getQueuedTaskCount() +
" queuedSubmissions=" + pool.getQueuedSubmissionCount() +
" parallelism=" + pool.getParallelism() +
" stealCount=" + pool.getStealCount());
if (counter <= 0) return;
List<SomeAction> list = new ArrayList<>(childrenCount);
for (int i=0;i<childrenCount;i++){
SomeAction next = new SomeAction(counter-1,i);
list.add(next);
next.fork();
}
for (SomeAction action:list){
action.join();
}
}
}
public static void main(String[] args) throws Exception{
pool.invoke(new SomeAction(2,0));
}
}
Aparentemente cuando realiza una combinación, el hilo actual ve que la tarea requerida aún no se ha completado y toma otra tarea para él.
Sucede en java.util.concurrent.ForkJoinWorkerThread#joinTask
.
Sin embargo, esta nueva tarea genera más de las mismas tareas, pero no pueden encontrar subprocesos en el grupo, porque los hilos están bloqueados en unirse. Y como no tiene forma de saber cuánto tiempo requerirá que se liberen (el hilo podría estar en un bucle infinito o bloqueado para siempre), se engendrarán nuevos hilos (se compensarán los hilos unidos como lo mencionó Louis Wasserman ): java.util.concurrent.ForkJoinPool#signalWork
Por lo tanto, para evitar ese escenario, debe evitar el engendro recursivo de tareas.
Por ejemplo, si en el código anterior establece el parámetro inicial en 1, la cantidad de hilo activo será 2, incluso si aumenta childrenCount diez veces.
También tenga en cuenta que, aunque la cantidad de subprocesos activos aumenta, la cantidad de subprocesos en ejecución es menor o igual que el paralelismo .
Los dos fragmentos de código publicados por y realidad no siguen la práctica recomendada que apareció en javadoc para la versión 1.8 :
En los usos más típicos, un par fork-join actúa como una llamada (fork) y return (join) desde una función recursiva paralela. Como en el caso de otras formas de llamadas recursivas, las devoluciones (uniones) se deben realizar en la parte más interna. Por ejemplo, a.fork (); b.fork (); b.join (); a.join (); es probable que sea sustancialmente más eficiente que unir el código a antes del código b .
En ambos casos, FJPool se instanciaba mediante un constructor predeterminado. Esto lleva a la construcción del grupo con asyncMode = false , que es el predeterminado:
@param asyncMode si es verdadero,
establece el modo de programación local primero en entrar, primero en salir, para las tareas bifurcadas que nunca se unen. Este modo puede ser más apropiado que el modo predeterminado basado en la pila local en aplicaciones en las que los hilos de trabajo solo procesan tareas asíncronas de estilo de evento. Para el valor predeterminado, use falso.
de esa manera la cola de trabajo es en realidad lifo:
cabeza -> | t4 | t3 | t2 | t1 | ... | <- cola
Entonces en fragmentos, bifurcan () todas las tareas empujándolas en la pila y luego join () en el mismo orden, es decir, desde la tarea más profunda (t1) hasta la más alta (t4) bloqueando hasta que otro hilo robe (t1), luego (t2 ) y así. Como hay una gran cantidad de tareas para bloquear todos los subprocesos del pool (task_count >> pool.getParallelism ()), se activa la compensación como describió .
strict, full-strict y terminually-strict tienen que ver con el procesamiento de un gráfico acíclico dirigido (DAG). Puede googlear esos términos para obtener una comprensión completa de ellos. Ese es el tipo de procesamiento que el marco fue diseñado para procesar. Mire el código en la API de Recursive ..., la infraestructura se basa en su código de cálculo () para hacer otros enlaces de cálculo () y luego hacer una combinación (). Cada Tarea realiza una sola unión () al igual que el procesamiento de un DAG.
No estás haciendo el procesamiento DAG. Está bifurcando muchas Tareas nuevas y esperando (join ()) en cada una. Lee en el código fuente. Es horrendo y complejo, pero es posible que puedas resolverlo. El marco no hace una gestión de tareas adecuada. ¿Dónde colocará la Tarea de espera cuando haga un join ()? No hay una cola suspendida, que requeriría un hilo de monitor para mirar constantemente la cola y ver qué está terminado. Esta es la razón por la cual el marco usa "hilos de continuación". Cuando una tarea se une (), el marco asume que está esperando que finalice una única tarea inferior. Cuando hay muchos métodos join (), el hilo no puede continuar, por lo que debe existir un helper o thread de continuación.
Como se señaló anteriormente, necesita un proceso de ensamblaje de horquillas de tipo scatter-gather. Allí puedes bifurcar tantas Tareas