c++ multithreading c++11 asynchronous stdasync

c++ - ¿Puedo usar std:: async sin esperar la futura limitación?



multithreading c++11 (3)

Nivel alto
Quiero llamar a algunas funciones sin valor de retorno en modo asíncrono sin esperar a que finalicen. Si uso std :: async, el objeto futuro no se destruye hasta que la tarea finalice, esto hace que la llamada no se sincronice en mi caso.

Ejemplo

void sendMail(const std::string& address, const std::string& message) { //sending the e-mail which takes some time... } myResonseType processRequest(args...) { //Do some processing and valuate the address and the message... //Sending the e-mail async auto f = std::async(std::launch::async, sendMail, address, message); //returning the response ASAP to the client return myResponseType; } //<-- I''m stuck here until the async call finish to allow f to be destructed. // gaining no benefit from the async call.

Mis preguntas son

  1. ¿Hay alguna manera de superar esta limitación?
  2. si (1) es no, ¿debería implementar una vez un hilo que tome esos futuros "zombies" y esperarlos?
  3. Es (1) y (2) no, ¿hay alguna otra opción, solo crear mi propio grupo de subprocesos?

Nota:
Prefiero no usar la opción de thread + detach (sugerido por @ galop1n) ya que la creación de un nuevo hilo tiene una sobrecarga que deseo evitar. Mientras usa std :: async (al menos en MSVC) está usando un grupo de subprocesos interno.

Gracias.


¿Por qué no solo comienzas un hilo y te desconectas si no te importa unirte?

std::thread{ sendMail, address, message}.detach();

std :: async está ligado a la vida útil de std :: future y no hay alternativa a eso.

Poner el std :: future en una cola de espera leída por otro hilo requerirá el mismo mecanismo de seguridad que un grupo que recibe nuevas tareas, como mutex alrededor del contenedor.

Su mejor opción, entonces, es un grupo de subprocesos para consumir tareas directamente insertadas en una cola segura de subprocesos. Y no dependerá de una implementación específica.

Debajo de una implementación del grupo de subprocesos que toma cualquier argumento y llamable, los hilos hacen poling en la cola, una implementación mejor debe usar variables de condición ( coliru ):

#include <iostream> #include <queue> #include <memory> #include <thread> #include <mutex> #include <functional> #include <string> struct ThreadPool { struct Task { virtual void Run() const = 0; virtual ~Task() {}; }; template < typename task_, typename... args_ > struct RealTask : public Task { RealTask( task_&& task, args_&&... args ) : fun_( std::bind( std::forward<task_>(task), std::forward<args_>(args)... ) ) {} void Run() const override { fun_(); } private: decltype( std::bind(std::declval<task_>(), std::declval<args_>()... ) ) fun_; }; template < typename task_, typename... args_ > void AddTask( task_&& task, args_&&... args ) { auto lock = std::unique_lock<std::mutex>{mtx_}; using FinalTask = RealTask<task_, args_... >; q_.push( std::unique_ptr<Task>( new FinalTask( std::forward<task_>(task), std::forward<args_>(args)... ) ) ); } ThreadPool() { for( auto & t : pool_ ) t = std::thread( [=] { while ( true ) { std::unique_ptr<Task> task; { auto lock = std::unique_lock<std::mutex>{mtx_}; if ( q_.empty() && stop_ ) break; if ( q_.empty() ) continue; task = std::move(q_.front()); q_.pop(); } if (task) task->Run(); } } ); } ~ThreadPool() { { auto lock = std::unique_lock<std::mutex>{mtx_}; stop_ = true; } for( auto & t : pool_ ) t.join(); } private: std::queue<std::unique_ptr<Task>> q_; std::thread pool_[8]; std::mutex mtx_; volatile bool stop_ {}; }; void foo( int a, int b ) { std::cout << a << "." << b; } void bar( std::string const & s) { std::cout << s; } int main() { ThreadPool pool; for( int i{}; i!=42; ++i ) { pool.AddTask( foo, 3, 14 ); pool.AddTask( bar, " - " ); } }


En lugar de mover el futuro a un objeto global (y administrar manualmente la eliminación de futuros no utilizados), puede moverlo al alcance local de la función llamada de forma asincrónica.

"Deje que la función asincrónica tome su propio futuro", por así decirlo.

He creado este envoltorio de plantilla que funciona para mí (probado en Windows):

#include <future> template<class Function, class... Args> void async_wrapper(Function&& f, Args&&... args, std::future<void>& future, std::future<void>&& is_valid, std::promise<void>&& is_moved) { is_valid.wait(); // Wait until the return value of std::async is written to "future" auto our_future = std::move(future); // Move "future" to a local variable is_moved.set_value(); // Only now we can leave void_async in the main thread // This is also used by std::async so that member function pointers work transparently auto functor = std::bind(f, std::forward<Args>(args)...); functor(); } template<class Function, class... Args> // This is what you call instead of std::async void void_async(Function&& f, Args&&... args) { std::future<void> future; // This is for std::async return value // This is for our synchronization of moving "future" between threads std::promise<void> valid; std::promise<void> is_moved; auto valid_future = valid.get_future(); auto moved_future = is_moved.get_future(); // Here we pass "future" as a reference, so that async_wrapper // can later work with std::async''s return value future = std::async( async_wrapper<Function, Args...>, std::forward<Function>(f), std::forward<Args>(args)..., std::ref(future), std::move(valid_future), std::move(is_moved) ); valid.set_value(); // Unblock async_wrapper waiting for "future" to become valid moved_future.wait(); // Wait for "future" to actually be moved }

Estoy un poco sorprendido de que funcione porque pensé que el destructor del futuro movido se bloquearía hasta que salgamos de async_wrapper . Debería esperar a que async_wrapper vuelva, pero está esperando dentro de esa misma función. Lógicamente, debería ser un punto muerto, pero no lo es.

También traté de agregar una línea al final de async_wrapper para vaciar manualmente el objeto futuro:

our_future = std::future<void>();

Esto tampoco bloquea.


Puede mover el futuro a un objeto global, por lo que cuando se ejecuta el destructor del futuro local no tiene que esperar a que se complete el subproceso asíncrono.

std::vector<std::future<void>> pending_futures; myResonseType processRequest(args...) { //Do some processing and valuate the address and the message... //Sending the e-mail async auto f = std::async(std::launch::async, sendMail, address, message); // transfer the future''s shared state to a longer-lived future pending_futures.push_back(std::move(f)); //returning the response ASAP to the client return myResponseType; }

NB Esto no es seguro si el hilo asincrónico se refiere a cualquier variable local en la función processRequest .

Mientras usa std::async (al menos en MSVC) está usando un grupo de subprocesos interno.

De hecho, no es conforme, el estándar explícitamente dice que las tareas que se ejecutan con std::launch::async deben ejecutarse como si estuvieran en un nuevo hilo, por lo que cualquier variable local del hilo no debe persistir de una tarea a otra. Sin embargo, por lo general no importa.