thread pthread ejemplo c++ linux multithreading pthreads task

pthread - ¿Cómo crear un eficiente planificador de tareas de múltiples subprocesos en C++?



pthreads php (5)

Me gustaría crear un sistema de planificación de tareas muy eficiente en C ++.

La idea básica es esta:

class Task { public: virtual void run() = 0; }; class Scheduler { public: void add(Task &task, double delayToRun); };

Detrás del Scheduler , debe haber un grupo de subprocesos de tamaño fijo, que ejecute las tareas (no quiero crear un subproceso para cada tarea). delayToRun significa que la task no se ejecuta de inmediato, pero delayToRun segundos después (midiendo desde el punto en que se agregó al Scheduler ).

( delayToRun significa un valor "al menos", por supuesto. Si el sistema está cargado, o si pedimos lo imposible al Programador, no podrá manejar nuestra solicitud. Pero debería hacer lo mejor que pueda)

Y aquí está mi problema. ¿Cómo implementar la funcionalidad delayToRun manera eficiente? Estoy tratando de resolver este problema con el uso de mutexes y variables de condición.

Veo dos maneras:

Con hilo gestor

El programador contiene dos colas: allTasksQueue y tasksReadyToRunQueue . Una tarea se agrega a allTasksQueue en Scheduler::add . Hay un subproceso de administrador, que espera el menor tiempo posible para que pueda poner una tarea de allTasksQueue a tasksReadyToRunQueue . Los subprocesos de trabajo esperan una tarea disponible en tasksReadyToRunQueue .

Si Scheduler::add agrega una tarea delante de allTasksQueue (una tarea, que tiene un valor de delayToRun por lo que debe ir antes que la tarea actual más delayToRun de ejecutar), entonces la tarea del administrador debe delayToRun para que delayToRun actualizar el tiempo de espera.

Este método puede considerarse ineficiente, ya que necesita dos colas, y necesita dos condvar.signals para hacer que una tarea se ejecute (una para allTasksQueue -> tasksReadyToRunQueue , y otra para indicar a un subproceso de trabajo que realmente ejecute la tarea)

Sin hilo gestor

Hay una cola en el programador. Una tarea se agrega a esta cola en Scheduler::add . Un hilo de trabajo comprueba la cola. Si está vacío, espera sin una restricción de tiempo. Si no está vacío, espera la tarea más rápida.

  1. Si solo hay una variable de condición para la cual esperan los subprocesos de trabajo: este método puede considerarse ineficaz, porque si una tarea se agregó al frente de la cola (frente significa, si hay N subprocesos de trabajo, entonces el índice de tarea <N) luego, todos los subprocesos de trabajo deben activarse para actualizar el tiempo que están esperando.

  2. Si hay una variable de condición separada para cada hilo, entonces podemos controlar qué hilo reactivar, por lo que en este caso no necesitamos reactivar todos los hilos (solo necesitamos despertar el hilo que tiene el mayor tiempo de espera). , por lo que necesitamos gestionar este valor). Actualmente estoy pensando en implementar esto, pero trabajar los detalles exactos es complejo. ¿Hay recomendaciones / pensamientos / documento sobre este método?

¿Hay alguna solución mejor para este problema? Estoy tratando de usar las funciones estándar de C ++, pero estoy dispuesto a usar herramientas dependientes de la plataforma (mi plataforma principal es Linux) (como pthreads), o incluso herramientas específicas de Linux (como futexes), si ofrecen una mejor solución.


Código del núcleo para C ++ 11:

#include <thread> #include <queue> #include <chrono> #include <mutex> #include <atomic> using namespace std::chrono; using namespace std; class Task { public: virtual void run() = 0; }; template<typename T, typename = enable_if<std::is_base_of<Task, T>::value>> class SchedulerItem { public: T task; time_point<steady_clock> startTime; int delay; SchedulerItem(T t, time_point<steady_clock> s, int d) : task(t), startTime(s), delay(d){} }; template<typename T, typename = enable_if<std::is_base_of<Task, T>::value>> class Scheduler { public: queue<SchedulerItem<T>> pool; mutex mtx; atomic<bool> running; Scheduler() : running(false){} void add(T task, double delayMsToRun) { lock_guard<mutex> lock(mtx); pool.push(SchedulerItem<T>(task, high_resolution_clock::now(), delayMsToRun)); if (running == false) runNext(); } void runNext(void) { running = true; auto th = [this]() { mtx.lock(); auto item = pool.front(); pool.pop(); mtx.unlock(); auto remaining = (item.startTime + milliseconds(item.delay)) - high_resolution_clock::now(); if(remaining.count() > 0) this_thread::sleep_for(remaining); item.task.run(); if(pool.size() > 0) runNext(); else running = false; }; thread t(th); t.detach(); } };

Código de prueba:

