retorno resaca electrica corriente c++ multithreading algorithm asynchronous future

c++ - electrica - corriente de resaca



Construyendo una cadena de retorno de llamada ''futura'' asíncrona desde el gráfico de dependencia en tiempo de compilación(DAG) (6)

Considere usar la biblioteca de Gráficos de Flujo TBB de Intel.

Tengo un gráfico acíclico dirigido en tiempo de compilación de tareas asincrónicas. El DAG muestra las dependencias entre las tareas: al analizarlo, es posible comprender qué tareas se pueden ejecutar en paralelo (en hilos separados) y qué tareas deben esperar para que otras tareas finalicen antes de que puedan comenzar (dependencias ).

Quiero generar una cadena de devolución de llamada desde el DAG, utilizando boost::future y .then(...) , when_all(...) funciones de ayuda de continuación. El resultado de esta generación será una función que, cuando se invoca, iniciará la cadena de devolución de llamada y ejecutará las tareas tal como lo describe el DAG, ejecutando tantas tareas como sea posible en paralelo.

Sin embargo, estoy teniendo problemas para encontrar un algoritmo general que pueda funcionar en todos los casos.

Hice algunos dibujos para que el problema sea más fácil de entender. Esta es una leyenda que le mostrará lo que significan los símbolos en los dibujos:

Comencemos con un DAG lineal simple:

Este gráfico de dependencia consta de tres tareas ( A , B y C ) . C depende de B B depende de A No hay posibilidad de paralelismo aquí; el algoritmo de generación construiría algo similar a esto:

boost::future<void> A, B, C, end; A.then([] { B.then([] { C.get(); end.get(); }); });

(Tenga en cuenta que todas las muestras de código no son 100% válidas, estoy ignorando la semántica de movimiento, el reenvío y las capturas lambda).

Hay muchos enfoques para resolver este DAG lineal: ya sea comenzando desde el final o desde el principio, es trivial construir una cadena de devolución de llamada correcta.

Las cosas comienzan a complicarse cuando se introducen los tenedores y las combinaciones .

Aquí hay un DAG con un tenedor / join:

Es difícil pensar en una cadena de devolución de llamada que coincida con este DAG. Si trato de trabajar hacia atrás, comenzando desde el final, mi razonamiento es el siguiente:

  • end depende de B y D (unirse)
    • D depende de C
    • B y C dependen de A (tenedor)

Una posible cadena se ve así:

boost::future<void> A, B, C, D, end; A.then([] { boost::when_all(B, C.then([] { D.get(); })) .then([] { end.get(); }); });

Me resultó difícil escribir esta cadena a mano, y también dudo sobre su corrección. No se me ocurre una forma general de implementar un algoritmo que pueda generar esto: dificultades adicionales también están presentes debido a que when_all necesita que sus argumentos se muevan hacia él.

Veamos un último ejemplo, incluso más complejo:

Aquí queremos explotar el paralelismo tanto como sea posible. Considere la tarea E : E se puede ejecutar en paralelo con cualquiera de [B, C, D] .

Esta es una posible cadena de devolución de llamada:

boost::future<void> A, B, C, D, E, F, end; A.then([] { boost::when_all(boost::when_all(B, C).then([] { D.get(); }), E) .then([] { F.then([] { end.get(); }); }); });

Intenté crear un algoritmo general de varias maneras:

  • Comenzando desde el comienzo del DAG, tratando de construir la cadena usando .then(...) continuaciones. Esto no funciona con combinaciones, ya que la tarea de unión de destino se repetiría varias veces.

  • Comenzando desde el final del DAG, tratando de generar la cadena usando when_all(...) continuations. Esto falla con las horquillas, ya que el nodo que crea la horquilla se repite varias veces.

Obviamente, el enfoque de "amplitud transversal" no funciona bien aquí. A partir de las muestras de código que he escrito a mano, parece que el algoritmo debe tener en cuenta las bifurcaciones y las uniones, y necesita poder mezclar correctamente .then(...) y when_all(...) continuaciones.

