c++ - Grupo de hilos usando boost asio
threadpool boost-asio (1)
Estoy tratando de crear una clase de grupo de subprocesos limitada usando boost :: asio. Pero estoy atascado en un momento puede alguien ayudarme.
¿El único problema es el lugar donde debería disminuir el contador?
El código no funciona como se esperaba.
el problema es que no sé cuándo terminará la ejecución de mi hilo y cómo llegaré a saber que ha vuelto a la agrupación
#include <boost/asio.hpp>
#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>
#include <stack>
using namespace std;
using namespace boost;
class ThreadPool
{
static int count;
int NoOfThread;
thread_group grp;
mutex mutex_;
asio::io_service io_service;
int counter;
stack<thread*> thStk ;
public:
ThreadPool(int num)
{
NoOfThread = num;
counter = 0;
mutex::scoped_lock lock(mutex_);
if(count == 0)
count++;
else
return;
for(int i=0 ; i<num ; ++i)
{
thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service)));
}
}
~ThreadPool()
{
io_service.stop();
grp.join_all();
}
thread* getThread()
{
if(counter > NoOfThread)
{
cout<<"run out of threads /n";
return NULL;
}
counter++;
thread* ptr = thStk.top();
thStk.pop();
return ptr;
}
};
int ThreadPool::count = 0;
struct callable
{
void operator()()
{
cout<<"some task for thread /n";
}
};
int main( int argc, char * argv[] )
{
callable x;
ThreadPool pool(10);
thread* p = pool.getThread();
cout<<p->get_id();
//how i can assign some function to thread pointer ?
//how i can return thread pointer after work done so i can add
//it back to stack?
return 0;
}
En resumen, necesita envolver la tarea provista por el usuario con otra función que:
- Invoque la función de usuario o el objeto llamable.
- Bloquea el mutex y decrementa el contador.
Puede que no esté entendiendo todos los requisitos para este grupo de subprocesos. Por lo tanto, para mayor claridad, aquí hay una lista explícita de lo que creo son los requisitos:
- La agrupación gestiona la vida útil de los hilos. El usuario no debe poder eliminar subprocesos que residen dentro del grupo.
- El usuario puede asignar una tarea al grupo de forma no intrusiva.
- Cuando se asigna una tarea, si todos los subprocesos de la agrupación ejecutan actualmente otras tareas, la tarea se descarta.
Antes de proporcionar una implementación, hay algunos puntos clave que me gustaría destacar:
- Una vez que se ha iniciado un subproceso, se ejecutará hasta que se complete, cancele o finalice. La función que está ejecutando el hilo no puede ser reasignada. Para permitir que un solo hilo ejecute múltiples funciones a lo largo de su vida útil, el hilo deseará
io_service::run()
con una función que se leerá de una cola, comoio_service::run()
, y se publicarán tipos invocables en el evento cola, como desdeio_service::post()
. -
io_service::run()
devuelve si no hay trabajo pendiente en elio_service
, elio_service
se detiene, o se lanza una excepción desde un controlador que el hilo estaba ejecutando. Para evitar queio_serivce::run()
regrese cuando no hay trabajo pendiente, se puede usar la claseio_service::work
. - La definición de los requisitos de tipo de tarea (es decir, el tipo de tarea debe poder llamarse mediante la sintaxis de
object()
) en lugar de requerir un tipo (es decir, la tarea debe heredar delprocess
), proporciona más flexibilidad al usuario. Permite al usuario proporcionar una tarea como un puntero a función o un tipo que proporciona unoperator()
nulooperator()
.
Implementación usando boost::asio
:
#include <boost/asio.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
boost::asio::io_service io_service_;
boost::asio::io_service::work work_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: work_( io_service_ ),
available_( pool_size )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &boost::asio::io_service::run,
&io_service_ ) );
}
}
/// @brief Destructor.
~thread_pool()
{
// Force all threads to return from io_service::run().
io_service_.stop();
// Suppress all exceptions.
try
{
threads_.join_all();
}
catch ( const std::exception& ) {}
}
/// @brief Adds a task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Post a wrapped task into the queue.
io_service_.post( boost::bind( &thread_pool::wrap_task, this,
boost::function< void() >( task ) ) );
}
private:
/// @brief Wrap a task so that the available count can be increased once
/// the user provided task has completed.
void wrap_task( boost::function< void() > task )
{
// Run the user supplied task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
// Task has finished, so increment count of available threads.
boost::unique_lock< boost::mutex > lock( mutex_ );
++available_;
}
};
Algunos comentarios sobre la implementación:
- El manejo de excepciones debe ocurrir alrededor de la tarea del usuario. Si la función del usuario o el objeto invocable produce una excepción que no es de tipo
boost::thread_interrupted
, seboost::thread_interrupted
std::terminate()
. Este es el resultado de las excepciones de Boost.Thread en el comportamiento de las funciones de los hilos . También vale la pena leer el efecto de Boost.Asio de las excepciones lanzadas por los manejadores . - Si el usuario proporciona la
task
través deboost::bind
, entonces elboost::bind
anidadoboost::bind
no se compilará. Se requiere una de las siguientes opciones:- No admite
task
creadas porboost::bind
. - Programación para realizar la bifurcación en tiempo de compilación según el tipo de usuario, si el resultado de
boost::bind
para que se pueda usarboost::protect
, ya queboost::protect
solo funciona correctamente en ciertos objetos de función. - Utilice otro tipo para pasar el objeto de
task
indirectamente. Opté por usarboost::function
para facilitar la lectura a costa de perder el tipo exacto.boost::tuple
, aunque un poco menos legible, también podría usarse para preservar el tipo exacto, como se ve en el ejemplo de serialization de Boost.Asio.
- No admite
El código de la aplicación ahora puede usar el tipo thread_pool
forma no intrusiva:
void work() {};
struct worker
{
void operator()() {};
};
void more_work( int ) {};
int main()
{
thread_pool pool( 2 );
pool.run_task( work ); // Function pointer.
pool.run_task( worker() ); // Callable object.
pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
}
El thread_pool
se puede crear sin Boost.Asio, y puede ser un poco más fácil para los mantenedores, ya que ya no necesitan saber sobre los comportamientos de Boost.Asio
, como cuándo vuelve io_service::run()
, y qué es io_service::work
objeto de io_service::work
:
#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
class thread_pool
{
private:
std::queue< boost::function< void() > > tasks_;
boost::thread_group threads_;
std::size_t available_;
boost::mutex mutex_;
boost::condition_variable condition_;
bool running_;
public:
/// @brief Constructor.
thread_pool( std::size_t pool_size )
: available_( pool_size ),
running_( true )
{
for ( std::size_t i = 0; i < pool_size; ++i )
{
threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
}
}
/// @brief Destructor.
~thread_pool()
{
// Set running flag to false then notify all threads.
{
boost::unique_lock< boost::mutex > lock( mutex_ );
running_ = false;
condition_.notify_all();
}
try
{
threads_.join_all();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}
/// @brief Add task to the thread pool if a thread is currently available.
template < typename Task >
void run_task( Task task )
{
boost::unique_lock< boost::mutex > lock( mutex_ );
// If no threads are available, then return.
if ( 0 == available_ ) return;
// Decrement count, indicating thread is no longer available.
--available_;
// Set task and signal condition variable so that a worker thread will
// wake up andl use the task.
tasks_.push( boost::function< void() >( task ) );
condition_.notify_one();
}
private:
/// @brief Entry point for pool threads.
void pool_main()
{
while( running_ )
{
// Wait on condition variable while the task is empty and the pool is
// still running.
boost::unique_lock< boost::mutex > lock( mutex_ );
while ( tasks_.empty() && running_ )
{
condition_.wait( lock );
}
// If pool is no longer running, break out.
if ( !running_ ) break;
// Copy task locally and remove from the queue. This is done within
// its own scope so that the task object is destructed immediately
// after running the task. This is useful in the event that the
// function contains shared_ptr arguments bound via bind.
{
boost::function< void() > task = tasks_.front();
tasks_.pop();
lock.unlock();
// Run the task.
try
{
task();
}
// Suppress all exceptions.
catch ( const std::exception& ) {}
}
// Task has finished, so increment count of available threads.
lock.lock();
++available_;
} // while running_
}
};