class MyTask : Task { public: virtual void run() override { printf("mytask /n"); }; }; int main() { Scheduler<MyTask> s; s.add(MyTask(), 0); s.add(MyTask(), 2000); s.add(MyTask(), 2500); s.add(MyTask(), 6000); std::this_thread::sleep_for(std::chrono::seconds(10)); }


Esta es una implementación de ejemplo para la interfaz que proporcionó que se acerca más a su descripción de '' Con el hilo del administrador ''.

Utiliza un solo hilo ( timer_thread ) para administrar una cola ( allTasksQueue ) que se ordena según la hora real en que se debe iniciar una tarea ( std::chrono::time_point ).
La ''cola'' es una std::priority_queue (que mantiene time_point sus elementos clave de time_point ).

timer_thread normalmente se suspende hasta que se inicia la siguiente tarea o cuando se agrega una nueva tarea.
Cuando una tarea está a punto de ejecutarse, se coloca en tasksReadyToRunQueue , uno de los subprocesos de trabajo se señaliza, se activa, lo elimina de la cola y comienza a procesar la tarea.

Tenga en cuenta que el grupo de subprocesos tiene un límite superior de tiempo de compilación para el número de subprocesos (40). Si está programando más tareas de las que se pueden enviar a los trabajadores, la nueva tarea se bloqueará hasta que los subprocesos estén disponibles nuevamente.

Usted dijo que este enfoque no es eficiente, pero en general, me parece razonablemente eficiente. Todo está controlado por eventos y no está perdiendo ciclos de CPU por giros innecesarios. Por supuesto, es solo un ejemplo, las optimizaciones son posibles (nota: std::multimap ha sido reemplazado por std::priority_queue ).

La implementación es compatible con C ++ 11

#include <iostream> #include <chrono> #include <queue> #include <unistd.h> #include <vector> #include <thread> #include <condition_variable> #include <mutex> #include <memory> class Task { public: virtual void run() = 0; virtual ~Task() { } }; class Scheduler { public: Scheduler(); ~Scheduler(); void add(Task &task, double delayToRun); private: using timepoint = std::chrono::time_point<std::chrono::steady_clock>; struct key { timepoint tp; Task *taskp; }; struct TScomp { bool operator()(const key &a, const key &b) const { return a.tp > b.tp; } }; const int ThreadPoolSize = 40; std::vector<std::thread> ThreadPool; std::vector<Task *> tasksReadyToRunQueue; std::priority_queue<key, std::vector<key>, TScomp> allTasksQueue; std::thread TimerThr; std::mutex TimerMtx, WorkerMtx; std::condition_variable TimerCV, WorkerCV; bool WorkerIsRunning = true; bool TimerIsRunning = true; void worker_thread(); void timer_thread(); }; Scheduler::Scheduler() { for (int i = 0; i <ThreadPoolSize; ++i) ThreadPool.push_back(std::thread(&Scheduler::worker_thread, this)); TimerThr = std::thread(&Scheduler::timer_thread, this); } Scheduler::~Scheduler() { { std::lock_guard<std::mutex> lck{TimerMtx}; TimerIsRunning = false; TimerCV.notify_one(); } TimerThr.join(); { std::lock_guard<std::mutex> lck{WorkerMtx}; WorkerIsRunning = false; WorkerCV.notify_all(); } for (auto &t : ThreadPool) t.join(); } void Scheduler::add(Task &task, double delayToRun) { auto now = std::chrono::steady_clock::now(); long delay_ms = delayToRun * 1000; std::chrono::milliseconds duration (delay_ms); timepoint tp = now + duration; if (now >= tp) { /* * This is a short-cut * When time is due, the task is directly dispatched to the workers */ std::lock_guard<std::mutex> lck{WorkerMtx}; tasksReadyToRunQueue.push_back(&task); WorkerCV.notify_one(); } else { std::lock_guard<std::mutex> lck{TimerMtx}; allTasksQueue.push({tp, &task}); TimerCV.notify_one(); } } void Scheduler::worker_thread() { for (;;) { std::unique_lock<std::mutex> lck{WorkerMtx}; WorkerCV.wait(lck, [this] { return tasksReadyToRunQueue.size() != 0 || !WorkerIsRunning; } ); if (!WorkerIsRunning) break; Task *p = tasksReadyToRunQueue.back(); tasksReadyToRunQueue.pop_back(); lck.unlock(); p->run(); delete p; // delete Task } } void Scheduler::timer_thread() { for (;;) { std::unique_lock<std::mutex> lck{TimerMtx}; if (!TimerIsRunning) break; auto duration = std::chrono::nanoseconds(1000000000); if (allTasksQueue.size() != 0) { auto now = std::chrono::steady_clock::now(); auto head = allTasksQueue.top(); Task *p = head.taskp; duration = head.tp - now; if (now >= head.tp) { /* * A Task is due, pass to worker threads */ std::unique_lock<std::mutex> ulck{WorkerMtx}; tasksReadyToRunQueue.push_back(p); WorkerCV.notify_one(); ulck.unlock(); allTasksQueue.pop(); } } TimerCV.wait_for(lck, duration); } } /* * End sample implementation */ class DemoTask : public Task { int n; public: DemoTask(int n=0) : n{n} { } void run() override { std::cout << "Start task " << n << std::endl;; std::this_thread::sleep_for(std::chrono::seconds(2)); std::cout << " Stop task " << n << std::endl;; } }; int main() { Scheduler sched; Task *t0 = new DemoTask{0}; Task *t1 = new DemoTask{1}; Task *t2 = new DemoTask{2}; Task *t3 = new DemoTask{3}; Task *t4 = new DemoTask{4}; Task *t5 = new DemoTask{5}; sched.add(*t0, 7.313); sched.add(*t1, 2.213); sched.add(*t2, 0.713); sched.add(*t3, 1.243); sched.add(*t4, 0.913); sched.add(*t5, 3.313); std::this_thread::sleep_for(std::chrono::seconds(10)); }


