operaciones - Implementación de una cola de robo de trabajo en C/C++?
operaciones con colas (12)
¿Eliminar las tareas de trabajo en unidades más pequeñas elimina la necesidad de robar trabajo en primer lugar?
Estoy buscando una implementación adecuada de una cola de robo de trabajo en C / CPP. He buscado en Google pero no he encontrado nada útil.
Quizás alguien esté familiarizado con una buena implementación de código abierto. (Prefiero no implementar el pseudocódigo tomado de los documentos académicos originales).
Eche un vistazo a los bloques de construcción Threading de Intel.
No creo que JobSwarm use el robo de trabajo, pero es un primer paso. No estoy al tanto de otras librerías de código abierto para este propósito.
La clase structure_task_group del PPL utiliza una cola de robo de trabajo para su implementación. Si necesita un WSQ para enhebrar, lo recomendaría.
Si realmente está buscando la fuente, no sé si el código se da en ppl.h o si hay un objeto precompilado; Tendré que verificar cuando llegue a casa esta noche.
No sé si esto podría ayudarlo, pero eche un vistazo a este artículo en la red de desarrolladores de AMD, es simple, pero debería darle algo útil.
No hay almuerzo gratis.
Por favor, eche un vistazo al documento original de robo de trabajo . Este documento es difícil de entender. Sé que el documento contiene pruebas teóricas en lugar de pseudocódigos. Sin embargo, simplemente no hay una versión mucho más simple que TBB. Si hay alguno, no dará un rendimiento óptimo. El robo de trabajo en sí mismo genera cierta cantidad de sobrecarga, por lo que las optimizaciones y los trucos son bastante importantes. Especialmente, las dequeues deben ser seguras para hilos. Implementar sincronizaciones altamente escalables y de bajo costo es un desafío.
Realmente me pregunto por qué lo necesitas. Creo que una implementación adecuada significa algo como TBB y Cilk. De nuevo, el robo de trabajo es difícil de implementar.
Existe una herramienta para simplemente hacerlo de una manera muy elegante. Es una forma realmente efectiva de paralelizar su programa en muy poco tiempo.
Premio HPC Challenge
Nuestra entrada en Cilk para el premio HPC Challenge Class 2 ganó el premio 2006 por "Mejor combinación de elegancia y rendimiento". El premio fue otorgado en SC''06 en Tampa el 14 de noviembre de 2006.
Implementar el "robo de trabajo" no es difícil en teoría. Necesita un conjunto de colas que contengan tareas que funcionen haciendo una combinación de computación y generando otras tareas para hacer más trabajo. Y necesita acceso atómico a las colas para colocar tareas recién generadas en esas colas. Finalmente, necesita un procedimiento que cada tarea llame al final, para encontrar más trabajo para el hilo que ejecutó la tarea; ese procedimiento necesita buscar en colas de trabajo para encontrar trabajo.
La mayoría de estos sistemas de robo de trabajo suponen que hay una pequeña cantidad de subprocesos (respaldados típicamente por núcleos reales de procesadores), y que hay exactamente una cola de trabajos por subproceso. Luego, primero intenta robar el trabajo de su propia cola, y si está vacío, intente robarle a los demás. Lo que se complica es saber qué colas buscar; escanearlos en serie para el trabajo es bastante caro y puede crear una gran cantidad de disputas entre los hilos que buscan trabajo.
Hasta ahora, todo esto es bastante genérico con dos excepciones principales: 1) los contextos de conmutación (p. Ej., Establecer registros de contexto del procesador, como una "pila") no se pueden expresar en C o C ++ puros. Puede resolver esto aceptando escribir parte de su paquete en el código máquina específico de la plataforma objetivo. 2) El acceso atómico a las colas para un multiprocesador no se puede realizar puramente en C o C ++ (ignorando el algoritmo de Dekker), por lo que deberá codificar las primitivas de sincronización de lenguaje ensamblador como X86 LOCK XCH o Compare and Swap. Ahora, el código involucrado en la actualización de la queuse una vez que tiene acceso seguro no es muy complejo, y usted podría escribirlo fácilmente en unas pocas líneas de C.
Sin embargo, creo que se dará cuenta de que intentar codificar un paquete de este tipo en C y C ++ con ensamblador mixto todavía es bastante ineficiente y eventualmente terminará codificando todo el ensamblador de todos modos. Todo lo que queda son puntos de entrada compatibles con C / C ++: -}
Hice esto para nuestro lenguaje de programación paralelo PARLANSE , que ofrece la idea de un número arbitrariamente grande de cómputos paralelos en vivo e interactuando (sincronización) en cualquier instante. Se implementa entre bastidores en un X86 exactamente con un hilo por CPU, y la implementación está completamente en ensamblador. El código de robo de trabajo es probablemente de 1000 líneas en total, y su código complicado porque desea que sea extremadamente rápido en el caso de no contención.
El vuelo real en la pomada para C y C ++ es, cuando creas una tarea que representa trabajo, ¿cuánto espacio de pila asignas? Los programas en serie C / C ++ evitan esta pregunta simplemente sobreasignando grandes cantidades (por ej., 10Mb) de una pila lineal, y a nadie le importa mucho de cuánto de ese espacio de pila se desperdicia. Pero si puede crear miles de tareas y hacer que todas ellas vivan en un instante determinado, no puede asignar razonablemente 10Mb a cada una. Entonces, ahora es necesario determinar de forma estática cuánto espacio de pila necesitará una tarea (Turing-hard), o tendrá que asignar trozos de pila (por ejemplo, por llamada de función), que los compiladores de C / C ++ ampliamente disponibles no hacen (por ejemplo, el que probablemente esté usando). La última salida es estrangular la creación de tareas para limitarla a unos pocos cientos en cualquier instante, y multiplexar unos cientos de montones realmente enormes entre las tareas que están activas. No puede hacer lo último si las tareas pueden interbloquear / suspender el estado, porque se ejecutará en su umbral. Entonces solo puedes hacer esto si las tareas solo hacen computación. Eso parece una restricción bastante severa.
Para PARLANSE, construimos un compilador que asigna registros de activación en el montón para cada llamada de función.
Si estás buscando una implementación de clase de cola de trabajo independiente en C ++ basada en pthread o boost :: thread, buena suerte, que yo sepa, no hay ninguna.
Sin embargo, como otros han dicho, Cilk, TBB y PPL de Microsoft tienen implementaciones de protección laboral bajo el capó.
La pregunta es, ¿quieres utilizar una cola de trabajo o implementar una? Si solo quiere usar uno, las opciones anteriores son buenos puntos de partida; basta con programar una "tarea" en cualquiera de ellos.
Como BlueRaja dijo que el task_group & structured_task_group en PPL hará esto, también tenga en cuenta que estas clases también están disponibles en la última versión del TBB de Intel. Los bucles paralelos (parallel_for, parallel_for_each) también se implementan con workstealing.
Si debe ver el origen en lugar de utilizar una implementación, TBB es OpenSource y Microsoft envía las fuentes para su CRT, por lo que puede ir a la especulación.
También puede buscar en el blog de Joe Duffy una implementación de C # (pero es C # y el modelo de memoria es diferente).
-Almiar
La implementación más cercana de este algoritmo de robo de trabajo que he encontrado es algo llamado Wool por Karl-Filip Faxén. src / informe / comparación
He transferido este proyecto C a C ++.
El original Steal
puede experimentar una lectura sucia cuando se amplía la matriz. Traté de arreglar el error, pero finalmente cedí porque en realidad no necesitaba una pila en crecimiento dinámico. En lugar de intentar asignar espacio, el método Push
simplemente devuelve false
. La persona que llama puede realizar un spin-wait, es decir, while(!stack->Push(value)){}
.
#pragma once
#include <atomic>
// A lock-free stack.
// Push = single producer
// Pop = single consumer (same thread as push)
// Steal = multiple consumer
// All methods, including Push, may fail. Re-issue the request
// if that occurs (spinwait).
template<class T, size_t capacity = 131072>
class WorkStealingStack {
public:
inline WorkStealingStack() {
_top = 1;
_bottom = 1;
}
WorkStealingStack(const WorkStealingStack&) = delete;
inline ~WorkStealingStack()
{
}
// Single producer
inline bool Push(const T& item) {
auto oldtop = _top.load(std::memory_order_relaxed);
auto oldbottom = _bottom.load(std::memory_order_relaxed);
auto numtasks = oldbottom - oldtop;
if (
oldbottom > oldtop && // size_t is unsigned, validate the result is positive
numtasks >= capacity - 1) {
// The caller can decide what to do, they will probably spinwait.
return false;
}
_values[oldbottom % capacity].store(item, std::memory_order_relaxed);
_bottom.fetch_add(1, std::memory_order_release);
return true;
}
// Single consumer
inline bool Pop(T& result) {
size_t oldtop, oldbottom, newtop, newbottom, ot;
oldbottom = _bottom.fetch_sub(1, std::memory_order_release);
ot = oldtop = _top.load(std::memory_order_acquire);
newtop = oldtop + 1;
newbottom = oldbottom - 1;
// Bottom has wrapped around.
if (oldbottom < oldtop) {
_bottom.store(oldtop, std::memory_order_relaxed);
return false;
}
// The queue is empty.
if (oldbottom == oldtop) {
_bottom.fetch_add(1, std::memory_order_release);
return false;
}
// Make sure that we are not contending for the item.
if (newbottom == oldtop) {
auto ret = _values[newbottom % capacity].load(std::memory_order_relaxed);
if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
_bottom.fetch_add(1, std::memory_order_release);
return false;
}
else {
result = ret;
_bottom.store(newtop, std::memory_order_release);
return true;
}
}
// It''s uncontended.
result = _values[newbottom % capacity].load(std::memory_order_acquire);
return true;
}
// Multiple consumer.
inline bool Steal(T& result) {
size_t oldtop, newtop, oldbottom;
oldtop = _top.load(std::memory_order_acquire);
oldbottom = _bottom.load(std::memory_order_relaxed);
newtop = oldtop + 1;
if (oldbottom <= oldtop)
return false;
// Make sure that we are not contending for the item.
if (!_top.compare_exchange_strong(oldtop, newtop, std::memory_order_acquire)) {
return false;
}
result = _values[oldtop % capacity].load(std::memory_order_relaxed);
return true;
}
private:
// Circular array
std::atomic<T> _values[capacity];
std::atomic<size_t> _top; // queue
std::atomic<size_t> _bottom; // stack
};
Full Gist (incluidas pruebas unitarias). Solo he ejecutado las pruebas en una arquitectura sólida (x86 / 64), por lo que, en lo que respecta a las arquitecturas débiles, su kilometraje puede variar si intenta usar esto en, por ejemplo, Neon / PPC.
OpenMP puede muy bien apoyar el robo de trabajo, aunque se llama paralelismo recursivo
Publicación en el foro de OpenMP
La especificación OpenMP define construcciones de tareas (que se pueden anidar, por lo que son muy adecuadas para el paralelismo recursivo) pero no especifica los detalles de cómo se implementan. Las implementaciones de OpenMP, incluido gcc, suelen utilizar algún tipo de robo de tareas, aunque el algoritmo exacto (y el rendimiento resultante) pueden variar.
Ver #pragma omp task
y #pragma omp taskwait
Actualizar
El Capítulo 9 del libro C ++ Concurrency in Action describe cómo implementar "robo de trabajo para hilos de grupo". No lo he leído / implementado, pero no parece demasiado difícil.