guia - Agrupación de subprocesos en C++ 11
qgis manual (8)
Algo como esto podría ayudar (tomado de una aplicación que funciona).
#include <memory>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
struct thread_pool {
typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;
thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
for (int i = 0; i < threads; ++i) {
auto worker = [this] { return service.run(); };
grp.add_thread(new boost::thread(worker));
}
}
template<class F>
void enqueue(F f) {
service.post(f);
}
~thread_pool() {
service_worker.reset();
grp.join_all();
service.stop();
}
private:
boost::asio::io_service service;
asio_worker service_worker;
boost::thread_group grp;
};
Puedes usarlo así:
thread_pool pool(2);
pool.enqueue([] {
std::cout << "Hello from Task 1/n";
});
pool.enqueue([] {
std::cout << "Hello from Task 2/n";
});
Tenga en cuenta que la reinvención de un mecanismo de colas asíncronas eficiente no es trivial.
Boost :: asio :: io_service es una implementación muy eficiente, o en realidad es una colección de envoltorios específicos de la plataforma (por ejemplo, envuelve los puertos de finalización de E / S en Windows).
Preguntas relevantes :
Acerca de C ++ 11:
- C ++ 11: std :: thread mancomunado?
- ¿Será asincrónico (launch :: async) en C ++ 11 que los pools de hilos se vuelvan obsoletos para evitar la costosa creación de hilos?
Acerca de Boost:
¿Cómo puedo obtener un conjunto de hilos para enviar tareas , sin crear y eliminarlos una y otra vez? Esto significa hilos persistentes para volver a sincronizar sin unirse.
Tengo un código que se ve así:
namespace {
std::vector<std::thread> workers;
int total = 4;
int arr[4] = {0};
void each_thread_does(int i) {
arr[i] += 2;
}
}
int main(int argc, char *argv[]) {
for (int i = 0; i < 8; ++i) { // for 8 iterations,
for (int j = 0; j < 4; ++j) {
workers.push_back(std::thread(each_thread_does, j));
}
for (std::thread &t: workers) {
if (t.joinable()) {
t.join();
}
}
arr[4] = std::min_element(arr, arr+4);
}
return 0;
}
En lugar de crear y unir hilos en cada iteración, preferiría enviar tareas a mis hilos de trabajo en cada iteración y solo crearlas una vez.
Esta es otra implementación de grupo de subprocesos que es muy simple, fácil de entender y usar, usa solo la biblioteca estándar de C ++ 11, y puede ser vista o modificada para sus usos, debe ser un buen iniciador si desea ingresar usando el hilo quinielas:
Esto se copia de mi respuesta a otra publicación muy similar, espero que pueda ayudar:
1) Comience con la cantidad máxima de hilos que un sistema puede admitir:
int Num_Threads = thread::hardware_concurrency();
2) Para una implementación eficiente de subprocesos, una vez que los subprocesos se crean de acuerdo con Num_Threads, es mejor no crear nuevos, o destruir los antiguos (uniéndose). Habrá una penalización de rendimiento, incluso podría hacer que su aplicación sea más lenta que la versión en serie.
Cada subproceso C ++ 11 debe ejecutarse en su función con un bucle infinito, esperando constantemente nuevas tareas para capturar y ejecutar.
Aquí se explica cómo adjuntar dicha función al grupo de subprocesos:
int Num_Threads = thread::hardware_concurrency();
vector<thread> Pool;
for(int ii = 0; ii < Num_Threads; ii++)
{ Pool.push_back(thread(Infinite_loop_function));}
3) La función Infinite_loop_
Este es un bucle "while (true)" esperando la cola de tareas
void The_Pool:: Infinite_loop_function()
{
while(true)
{
{
unique_lock<mutex> lock(Queue_Mutex);
condition.wait(lock, []{return !Queue.empty()});
Job = Queue.front();
Queue.pop();
}
Job(); // function<void()> type
}
};
4) Realice una función para agregar trabajo a su cola
void The_Pool:: Add_Job(function<void()> New_Job)
{
{
unique_lock<mutex> lock(Queue_Mutex);
Queue.push(New_Job);
}
condition.notify_one();
}
5) Vincula una función arbitraria a tu cola
Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));
Una vez que integra estos ingredientes, tiene su propio grupo de threading dinámico. Estos subprocesos siempre se ejecutan, esperando la realización de un trabajo.
Me disculpo si hay algunos errores de sintaxis, escribí estos códigos y tengo mala memoria. Lamento que no pueda proporcionarle el código completo del grupo de subprocesos, que violaría la integridad de mi trabajo.
Puede utilizar la biblioteca de grupos de subprocesos de C ++, https://github.com/vit-vit/ctpl .
Entonces, el código que escribió puede reemplazarse por el siguiente
#include <ctpl.h> // or <ctpl_stl.h> if ou do not have Boost library
int main (int argc, char *argv[]) {
ctpl::thread_pool p(2 /* two threads in the pool */);
int arr[4] = {0};
std::vector<std::future<void>> results(4);
for (int i = 0; i < 8; ++i) { // for 8 iterations,
for (int j = 0; j < 4; ++j) {
results[j] = p.push([&arr, j](int){ arr[j] +=2; });
}
for (int j = 0; j < 4; ++j) {
results[j].get();
}
arr[4] = std::min_element(arr, arr + 4);
}
}
Obtendrá el número deseado de hilos y no los creará y eliminará una y otra vez en las iteraciones.
Un conjunto de subprocesos significa que todos los subprocesos se están ejecutando, todo el tiempo; en otras palabras, la función de subproceso nunca vuelve. Para que los hilos tengan algo significativo que hacer, debe diseñar un sistema de comunicación entre hilos, tanto para indicar al hilo que hay algo que hacer como para comunicar los datos de trabajo reales.
Por lo general, esto implicará algún tipo de estructura de datos concurrente, y cada hilo probablemente dormirá en algún tipo de variable de condición, que se notificará cuando haya trabajo por hacer. Al recibir la notificación, uno o varios de los subprocesos se activan, recuperan una tarea de la estructura de datos concurrente, la procesan y almacenan el resultado de forma análoga.
Luego, el hilo continuará para verificar si aún hay más trabajo por hacer y, si no, volver a dormir.
El resultado es que tiene que diseñar todo esto usted mismo, ya que no existe una noción natural de "trabajo" que sea universalmente aplicable. Es bastante trabajo, y hay algunos problemas sutiles que debes resolver. (Puede programar en Go si le gusta un sistema que se encargue de la administración de hilos detrás de escena).
Un threadpool está en el núcleo de un conjunto de hilos, todos vinculados a una función que funciona como un bucle de eventos. Estos hilos esperarán interminablemente para que se ejecute una tarea o su propia terminación.
El trabajo de subprocesos es proporcionar una interfaz para enviar trabajos, definir (y quizás modificar) la política de ejecución de estos trabajos (reglas de programación, creación de instancias de subprocesos, tamaño del grupo) y supervisar el estado de los subprocesos y los recursos relacionados.
Entonces, para un conjunto versátil, uno debe comenzar por definir qué es una tarea, cómo se inicia, se interrumpe, cuál es el resultado (vea la noción de promesa y futuro para esa pregunta), qué tipo de eventos tendrán que responder los hilos. a, cómo los manejarán, cómo estos eventos serán discriminados de los manejados por las tareas. Esto puede volverse bastante complicado, como se puede ver, e imponer restricciones sobre cómo funcionarán los hilos, a medida que la solución se involucra cada vez más.
Las herramientas actuales para manejar eventos son bastante barebones (*): primitivas como mutexes, variables de condición y algunas abstracciones además de eso (bloqueos, barreras). Pero en algunos casos, estas abstracciones pueden resultar no aptas (vea esta question relacionada), y uno debe volver a usar las primitivas.
Otros problemas tienen que ser gestionados también:
- señal
- E / S
- hardware (afinidad del procesador, configuración heterogénea)
¿Cómo se desarrollarían estos en tu entorno?
Esta respuesta a una pregunta similar apunta a una implementación existente destinada a impulsar y al stl.
Ofrecí una implementación muy cruda de un hilo para otra pregunta, que no aborda muchos de los problemas descritos anteriormente. Es posible que desee construir sobre él. Es posible que también desee echar un vistazo a los marcos existentes en otros idiomas para encontrar inspiración.
(*) No veo eso como un problema, sino todo lo contrario. Creo que es el verdadero espíritu de C ++ heredado de C.
Un threadpool sin dependencias fuera de STL es completamente posible. Hace poco escribí una pequeña biblioteca de subprocesos de solo subprocesos para abordar exactamente el mismo problema. Es compatible con el redimensionamiento dinámico de la agrupación (cambiando el número de trabajadores en tiempo de ejecución), esperando, deteniendo, pausando, reanudando, etc. Espero que le sea útil.
Editar: Esto ahora requiere C ++ 17 y conceptos. (Desde el 12/09/16, solo g ++ 6.0+ es suficiente).
Sin embargo, la deducción de la plantilla es mucho más precisa debido a eso, por lo que vale la pena el esfuerzo de obtener un compilador más nuevo. Todavía no he encontrado una función que requiera argumentos de plantilla explícitos.
Ahora también toma cualquier objeto invocable apropiado (¡ y sigue siendo estático en cuanto a tipo de letra! ).
También ahora incluye un grupo de subprocesos de prioridad de subprocesamiento verde opcional que utiliza la misma API. Esta clase es solo POSIX, sin embargo. Utiliza la API ucontext_t
para el cambio de tareas del espacio de usuario.
Creé una biblioteca simple para esto. Un ejemplo de uso se da a continuación. (Estoy respondiendo esto porque era una de las cosas que encontré antes de decidir que era necesario escribirlo yo mismo).
bool is_prime(int n){
// Determine if n is prime.
}
int main(){
thread_pool pool(8); // 8 threads
list<future<bool>> results;
for(int n = 2;n < 10000;n++){
// Submit a job to the pool.
results.emplace_back(pool.async(is_prime, n));
}
int n = 2;
for(auto i = results.begin();i != results.end();i++, n++){
// i is an iterator pointing to a future representing the result of is_prime(n)
cout << n << " ";
bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result.
if(prime)
cout << "is prime";
else
cout << "is not prime";
cout << endl;
}
}
Puede pasar cualquier función async
con cualquier valor de retorno (o nulo) y cualquier (o ningún) argumento y devolverá un std::future
correspondiente. Para obtener el resultado (o simplemente espere hasta que una tarea se haya completado) llame a get()
en el futuro.
Aquí está el github: https://github.com/Tyler-Hardin/thread_pool .