Esto significa que desea ejecutar todas las tareas continuamente utilizando algún orden.

Puede crear algún tipo de ordenado por una pila de demora (o incluso una lista vinculada) de tareas. Cuando venga una nueva tarea, debe insertarla en la posición en función de un tiempo de demora (solo calcule la posición de manera eficiente e inserte la nueva tarea de manera eficiente).

Ejecute todas las tareas comenzando con el jefe de la pila de tareas (o lista).


Puede evitar tener un subproceso de "administrador" separado y tener que activar una gran cantidad de tareas cuando cambie la tarea de próxima ejecución, mediante el uso de un diseño en el que un único hilo de grupo espera la tarea de "próxima a la ejecución" (si hay uno) en una variable de condición, y los subprocesos de grupo restantes esperan indefinidamente en una segunda variable de condición.

Los hilos de la agrupación ejecutarán pseudocódigo a lo largo de estas líneas:

pthread_mutex_lock(&queue_lock); while (running) { if (head task is ready to run) { dequeue head task; if (task_thread == 1) pthread_cond_signal(&task_cv); else pthread_cond_signal(&queue_cv); pthread_mutex_unlock(&queue_lock); run dequeued task; pthread_mutex_lock(&queue_lock); } else if (!queue_empty && task_thread == 0) { task_thread = 1; pthread_cond_timedwait(&task_cv, &queue_lock, time head task is ready to run); task_thread = 0; } else { pthread_cond_wait(&queue_cv, &queue_lock); } } pthread_mutex_unlock(&queue_lock);

Si cambia la siguiente tarea a ejecutar, entonces ejecuta:

if (task_thread == 1) pthread_cond_signal(&task_cv); else pthread_cond_signal(&queue_cv);

con el queue_lock retenido.

Bajo este esquema, todas las activaciones son directamente en un solo hilo, solo hay una cola de tareas prioritaria y no se requiere un hilo administrador.


Su especificación es un poco demasiado fuerte:

delayToRun significa que la tarea no se ejecuta inmediatamente, pero delayToRun segundos después

Olvidaste agregar "al menos":

  • La tarea no se ejecuta ahora, pero al menos se delayToRun segundos más tarde

El punto es que si diez mil tareas están programadas con un 0.1 delayToRun, seguramente no podrán ejecutarse prácticamente al mismo tiempo.

Con tal corrección, solo mantiene una cola (o agenda) de (tiempo de inicio programado, cierre para ejecutar), mantiene esa cola ordenada e inicia N (un número fijo) de subprocesos que hacen saltar atómicamente el primer elemento de La agenda y la ejecutamos.

luego, todos los subprocesos de trabajo deben activarse para actualizar el tiempo que están esperando.

No, algunos hilos de trabajadores serían despertados.

Leer sobre variables de condición y emisión.

También puede timer_create(2) los temporizadores POSIX, ver timer_create(2) , o el temporizador de fd específico de Linux, ver timerfd_create(2)

Probablemente evitaría ejecutar llamadas al sistema de bloqueo en sus subprocesos, y tendría un subproceso central que las gestiona usando algún bucle de eventos (ver poll(2) ...); de lo contrario, si tiene un centenar de tareas en ejecución sleep(100) y una tarea programada para ejecutarse en medio segundo, no se ejecutará antes de cien segundos.

Es posible que desee leer acerca de la programación del estilo de paso de continuación (es importante para el PCPS). Lea el artículo sobre el Paso de Continuación C por Juliusz Chroboczek.

Mira también en hilos de Qt .

También podría considerar la codificación en Go (con sus Goroutines).