c++ threadpool boost-asio

c++ - Grupo de hilos usando boost asio



threadpool boost-asio (1)

Estoy tratando de crear una clase de grupo de subprocesos limitada usando boost :: asio. Pero estoy atascado en un momento puede alguien ayudarme.

¿El único problema es el lugar donde debería disminuir el contador?

El código no funciona como se esperaba.

el problema es que no sé cuándo terminará la ejecución de mi hilo y cómo llegaré a saber que ha vuelto a la agrupación

#include <boost/asio.hpp> #include <iostream> #include <boost/thread/thread.hpp> #include <boost/bind.hpp> #include <boost/thread/mutex.hpp> #include <stack> using namespace std; using namespace boost; class ThreadPool { static int count; int NoOfThread; thread_group grp; mutex mutex_; asio::io_service io_service; int counter; stack<thread*> thStk ; public: ThreadPool(int num) { NoOfThread = num; counter = 0; mutex::scoped_lock lock(mutex_); if(count == 0) count++; else return; for(int i=0 ; i<num ; ++i) { thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service))); } } ~ThreadPool() { io_service.stop(); grp.join_all(); } thread* getThread() { if(counter > NoOfThread) { cout<<"run out of threads /n"; return NULL; } counter++; thread* ptr = thStk.top(); thStk.pop(); return ptr; } }; int ThreadPool::count = 0; struct callable { void operator()() { cout<<"some task for thread /n"; } }; int main( int argc, char * argv[] ) { callable x; ThreadPool pool(10); thread* p = pool.getThread(); cout<<p->get_id(); //how i can assign some function to thread pointer ? //how i can return thread pointer after work done so i can add //it back to stack? return 0; }


En resumen, necesita envolver la tarea provista por el usuario con otra función que:

  • Invoque la función de usuario o el objeto llamable.
  • Bloquea el mutex y decrementa el contador.

Puede que no esté entendiendo todos los requisitos para este grupo de subprocesos. Por lo tanto, para mayor claridad, aquí hay una lista explícita de lo que creo son los requisitos:

  • La agrupación gestiona la vida útil de los hilos. El usuario no debe poder eliminar subprocesos que residen dentro del grupo.
  • El usuario puede asignar una tarea al grupo de forma no intrusiva.
  • Cuando se asigna una tarea, si todos los subprocesos de la agrupación ejecutan actualmente otras tareas, la tarea se descarta.

Antes de proporcionar una implementación, hay algunos puntos clave que me gustaría destacar:

  • Una vez que se ha iniciado un subproceso, se ejecutará hasta que se complete, cancele o finalice. La función que está ejecutando el hilo no puede ser reasignada. Para permitir que un solo hilo ejecute múltiples funciones a lo largo de su vida útil, el hilo deseará io_service::run() con una función que se leerá de una cola, como io_service::run() , y se publicarán tipos invocables en el evento cola, como desde io_service::post() .
  • io_service::run() devuelve si no hay trabajo pendiente en el io_service , el io_service se detiene, o se lanza una excepción desde un controlador que el hilo estaba ejecutando. Para evitar que io_serivce::run() regrese cuando no hay trabajo pendiente, se puede usar la clase io_service::work .
  • La definición de los requisitos de tipo de tarea (es decir, el tipo de tarea debe poder llamarse mediante la sintaxis de object() ) en lugar de requerir un tipo (es decir, la tarea debe heredar del process ), proporciona más flexibilidad al usuario. Permite al usuario proporcionar una tarea como un puntero a función o un tipo que proporciona un operator() nulo operator() .

Implementación usando boost::asio :

#include <boost/asio.hpp> #include <boost/thread.hpp> class thread_pool { private: boost::asio::io_service io_service_; boost::asio::io_service::work work_; boost::thread_group threads_; std::size_t available_; boost::mutex mutex_; public: /// @brief Constructor. thread_pool( std::size_t pool_size ) : work_( io_service_ ), available_( pool_size ) { for ( std::size_t i = 0; i < pool_size; ++i ) { threads_.create_thread( boost::bind( &boost::asio::io_service::run, &io_service_ ) ); } } /// @brief Destructor. ~thread_pool() { // Force all threads to return from io_service::run(). io_service_.stop(); // Suppress all exceptions. try { threads_.join_all(); } catch ( const std::exception& ) {} } /// @brief Adds a task to the thread pool if a thread is currently available. template < typename Task > void run_task( Task task ) { boost::unique_lock< boost::mutex > lock( mutex_ ); // If no threads are available, then return. if ( 0 == available_ ) return; // Decrement count, indicating thread is no longer available. --available_; // Post a wrapped task into the queue. io_service_.post( boost::bind( &thread_pool::wrap_task, this, boost::function< void() >( task ) ) ); } private: /// @brief Wrap a task so that the available count can be increased once /// the user provided task has completed. void wrap_task( boost::function< void() > task ) { // Run the user supplied task. try { task(); } // Suppress all exceptions. catch ( const std::exception& ) {} // Task has finished, so increment count of available threads. boost::unique_lock< boost::mutex > lock( mutex_ ); ++available_; } };