Aquí están mis últimas preguntas:

  • ¿Siempre es posible generar una cadena de devolución de llamada basada en el future desde un DAG de dependencias de tareas, donde cada tarea aparece solo una vez en la cadena de devolución de llamada?

  • Si es así, ¿cómo puede implementarse un algoritmo general que, dada una dependencia de tareas en la que DAG construye una cadena de devolución de llamada?

EDIT 1:

Aquí hay un enfoque adicional que estoy tratando de explorar.

La idea es generar una estructura de datos de mapa ([dependencies...] -> [dependents...]) del DAG y generar la cadena de devolución de llamada desde ese mapa.

Si len(dependencies...) > 1 , entonces el value es un nodo de unión .

Si len(dependents...) > 1 , entonces la key es un nodo de horquilla .

Todos los pares clave-valor en el mapa se pueden expresar como when_all(keys...).then(values...) continuations.

La parte difícil es encontrar el orden correcto en el que "expandir" (pensar en algo similar a un analizador sintáctico) los nodos y cómo conectar el tenedor / unir las continuaciones.

Considere el siguiente mapa, generado por la imagen .

depenendencies | dependents ----------------|------------- [F] : [end] [D, E] : [F] [B, C] : [D] [A] : [E, C, B] [begin] : [A]

Al aplicar algún tipo de reducciones / pases parecidos a los analizadores, podemos obtener una cadena de devolución de llamada "limpia":

// First pass: // Convert everything to `when_all(...).then(...)` notation when_all(F).then(end) when_all(D, E).then(F) when_all(B, C).then(D) when_all(A).then(E, C, B) when_all(begin).then(A) // Second pass: // Solve linear (trivial) transformations when_all(D, E).then( when_all(F).then(end) ) when_all(B, C).then(D) when_all( when_all(begin).then(A) ).then(E, C, B) // Third pass: // Solve fork/join transformations when_all( when_all(begin).then(A) ).then( when_all( E, when_all(B, C).then(D) ).then( when_all(F).then(end) ) )

El tercer pase es el más importante, pero también el que parece realmente difícil de diseñar un algoritmo.

Observe cómo se deben encontrar [B, C] dentro de la lista [E, C, B] , y cómo, en la lista de dependencias [D, E] , D debe interpretarse como el resultado de when_all(B, C).then(D) y encadenado junto con E en when_all(E, when_all(B, C).then(D)) .

Tal vez todo el problema se pueda simplificar como:

Dado un mapa que consta de pares de valores clave [dependencies...] -> [dependents...] , ¿cómo podría ser un algoritmo que transforma esos pares a una cadena de continuación when_all(...) / .then(...) ¿implementado?

EDICION 2:

Aquí hay un pseudocode que surgió para el enfoque descrito anteriormente. Parece que funciona para el DAG que probé, pero necesito dedicarle más tiempo y probarlo "mentalmente" con otras configuraciones DAG más complicadas.


Este gráfico no está construido en tiempo de compilación, pero no es claro para mí si eso es un requisito. El gráfico se mantiene en un gráfico de impulso implementado como adjacency_list<vecS, vecS, bidirectionalS> . Un solo despacho iniciará las tareas. Solo necesitamos los bordes internos en cada nodo, para que sepamos exactamente lo que estamos esperando. Eso precalculado en instanciación en el programador a continuación.

Yo sostengo que un tipo topológico completo no es necesario.

Por ejemplo, si el gráfico de dependencia fuera:

use scheduler_driver.cpp

Para una unión como en

simplemente redefina el Graph para definir los bordes dirigidos.

Por lo tanto, para responder a sus 2 preguntas:

. Sí, para un DAG. Solo se necesitan dependencias únicas e inmediatas para cada nodo, que se pueden precalcular como se muestra a continuación. La cadena de dependencia se puede iniciar con un solo envío y la cadena de dominó se cae.

. Sí, vea el algoritmo a continuación (usando C ++ 11 hilos, no boost::thread ). Para las horquillas, se requiere un shared_future para la comunicación, mientras que las uniones son compatibles con la comunicación basada en el future .

scheduler_driver.hpp:

#ifndef __SCHEDULER_DRIVER_HPP__ #define __SCHEDULER_DRIVER_HPP__ #include <iostream> #include <ostream> #include <iterator> #include <vector> #include <chrono> #include "scheduler.h" #endif

