c++ boost boost-asio future

c++ - ¿Hay alguna forma de esperar asíncronamente un futuro en Boost Asio?



boost-asio future (2)

Mi problema es el siguiente. Comienzo varias operaciones de forma asincrónica, y quiero continuar hasta que todas hayan finalizado. Usando Boost Asio, la forma más directa de hacerlo es la siguiente. Supongamos que las tasks son algún tipo de contenedor de objetos que admiten alguna operación asincrónica.

tasksToGo = tasks.size(); for (auto task: tasks) { task.async_do_something([](const boost::system::error_code& ec) { if (ec) { // handle error } else { if (--taslsToGo == 0) { tasksFinished(); } } }); }

El problema con esta solución es que se siente como una solución alternativa. En Boost 1.54 puedo hacerlo con futuros, pero solo puedo esperar sincrónicamente, lo cual solo es posible desde un hilo separado de donde se llama run() .

for (auto task: tasks) { futures.push_back(task.async_do_something(boost::asio::use_future)); } for (auto future: futures) { future.wait(); }

Este código es mucho más claro que el anterior, pero necesito un hilo separado que no quiero. Quiero algo que pueda usarse así:

for (auto task: tasks) { futures.push_back(task.async_do_something(boost::asio::use_future)); } boost::asio::spawn(ioService, [](boost::asio::yield_context yield) { for (auto future: futures) { future.async_wait(yield); } tasksFinished(); }

¿Hay algo que pueda usarse de manera similar?


¿Qué hay de usar el concepto de trabajo? io_service :: run se ejecutará mientras el trabajo esté disponible y terminará cuando se elimine el trabajo tan pronto como no haya una tarea pendiente.

Antes de ejecutar ejecutar, crea una instancia de trabajo:

boost::shared_ptr<boost::asio::io_service::work> work( new boost::asio::io_service::work( io_service) );

Y en su otro hilo, llame tan pronto como quiera permitir que io_servce :: run termine.

work.reset();

ver también http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.stopping_the_io_service_from_running_out_of_work


Hasta donde yo sé, actualmente no hay soporte de primera clase para esto. Sin embargo, dada la dirección de la biblioteca, me sorprendería que esta funcionalidad no estuviera disponible en el futuro.

Se han propuesto algunos artículos para agregar soporte para este tipo de funcionalidad:

  • N3558: una representación estandarizada de operaciones asincrónicas es particularmente interesante. Propone when_all(futures) y future.next() . Si se implementa, entonces sería posible representar la cadena asíncrona como:

    for (auto task: tasks) { futures.push_back(task.async_do_something(boost::asio::use_future)); } when_all(futures).then(&tasksFinished);

  • N3562: los ejecutores y planificadores presentan ejecutores. Que se puede utilizar para proporcionar un control más fino en cuanto al contexto en el que se puede ejecutar una async . Para Boost.Asio, esto probablemente requeriría proporcionar algún tipo de ejecutor que difiere al io_service .

Si bien estos documentos aún están en curso, puede valer la pena revisar periódicamente la página de Boost.Thread''s Conformance and Extension y Boost.Asio''s github para las primeras adaptaciones de estas propuestas.

Hace un año tuve la necesidad de esta funcionalidad con una versión mucho más temprana de Boost, así que trabajé en mi propia solución. Todavía hay algunas áreas difíciles con respecto a la semántica, pero puede ser útil como material de referencia hasta que se adopte algo oficial. Antes de proporcionar el código, aquí hay una aplicación de ejemplo basada en su pregunta:

#include <iostream> #include <boost/asio.hpp> #include <boost/bind.hpp> #include "async_ops.hpp" void handle_timer(const boost::system::error_code& error, int x) { std::cout << "in handle timer: " << x << " : " << error.message() << std::endl; } void a() { std::cout << "a" << std::endl; } void b() { std::cout << "b" << std::endl; } void c() { std::cout << "c" << std::endl; } int main() { boost::asio::io_service io_service; boost::asio::deadline_timer timer1(io_service); boost::asio::deadline_timer timer2(io_service); // Create a chain that will continue once 2 handlers have been executed. chain all_expired = when_all(io_service, 2); all_expired.then(&a) // Once 2 handlers finish, run a within io_service. .then(&b) // Once a has finished, run b within io_service. .then(&c); // Once b has finished, run c within io_service. // Set expiration times for timers. timer1.expires_from_now(boost::posix_time::seconds(2)); timer2.expires_from_now(boost::posix_time::seconds(5)); // Asynchrnously wait for the timers, wrapping the handlers with the chain. timer1.async_wait(all_expired.wrap( boost::bind(&handle_timer, boost::asio::placeholders::error, 1))); timer2.async_wait(all_expired.wrap( boost::bind(&handle_timer, boost::asio::placeholders::error, 2))); // Run the io_service. io_service.run(); }

Que produce el siguiente resultado:

in handle timer: 1 : Success in handle timer: 2 : Success a b c

Y aquí está async_ops.hpp :

#include <vector> #include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/bind/protect.hpp> #include <boost/enable_shared_from_this.hpp> #include <boost/foreach.hpp> #include <boost/function.hpp> #include <boost/make_shared.hpp> #include <boost/range/iterator_range.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread/locks.hpp> #include <boost/thread/mutex.hpp> #include <boost/type_traits/is_integral.hpp> #include <boost/type_traits/remove_reference.hpp> #include <boost/utility/enable_if.hpp> class chain; namespace detail { /// @brief Chained handler connects two handlers together that will /// be called sequentially. /// /// @note Type erasure is not performed on Handler1 to allow resolving /// to the correct asio_handler_invoke via ADL. template <typename Handler1> class chained_handler { public: template <typename Handler2> chained_handler(Handler1 handler1, Handler2 handler2) : handler1_(handler1), handler2_(handler2) {} void operator()() { handler1_(); handler2_(); } template <typename Arg1> void operator()(const Arg1& a1) { handler1_(a1); handler2_(); } template <typename Arg1, typename Arg2> void operator()(const Arg1& a1, const Arg2& a2) { handler1_(a1, a2); handler2_(); } //private: Handler1 handler1_; boost::function<void()> handler2_; }; /// @brief Hook that allows the sequential_handler to be invoked /// within specific context based on the hander''s type. template <typename Function, typename Handler> void asio_handler_invoke( Function function, chained_handler<Handler>* handler) { boost_asio_handler_invoke_helpers::invoke( function, handler->handler1_); } /// @brief No operation. void noop() {} /// @brief io_service_executor is used to wrap handlers, providing a /// deferred posting to an io_service. This allows for chains /// to inherit io_services from other chains. class io_service_executor : public boost::enable_shared_from_this<io_service_executor> { public: /// @brief Constructor. explicit io_service_executor(boost::asio::io_service* io_service) : io_service_(io_service) {} /// @brief Wrap a handler, returning a functor that will post the /// provided handler into the io_service. /// /// @param handler Handler to be wrapped for deferred posting. /// @return Functor that will post handler into io_service. template <typename Handler> boost::function<void()> wrap(Handler handler) { // By binding to the io_service_exectuer''s post, the io_service // into which the handler can be posted can be specified at a later // point in time. return boost::bind(&io_service_executor::post<Handler>, shared_from_this(), handler); } /// @brief Set the io_service. void io_service(boost::asio::io_service* io_service) { io_service_ = io_service; } /// @brief Get the io_service. boost::asio::io_service* io_service() { return io_service_; } private: /// @brief Post handler into the io_service. /// /// @param handler The handler to post. template <typename Handler> void post(Handler handler) { io_service_->post(handler); } private: boost::asio::io_service* io_service_; }; /// @brief chain_impl is an implementation for a chain. It is responsible /// for lifetime management, tracking posting and wrapped functions, /// as well as determining when run criteria has been satisfied. class chain_impl : public boost::enable_shared_from_this<chain_impl> { public: /// @brief Constructor. chain_impl(boost::shared_ptr<io_service_executor> executor, std::size_t required) : executor_(executor), required_(required) {} /// @brief Destructor will invoke all posted handlers. ~chain_impl() { run(); } /// @brief Post a handler that will be posted into the executor /// after run criteria has been satisfied. template <typename Handler> void post(const Handler& handler) { deferred_handlers_.push_back(executor_->wrap(handler)); } /// @brief Wrap a handler, returning a chained_handler. The returned /// handler will notify the impl when it has been invoked. template <typename Handler> chained_handler<Handler> wrap(const Handler& handler) { return chained_handler<Handler>( handler, // handler1 boost::bind(&chain_impl::complete, shared_from_this())); // handler2 } /// @brief Force run of posted handlers. void run() { boost::unique_lock<boost::mutex> guard(mutex_); run(guard); } /// @brief Get the executor. boost::shared_ptr<io_service_executor> executor() { return executor_; } private: /// @brief Completion handler invoked when a wrapped handler has been /// invoked. void complete() { boost::unique_lock<boost::mutex> guard(mutex_); // Update tracking. if (required_) --required_; // If criteria has not been met, then return early. if (required_) return; // Otherwise, run the handlers. run(guard); } /// @brief Run handlers. void run(boost::unique_lock<boost::mutex>& guard) { // While locked, swap handlers into a temporary. std::vector<boost::function<void()> > handlers; using std::swap; swap(handlers, deferred_handlers_); // Run handlers without mutex. guard.unlock(); BOOST_FOREACH(boost::function<void()>& handler, handlers) handler(); guard.lock(); } private: boost::shared_ptr<io_service_executor> executor_; boost::mutex mutex_; std::size_t required_; std::vector<boost::function<void()> > deferred_handlers_; }; /// @brief Functor used to wrap and post handlers or chains between two /// implementations. struct wrap_and_post { wrap_and_post( boost::shared_ptr<detail::chain_impl> current, boost::shared_ptr<detail::chain_impl> next ) : current_(current), next_(next) {} /// @brief Wrap a handler with next, then post into current. template <typename Handler> void operator()(Handler handler) { // Wrap the handler with the next implementation, then post into the // current. The wrapped handler will keep next alive, and posting into // current will cause next::complete to be invoked when current is ran. current_->post(next_->wrap(handler)); } /// @brief Wrap an entire chain, posting into the current. void operator()(chain chain); private: boost::shared_ptr<detail::chain_impl> current_; boost::shared_ptr<detail::chain_impl> next_; }; } // namespace detail /// @brief Used to indicate that the a chain will inherit its service from an /// outer chain. class inherit_service_type {}; inherit_service_type inherit_service; /// @brief Chain represents an asynchronous call chain, allowing the overall /// chain to be constructed in a verbose and explicit manner. class chain { public: /// @brief Constructor. /// /// @param io_service The io_service in which the chain will run. explicit chain(boost::asio::io_service& io_service) : impl_(boost::make_shared<detail::chain_impl>( boost::make_shared<detail::io_service_executor>(&io_service), 0)), root_impl_(impl_) {} /// @brief Constructor. The chain will inherit its io_service from an /// outer chain. explicit chain(inherit_service_type) : impl_(boost::make_shared<detail::chain_impl>( boost::make_shared<detail::io_service_executor>( static_cast<boost::asio::io_service*>(NULL)), 0)), root_impl_(impl_) {} /// @brief Force run posted handlers. void run() { root_impl_->run(); } /// @brief Chain link that will complete when the amount of wrapped /// handlers is equal to required. /// /// @param required The amount of handlers required to be complete. template <typename T> typename boost::enable_if<boost::is_integral< typename boost::remove_reference<T>::type>, chain>::type any(std::size_t required = 1) { return chain(root_impl_, required); } /// @brief Chain link that wraps all handlers in container, and will /// be complete when the amount of wrapped handlers is equal to /// required. /// /// @param Container of handlers to wrap. /// @param required The amount of handlers required to be complete. template <typename Container> typename boost::disable_if<boost::is_integral< typename boost::remove_reference<Container>::type>, chain>::type any(const Container& container, std::size_t required = 1) { return post(container, required); } /// @brief Chain link that wraps all handlers in iterator range, and will /// be complete when the amount of wrapped handlers is equal to /// required. /// /// @param Container of handlers to wrap. /// @param required The amount of handlers required to be complete. template <typename Iterator> chain any(Iterator begin, Iterator end, std::size_t required = 1) { return any(boost::make_iterator_range(begin, end), required); } /// @brief Chain link that will complete when the amount of wrapped /// handlers is equal to required. /// /// @param required The amount of handlers required to be complete. template <typename T> typename boost::enable_if<boost::is_integral< typename boost::remove_reference<T>::type>, chain>::type all(T required) { return any<T>(required); } /// @brief Chain link that wraps all handlers in container, and will /// be complete when all wrapped handlers from the container /// have been executed. /// /// @param Container of handlers to wrap. template <typename Container> typename boost::disable_if<boost::is_integral< typename boost::remove_reference<Container>::type>, chain>::type all(const Container& container) { return any(container, container.size()); } /// @brief Chain link that wraps all handlers in iterator range, and will /// be complete when all wrapped handlers from the iterator range /// have been executed. /// /// @param Container of handlers to wrap. template <typename Iterator> chain all(Iterator begin, Iterator end) { return all(boost::make_iterator_range(begin, end)); } /// @brief Chain link that represents a single sequential link. template <typename Handler> chain then(const Handler& handler) { boost::array<Handler, 1> handlers = {{handler}}; return all(handlers); } /// @brief Wrap a handler, returning a chained_handler. template <typename Handler> detail::chained_handler<Handler> wrap(const Handler& handler) { return impl_->wrap(handler); } /// @brief Set the executor. void executor(boost::asio::io_service& io_service) { impl_->executor()->io_service(&io_service); } /// @brief Check if this chain should inherit its executor. bool inherits_executor() { return !impl_->executor()->io_service(); } private: /// @brief Private constructor used to create links in the chain. /// /// @note All links maintain a handle to the root impl. When constructing a /// chain, this allows for links later in the chain to be stored as /// non-temporaries. chain(boost::shared_ptr<detail::chain_impl> root_impl, std::size_t required) : impl_(boost::make_shared<detail::chain_impl>( root_impl->executor(), required)), root_impl_(root_impl) {} /// @brief Create a new chain link, wrapping handlers and posting into /// the current chain. template <typename Container> chain post(const Container& container, std::size_t required) { // Create next chain. chain next(root_impl_, required); // Wrap handlers from the next chain, and post into the current chain. std::for_each(container.begin(), container.end(), detail::wrap_and_post(impl_, next.impl_)); return next; } private: boost::shared_ptr<detail::chain_impl> impl_; boost::shared_ptr<detail::chain_impl> root_impl_; }; void detail::wrap_and_post::operator()(chain c) { // If next does not have an executor, then inherit from current. if (c.inherits_executor()) c.executor(*current_->executor()->io_service()); // When current completes, start the chain. current_->post(boost::protect(boost::bind(&chain::run, c))); // The next impl needs to be aware of when the chain stops, so // wrap a noop and append it to the end of the chain. c.then(next_->wrap(&detail::noop)); } // Convenience functions. template <typename T, typename Handler> chain async(T& t, const Handler& handler) { return chain(t).then(handler); } template <typename T, typename Container> chain when_all(T& t, const Container& container) { return chain(t).all(container); } template <typename T, typename Iterator> chain when_all(T& t, Iterator begin, Iterator end) { return chain(t).all(begin, end); } template <typename T, typename Container> chain when_any(T& t, const Container& container) { return chain(t).any(container); } template <typename T, typename Iterator> chain when_any(T& t, Iterator begin, Iterator end) { return chain(t).any(begin, end); }

Aquí hay algunos ejemplos básicos para avanzar utilizando el código anterior con dos hilos. Mi notación:

  • a -> b expresa a continuación, b
  • (a | b) expresa a o b . Por lo tanto, (a | b) -> c implica cuando a o b terminan, luego ejecutan c .
  • (a & b) expresa a y b . Por lo tanto, (a & b) -> c implica cuando tanto a como b terminan, luego ejecuta c .

Antes de cada caso, imprimo la notación de la cadena. Además, cada función imprimirá una letra mayúscula al ingresar y una letra más baja al salir.

#include <iostream> #include <boost/asio.hpp> #include <boost/assign.hpp> #include <boost/thread.hpp> #include "async_ops.hpp" /// @brief Print identifiers when entering and exiting scope, /// sleeping between. void print_and_sleep(char id, unsigned int sleep_time) { std::cout << char(toupper(id)); boost::this_thread::sleep_for(boost::chrono::milliseconds(sleep_time)); std::cout << char(tolower(id)); std::cout.flush(); } /// @brief Convenience function to create functors. boost::function<void()> make_fn(char id, unsigned int sleep_time) { return boost::bind(&print_and_sleep, id, sleep_time); } /// @brief Run an io_service with multiple threads. void run_service(boost::asio::io_service& io_service) { boost::thread_group threads; threads.create_thread(boost::bind( &boost::asio::io_service::run, &io_service)); io_service.run(); threads.join_all(); } int main() { boost::function<void()> a = make_fn(''a'', 500); boost::function<void()> b = make_fn(''b'', 1000); boost::function<void()> c = make_fn(''c'', 500); boost::function<void()> d = make_fn(''d'', 1000); boost::function<void()> e = make_fn(''e'', 500); { std::cout << "a -> b -> c/n" " "; boost::asio::io_service io_service; async(io_service, a) .then(b) .then(c); run_service(io_service); std::cout << std::endl; } { std::cout << "(a & b) -> c/n" " "; boost::asio::io_service io_service; when_all(io_service, boost::assign::list_of(a)(b)) .then(c); run_service(io_service); std::cout << std::endl; } { std::cout << "(a | b) -> c/n" " "; boost::asio::io_service io_service; when_any(io_service, boost::assign::list_of(a)(b)) .then(c); run_service(io_service); std::cout << std::endl; } { std::cout << "(a & b) -> (c & d)/n" " "; boost::asio::io_service io_service; when_all(io_service, boost::assign::list_of(a)(b)) .all(boost::assign::list_of(c)(d)); run_service(io_service); std::cout << std::endl; } { std::cout << "(a & b) -> c -> (d & e)/n" " "; boost::asio::io_service io_service; when_all(io_service, boost::assign::list_of(a)(b)) .then(c) .all(boost::assign::list_of(d)(e)); run_service(io_service); std::cout << std::endl; } std::cout << "(a & b) -> (c & d) -> e/n" " "; { boost::asio::io_service io_service; when_all(io_service, boost::assign::list_of(a)(b)) .all(boost::assign::list_of(c)(d)) .then(e); run_service(io_service); std::cout << std::endl; } std::cout << "(a | b) -> (c | d) -> e/n" " "; { boost::asio::io_service io_service; when_any(io_service, boost::assign::list_of(a)(b)) .any(boost::assign::list_of(c)(d)) .then(e); run_service(io_service); std::cout << std::endl; } std::cout << "(a | b) -> (c & d) -> e/n" " "; { boost::asio::io_service io_service; when_any(io_service, boost::assign::list_of(a)(b)) .all(boost::assign::list_of(c)(d)) .then(e); run_service(io_service); std::cout << std::endl; } { std::cout << "a -> ((b -> d) | c) -> e/n" " "; boost::asio::io_service io_service; async(io_service, a) .any(boost::assign::list_of (async(io_service, b).then(d)) (async(inherit_service, c))) .then(e); run_service(io_service); std::cout << std::endl; } }

Produce el siguiente resultado:

a -> b -> c AaBbCc (a & b) -> c ABabCc (a | b) -> c ABaCbc (a & b) -> (c & d) ABabCDcd (a & b) -> c -> (d & e) ABabCcDEed (a & b) -> (c & d) -> e ABabCDcdEe (a | b) -> (c | d) -> e ABaCbDcEed (a | b) -> (c & d) -> e ABaCbDcdEe a -> ((b -> d) | c) -> e AaBCcEbDed