c++ multithreading boost udp boost-asio

c++ - boost:: asio async_receive_from Punto final UDP compartido entre hilos?



multithreading boost-asio (1)

Boost asio permite específicamente que múltiples hilos invoquen el método run () en un io_service. Esto parece una excelente manera de crear un servidor UDP multiproceso. Sin embargo, me he encontrado con un problema por el que estoy luchando para obtener una respuesta.

Mirando una típica llamada async_receive_from:

m_socket->async_receive_from( boost::asio::buffer(m_recv_buffer), m_remote_endpoint, boost::bind( &udp_server::handle_receive, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));

El punto final remoto y el búfer de mensajes no se pasan al manejador, pero tienen un nivel de alcance más alto (variable de miembro en mi ejemplo). El código para manejar el mensaje UDP cuando llegue se verá algo así como:

void dns_server::handle_receive(const boost::system::error_code &error, std::size_t size) { // process message blah(m_recv_buffer, size); // send something back respond(m_remote_endpoint); }

Si hay varios subprocesos en ejecución, ¿cómo funciona la sincronización? Tener un punto final único y un búfer de recepción compartido entre los hilos implica que asio espera que un controlador complete en un solo hilo antes de llamar al controlador en otro hilo en el caso de que llegue un mensaje mientras tanto. Eso parece negar el punto de permitir que múltiples hilos invoquen ejecutar en primer lugar.

Si deseo recibir solicitudes simultáneas, parece que tengo que entregar los paquetes de trabajo, junto con una copia del punto final, a un hilo separado que permite que el método del controlador vuelva inmediatamente para que asio pueda pasar y pasar. otro mensaje en paralelo a otro de los hilos que llamó a run ().

Eso parece más que desagradable. ¿Que me estoy perdiendo aqui?


Tener un solo punto final y un búfer de recepción compartido entre los hilos implica que asio espera a que un manejador complete en un solo hilo

Si quiere decir "al ejecutar el servicio con un solo hilo", entonces esto es correcto.

De lo contrario, este no es el caso. En su lugar, Asio simplemente dice que el comportamiento es "indefinido" cuando llama operaciones en un único objeto de servicio (es decir, el socket, no el io_service) concurrentemente.

Eso parece negar el punto de permitir que múltiples hilos invoquen ejecutar en primer lugar.

No, a menos que el procesamiento lleve una cantidad considerable de tiempo.

Los primeros párrafos de la introducción de la muestra de Timer.5 parecen una buena exposición sobre su tema.

Sesión

Para separar los datos específicos de la solicitud (búfer y punto final), quiere alguna noción de una sesión. Un mecanismo popular en Asio es shared_ptr compartido o una clase de sesión shared-from-this (la vinculación boost admite el enlace a las instancias boost :: shared_ptr directamente).

Hebra

Para evitar el acceso concurrente y no sincronizado a los miembros de m_socket , puede agregar bloqueos o utilizar el enfoque de strand como se documenta en la muestra de Timer.5 vinculada anteriormente.

Manifestación

Aquí, para su disfrute, se encuentra el servidor daytime UDP asincrónico Daytime.6 , modificado para funcionar con muchos hilos IO del servicio.

Tenga en cuenta que, lógicamente, todavía hay un único hilo IO (el strand ) por lo que no violamos la seguridad de hilos documentada de la clase socket.

Sin embargo, a diferencia de la muestra oficial, las respuestas pueden ponerse en cola fuera de orden, dependiendo del tiempo que tome el procesamiento real en udp_session::handle_request .

Nota la

  • una clase udp_session para contener los búferes y el punto final remoto por solicitud
  • un grupo de subprocesos, que pueden escalar la carga del procesamiento real (no el IO) en múltiples núcleos.

#include <ctime> #include <iostream> #include <string> #include <boost/array.hpp> #include <boost/bind.hpp> #include <boost/shared_ptr.hpp> #include <boost/enable_shared_from_this.hpp> #include <boost/make_shared.hpp> #include <boost/asio.hpp> #include <boost/thread.hpp> using namespace boost; using asio::ip::udp; using system::error_code; std::string make_daytime_string() { using namespace std; // For time_t, time and ctime; time_t now = time(0); return ctime(&now); } class udp_server; // forward declaration struct udp_session : enable_shared_from_this<udp_session> { udp_session(udp_server* server) : server_(server) {} void handle_request(const error_code& error); void handle_sent(const error_code& ec, std::size_t) { // here response has been sent if (ec) { std::cout << "Error sending response to " << remote_endpoint_ << ": " << ec.message() << "/n"; } } udp::endpoint remote_endpoint_; array<char, 100> recv_buffer_; std::string message; udp_server* server_; }; class udp_server { typedef shared_ptr<udp_session> shared_session; public: udp_server(asio::io_service& io_service) : socket_(io_service, udp::endpoint(udp::v4(), 1313)), strand_(io_service) { receive_session(); } private: void receive_session() { // our session to hold the buffer + endpoint auto session = make_shared<udp_session>(this); socket_.async_receive_from( asio::buffer(session->recv_buffer_), session->remote_endpoint_, strand_.wrap( bind(&udp_server::handle_receive, this, session, // keep-alive of buffer/endpoint asio::placeholders::error, asio::placeholders::bytes_transferred))); } void handle_receive(shared_session session, const error_code& ec, std::size_t /*bytes_transferred*/) { // now, handle the current session on any available pool thread socket_.get_io_service().post(bind(&udp_session::handle_request, session, ec)); // immediately accept new datagrams receive_session(); } void enqueue_response(shared_session const& session) { socket_.async_send_to(asio::buffer(session->message), session->remote_endpoint_, strand_.wrap(bind(&udp_session::handle_sent, session, // keep-alive of buffer/endpoint asio::placeholders::error, asio::placeholders::bytes_transferred))); } udp::socket socket_; asio::strand strand_; friend struct udp_session; }; void udp_session::handle_request(const error_code& error) { if (!error || error == asio::error::message_size) { message = make_daytime_string(); // let''s assume this might be slow // let the server coordinate actual IO server_->enqueue_response(shared_from_this()); } } int main() { try { asio::io_service io_service; udp_server server(io_service); thread_group group; for (unsigned i = 0; i < thread::hardware_concurrency(); ++i) group.create_thread(bind(&asio::io_service::run, ref(io_service))); group.join_all(); } catch (std::exception& e) { std::cerr << e.what() << std::endl; } }

Pensamientos de cierre

Curiosamente, en la mayoría de los casos, verá que la versión de subproceso único funciona igual de bien, y no hay ninguna razón para complicar el diseño.

Alternativamente, puede usar un io_service subproceso io_service dedicado al IO y usar un pool de trabajadores anticuado para hacer el procesamiento en segundo plano de las solicitudes si esta es realmente la parte intensiva de CPU. En primer lugar, esto simplifica el diseño, en segundo lugar, esto podría mejorar el rendimiento en las tareas de IO porque ya no es necesario coordinar las tareas publicadas en el capítulo.