scheduler_driver.cpp:

#include "scheduler_driver.hpp" enum task_nodes { task_0, task_1, task_2, task_3, task_4, task_5, task_6, task_7, task_8, task_9, N }; int basic_task(int a, int d) { std::chrono::milliseconds sleepDuration(d); std::this_thread::sleep_for(sleepDuration); std::cout << "Result: " << a << "/n"; return a; } using namespace SCHEDULER; int main(int argc, char **argv) { using F = std::function<R()>; Graph deps(N); boost::add_edge(task_0, task_1, deps); boost::add_edge(task_0, task_2, deps); boost::add_edge(task_0, task_3, deps); boost::add_edge(task_1, task_4, deps); boost::add_edge(task_1, task_5, deps); boost::add_edge(task_1, task_6, deps); boost::add_edge(task_2, task_7, deps); boost::add_edge(task_2, task_8, deps); boost::add_edge(task_2, task_9, deps); std::vector<F> tasks = { std::bind(basic_task, 0, 1000), std::bind(basic_task, 1, 1000), std::bind(basic_task, 2, 1000), std::bind(basic_task, 3, 1000), std::bind(basic_task, 4, 1000), std::bind(basic_task, 5, 1000), std::bind(basic_task, 6, 1000), std::bind(basic_task, 7, 1000), std::bind(basic_task, 8, 1000), std::bind(basic_task, 9, 1000) }; auto s = std::make_unique<scheduler<int>>(std::move(deps), std::move(tasks)); s->doit(); return 0; }

scheduler.h:

#ifndef __SCHEDULER2_H__ #define __SCHEDULER2_H__ #include <iostream> #include <vector> #include <iterator> #include <functional> #include <algorithm> #include <mutex> #include <thread> #include <future> #include <boost/graph/graph_traits.hpp> #include <boost/graph/adjacency_list.hpp> #include <boost/graph/depth_first_search.hpp> #include <boost/graph/visitors.hpp> using namespace boost; namespace SCHEDULER { using Graph = adjacency_list<vecS, vecS, bidirectionalS>; using Edge = graph_traits<Graph>::edge_descriptor; using Vertex = graph_traits<Graph>::vertex_descriptor; using VectexCont = std::vector<Vertex>; using outIt = graph_traits<Graph>::out_edge_iterator; using inIt = graph_traits<Graph>::in_edge_iterator; template<typename R> class scheduler { public: using ret_type = R; using fun_type = std::function<R()>; using prom_type = std::promise<ret_type>; using fut_type = std::shared_future<ret_type>; scheduler() = default; scheduler(const Graph &deps_, const std::vector<fun_type> &tasks_) : g(deps_), tasks(tasks_) { init_();} scheduler(Graph&& deps_, std::vector<fun_type>&& tasks_) : g(std::move(deps_)), tasks(std::move(tasks_)) { init_(); } scheduler(const scheduler&) = delete; scheduler& operator=(const scheduler&) = delete; void doit(); private: void init_(); std::list<Vertex> get_sources(const Vertex& v); auto task_thread(fun_type&& f, int i); Graph g; std::vector<fun_type> tasks; std::vector<prom_type> prom; std::vector<fut_type> fut; std::vector<std::thread> th; std::vector<std::list<Vertex>> sources; }; template<typename R> void scheduler<R>::init_() { int num_tasks = tasks.size(); prom.resize(num_tasks); fut.resize(num_tasks); // Get the futures for(size_t i=0; i<num_tasks; ++i) { fut[i] = prom[i].get_future(); } // Predetermine in_edges for faster traversal sources.resize(num_tasks); for(size_t i=0; i<num_tasks; ++i) { sources[i] = get_sources(i); } } template<typename R> std::list<Vertex> scheduler<R>::get_sources(const Vertex& v) { std::list<Vertex> r; Vertex v1; inIt j, j_end; boost::tie(j,j_end) = in_edges(v, g); for(;j != j_end;++j) { v1 = source(*j, g); r.push_back(v1); } return r; } template<typename R> auto scheduler<R>::task_thread(fun_type&& f, int i) { auto j_beg = sources[i].begin(), j_end = sources[i].end(); for(; j_beg != j_end; ++j_beg) { R val = fut[*j_beg].get(); } return std::thread([this](fun_type f, int i) { prom[i].set_value(f()); },f,i); } template<typename R> void scheduler<R>::doit() { size_t num_tasks = tasks.size(); th.resize(num_tasks); for(int i=0; i<num_tasks; ++i) { th[i] = task_thread(std::move(tasks[i]), i); } for_each(th.begin(), th.end(), mem_fn(&std::thread::join)); } } // namespace SCHEDULER #endif