Algunos comentarios sobre la implementación:

  • El manejo de excepciones debe ocurrir alrededor de la tarea del usuario. Si la función del usuario o el objeto invocable produce una excepción que no es de tipo boost::thread_interrupted , se boost::thread_interrupted std::terminate() . Este es el resultado de las excepciones de Boost.Thread en el comportamiento de las funciones de los hilos . También vale la pena leer el efecto de Boost.Asio de las excepciones lanzadas por los manejadores .
  • Si el usuario proporciona la task través de boost::bind , entonces el boost::bind anidado boost::bind no se compilará. Se requiere una de las siguientes opciones:
    • No admite task creadas por boost::bind .
    • Programación para realizar la bifurcación en tiempo de compilación según el tipo de usuario, si el resultado de boost::bind para que se pueda usar boost::protect , ya que boost::protect solo funciona correctamente en ciertos objetos de función.
    • Utilice otro tipo para pasar el objeto de task indirectamente. Opté por usar boost::function para facilitar la lectura a costa de perder el tipo exacto. boost::tuple , aunque un poco menos legible, también podría usarse para preservar el tipo exacto, como se ve en el ejemplo de serialization de Boost.Asio.

El código de la aplicación ahora puede usar el tipo thread_pool forma no intrusiva:

void work() {}; struct worker { void operator()() {}; }; void more_work( int ) {}; int main() { thread_pool pool( 2 ); pool.run_task( work ); // Function pointer. pool.run_task( worker() ); // Callable object. pool.run_task( boost::bind( more_work, 5 ) ); // Callable object. }

El thread_pool se puede crear sin Boost.Asio, y puede ser un poco más fácil para los mantenedores, ya que ya no necesitan saber sobre los comportamientos de Boost.Asio , como cuándo vuelve io_service::run() , y qué es io_service::work objeto de io_service::work :

#include <queue> #include <boost/bind.hpp> #include <boost/thread.hpp> class thread_pool { private: std::queue< boost::function< void() > > tasks_; boost::thread_group threads_; std::size_t available_; boost::mutex mutex_; boost::condition_variable condition_; bool running_; public: /// @brief Constructor. thread_pool( std::size_t pool_size ) : available_( pool_size ), running_( true ) { for ( std::size_t i = 0; i < pool_size; ++i ) { threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ; } } /// @brief Destructor. ~thread_pool() { // Set running flag to false then notify all threads. { boost::unique_lock< boost::mutex > lock( mutex_ ); running_ = false; condition_.notify_all(); } try { threads_.join_all(); } // Suppress all exceptions. catch ( const std::exception& ) {} } /// @brief Add task to the thread pool if a thread is currently available. template < typename Task > void run_task( Task task ) { boost::unique_lock< boost::mutex > lock( mutex_ ); // If no threads are available, then return. if ( 0 == available_ ) return; // Decrement count, indicating thread is no longer available. --available_; // Set task and signal condition variable so that a worker thread will // wake up andl use the task. tasks_.push( boost::function< void() >( task ) ); condition_.notify_one(); } private: /// @brief Entry point for pool threads. void pool_main() { while( running_ ) { // Wait on condition variable while the task is empty and the pool is // still running. boost::unique_lock< boost::mutex > lock( mutex_ ); while ( tasks_.empty() && running_ ) { condition_.wait( lock ); } // If pool is no longer running, break out. if ( !running_ ) break; // Copy task locally and remove from the queue. This is done within // its own scope so that the task object is destructed immediately // after running the task. This is useful in the event that the // function contains shared_ptr arguments bound via bind. { boost::function< void() > task = tasks_.front(); tasks_.pop(); lock.unlock(); // Run the task. try { task(); } // Suppress all exceptions. catch ( const std::exception& ) {} } // Task has finished, so increment count of available threads. lock.lock(); ++available_; } // while running_ } };