Esto parece razonablemente fácil si deja de pensar en ello en forma de dependencias explícitas y organizando un DAG. Cada tarea puede organizarse de la siguiente manera (C # porque es mucho más simple explicar la idea):

class MyTask { // a list of all tasks that depend on this to be finished private readonly ICollection<MyTask> _dependenants; // number of not finished dependencies of this task private int _nrDependencies; public int NrDependencies { get { return _nrDependencies; } private set { _nrDependencies = value; } } }

Si ha organizado su DAG de tal forma, el problema es muy simple: cada tarea donde se puede ejecutar _nrDependencies == 0 . Entonces, necesitamos un método de ejecución que se parezca a lo siguiente:

public async Task RunTask() { // Execute actual code of the task. var tasks = new List<Task>(); foreach (var dependent in _dependenants) { if (Interlocked.Decrement(ref dependent._nrDependencies) == 0) { tasks.Add(Task.Run(() => dependent.RunTask())); } } await Task.WhenAll(tasks); }

Básicamente, tan pronto como termina nuestra tarea, examinamos a todos nuestros dependientes y ejecutamos todos aquellos que no tienen más dependencias sin terminar.

Para comenzar todo, lo único que tienes que hacer es llamar a RunTask() para todas las tareas que tienen cero dependientes para empezar (al menos uno de ellos debe existir ya que tenemos un DAG). Tan pronto como hayan finalizado todas estas tareas, sabemos que se ha ejecutado todo el DAG.


La forma más fácil es comenzar desde el nodo de entrada del gráfico, como si estuvieras escribiendo el código a mano. Para resolver el problema de join , no puede usar una solución recursiva, necesita tener un ordenamiento topológico de su gráfico y luego construir el gráfico de acuerdo con el orden.

Esto le da la garantía de que cuando construye un nodo, todos sus predecesores ya han sido creados.

Para lograr este objetivo, podemos usar un DFS, con postordering inverso .

Una vez que tenga una clasificación topológica, puede olvidar los ID de nodo originales y hacer referencia a los nodos con su número en la lista. Para hacerlo, necesita crear un mapa de tiempo de compilación que permita recuperar los predecesores de nodo utilizando el índice de nodo en la clasificación topológica en lugar del índice de nodo original del nodo.

EDITAR : Continuando con la implementación de la clasificación topológica en tiempo de compilación, modifiqué esta respuesta.

Para estar en la misma página, asumiré que su gráfica se ve así:

struct mygraph { template<int Id> static constexpr auto successors(node_id<Id>) -> list< node_id<> ... >; //List of successors for the input node template<int Id> static constexpr auto predecessors(node_id<Id>) -> list< node_id<> ... >; //List of predecessors for the input node //Get the task associated with the given node. template<int Id> static constexpr auto task(node_id<Id>); using entry_node = node_id<0>; };

Paso 1: clasificación topológica

El ingrediente básico que necesita es un conjunto de compilación de ID de nodo. En TMP, un conjunto también es una lista, simplemente porque en set<Ids...> el orden de los Ids . Esto significa que puede usar la misma estructura de datos para codificar la información sobre si un nodo fue visitado Y el orden resultante al mismo tiempo.

/** Topological sort using DFS with reverse-postordering **/ template<class Graph> struct topological_sort { private: struct visit; // If we reach a node that we already visited, do nothing. template<int Id, int ... Is> static constexpr auto visit_impl( node_id<Id>, set<Is...> visited, std::true_type ) { return visited; } // This overload kicks in when node has not been visited yet. template<int Id, int ... Is> static constexpr auto visit_impl( node_id<Id> node, set<Is...> visited, std::false_type ) { // Get the list of successors for the current node constexpr auto succ = Graph::successors(node); // Reverse postordering: we call insert *after* visiting the successors // This will call "visit" on each successor, updating the // visited set after each step. // Then we insert the current node in the set. // Notice that if the graph is cyclic we end up in an infinite // recursion here. return fold( succ, visited, visit() ).insert(node); // Conventional DFS would be: // return fold( succ, visited.insert(node), visit() ); } struct visit { // Dispatch to visit_impl depending on the result of visited.contains(node) // Note that "contains" returns a type convertible to // integral_constant<bool,x> template<int Id, int ... Is> constexpr auto operator()( set<Is...> visited, node_id<Id> node ) const { return visit_impl(node, visited, visited.contains(node) ); } }; public: template<int StartNodeId> static constexpr auto compute( node_id<StartNodeId> node ) { // Start visiting from the entry node // The set of visited nodes is initially empty. // "as_list" converts set<Is ... > to list< node_id<Is> ... >. return reverse( visit()( set<>{}, node ).as_list() ); } };

Este algoritmo con el gráfico de su último ejemplo (asumiendo A = node_id<0> , B = node_id<1> , etc.), produce la list<A,B,C,D,E,F> .

Paso 2: mapa del gráfico

Esto es simplemente un adaptador que modifica el Id de cada nodo en su gráfico de acuerdo con un orden determinado. Entonces, suponiendo que los pasos anteriores regresaran a la list<C,D,A,B> , este graph_map el índice 0 a C , índice 1 a D , etc.

template<class Graph, class List> class graph_map { // Convert a node_id from underlying graph. // Use a function-object so that it can be passed to algorithms. struct from_underlying { template<int I> constexpr auto operator()(node_id<I> id) { return node_id< find(id, List{}) >{}; } }; struct to_underlying { template<int I> constexpr auto operator()(node_id<I> id) { return get<I>(List{}); } }; public: template<int Id> static constexpr auto successors( node_id<Id> id ) { constexpr auto orig_id = to_underlying()(id); constexpr auto orig_succ = Graph::successors( orig_id ); return transform( orig_succ, from_underlying() ); } template<int Id> static constexpr auto predecessors( node_id<Id> id ) { constexpr auto orig_id = to_underlying()(id); constexpr auto orig_succ = Graph::predecessors( orig_id ); return transform( orig_succ, from_underlying() ); } template<int Id> static constexpr auto task( node_id<Id> id ) { return Graph::task( to_underlying()(id) ); } using entry_node = decltype( from_underlying()( typename Graph::entry_node{} ) ); };

Paso 3: ensamblar el resultado

Ahora podemos iterar sobre cada ID de nodo en orden. Gracias a la forma en que construimos el mapa de gráficos, sabemos que todos los predecesores de I tienen un ID de nodo que es menor que I , para cada nodo I posible.

// Returns a tuple<> of futures template<class GraphMap, class ... Ts> auto make_cont( std::tuple< future<Ts> ... > && pred ) { // The next node to work with is N: constexpr auto current_node = node_id< sizeof ... (Ts) >(); // Get a list of all the predecessors for the current node. auto indices = GraphMap::predecessors( current_node ); // "select" is some magic function that takes a tuple of Ts // and an index_sequence, and returns a tuple of references to the elements // from the input tuple that are in the indices list. auto futures = select( pred, indices ); // Assuming you have an overload of when_all that takes a tuple, // otherwise use C++17 apply. auto join = when_all( futures ); // Note: when_all with an empty parameter list returns a future< tuple<> >, // which is always ready. // In general this has to be a shared_future, but you can avoid that // by checking if this node has only one successor. auto next = join.then( GraphMap::task( current_node ) ).share(); // Return a new tuple of futures, pushing the new future at the back. return std::tuple_cat( std::move(pred), std::make_tuple(std::move(next)) ); } // Returns a tuple of futures, you can take the last element if you // know that your DAG has only one leaf, or do some additional // processing to extract only the leaf nodes. template<class Graph> auto make_callback_chain() { constexpr auto entry_node = typename Graph::entry_node{}; constexpr auto sorted_list = topological_sort<Graph>::compute( entry_node ); using map = graph_map< Graph, decltype(sorted_list) >; // Note: we are not really using the "index" in the functor here, // we only want to call make_cont once for each node in the graph return fold( sorted_list, std::make_tuple(), //Start with an empty tuple []( auto && tuple, auto index ) { return make_cont<map>(std::move(tuple)); } ); }

Demostración en vivo


No estoy seguro de cuál es tu configuración y por qué necesitas construir DAG, pero creo que ese simple algoritmo codicioso puede ser suficiente.

when (some task have finished) { mark output resources done; find all tasks that can be run; post them to thread pool; }


Si pueden producirse dependencias redundantes, elimínelas primero (consulte, por ejemplo, https://mathematica.stackexchange.com/questions/33638/remove-redundant-dependencies-from-a-directed-acyclic-graph acyclic- https://mathematica.stackexchange.com/questions/33638/remove-redundant-dependencies-from-a-directed-acyclic-graph ).

A continuación, realice las siguientes transformaciones gráficas (creación de subexpresiones en nodos fusionados) hasta que se encuentre en un solo nodo (de forma similar a como calcularía una red de resistencias):

* : Dependencias adicionales entrantes o salientes, según la ubicación

(...) : Expresión en un solo nodo

Código Java, incluida la configuración para su ejemplo más complejo:

public class DirectedGraph { /** Set of all nodes in the graph */ static Set<Node> allNodes = new LinkedHashSet<>(); static class Node { /** Set of all preceeding nodes */ Set<Node> prev = new LinkedHashSet<>(); /** Set of all following nodes */ Set<Node> next = new LinkedHashSet<>(); String value; Node(String value) { this.value = value; allNodes.add(this); } void addPrev(Node other) { prev.add(other); other.next.add(this); } /** Returns one of the next nodes */ Node anyNext() { return next.iterator().next(); } /** Merges this node with other, then removes other */ void merge(Node other) { prev.addAll(other.prev); next.addAll(other.next); for (Node on: other.next) { on.prev.remove(other); on.prev.add(this); } for (Node op: other.prev) { op.next.remove(other); op.next.add(this); } prev.remove(this); next.remove(this); allNodes.remove(other); } public String toString() { return value; } } /**  * Merges sequential or parallel nodes following the given node. * Returns true if any node was merged. */ public static boolean processNode(Node node) { // Check if we are the start of a sequence. Merge if so. if (node.next.size() == 1 && node.anyNext().prev.size() == 1) { Node then = node.anyNext(); node.value += " then " + then.value; node.merge(then); return true; } // See if any of the next nodes has a parallel node with // the same one level indirect target. for (Node next : node.next) { // Nodes must have only one in and out connection to be merged. if (next.prev.size() == 1 && next.next.size() == 1) { // Collect all parallel nodes with only one in and out connection // and the same target; the same source is implied by iterating over // node.next again. Node target = next.anyNext().next(); Set<Node> parallel = new LinkedHashSet<Node>(); for (Node other: node.next) { if (other != next && other.prev.size() == 1 && other.next.size() == 1 && other.anyNext() == target) { parallel.add(other); } } // If we have found any "parallel" nodes, merge them if (parallel.size() > 0) { StringBuilder sb = new StringBuilder("allNodes("); sb.append(next.value); for (Node other: parallel) { sb.append(", ").append(other.value); next.merge(other); } sb.append(")"); next.value = sb.toString(); return true; } } } return false; } public static void main(String[] args) { Node a = new Node("A"); Node b = new Node("B"); Node c = new Node("C"); Node d = new Node("D"); Node e = new Node("E"); Node f = new Node("F"); f.addPrev(d); f.addPrev(e); e.addPrev(a); d.addPrev(b); d.addPrev(c); b.addPrev(a); c.addPrev(a); boolean anyChange; do { anyChange = false; for (Node node: allNodes) { if (processNode(node)) { anyChange = true; // We need to leave the inner loop here because changes // invalidate the for iteration. break; } } // We are done if we can''t find any node to merge. } while (anyChange); System.out.println(allNodes.toString()); } }

Salida: A then all(E, all(B, C) then D) then F