c++ - boost:: asio async_receive_from Punto final UDP compartido entre hilos?
multithreading boost-asio (1)
Boost asio permite específicamente que múltiples hilos invoquen el método run () en un io_service. Esto parece una excelente manera de crear un servidor UDP multiproceso. Sin embargo, me he encontrado con un problema por el que estoy luchando para obtener una respuesta.
Mirando una típica llamada async_receive_from:
m_socket->async_receive_from(
boost::asio::buffer(m_recv_buffer),
m_remote_endpoint,
boost::bind(
&udp_server::handle_receive,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
El punto final remoto y el búfer de mensajes no se pasan al manejador, pero tienen un nivel de alcance más alto (variable de miembro en mi ejemplo). El código para manejar el mensaje UDP cuando llegue se verá algo así como:
void dns_server::handle_receive(const boost::system::error_code &error, std::size_t size)
{
// process message
blah(m_recv_buffer, size);
// send something back
respond(m_remote_endpoint);
}
Si hay varios subprocesos en ejecución, ¿cómo funciona la sincronización? Tener un punto final único y un búfer de recepción compartido entre los hilos implica que asio espera que un controlador complete en un solo hilo antes de llamar al controlador en otro hilo en el caso de que llegue un mensaje mientras tanto. Eso parece negar el punto de permitir que múltiples hilos invoquen ejecutar en primer lugar.
Si deseo recibir solicitudes simultáneas, parece que tengo que entregar los paquetes de trabajo, junto con una copia del punto final, a un hilo separado que permite que el método del controlador vuelva inmediatamente para que asio pueda pasar y pasar. otro mensaje en paralelo a otro de los hilos que llamó a run ().
Eso parece más que desagradable. ¿Que me estoy perdiendo aqui?
Tener un solo punto final y un búfer de recepción compartido entre los hilos implica que asio espera a que un manejador complete en un solo hilo
Si quiere decir "al ejecutar el servicio con un solo hilo", entonces esto es correcto.
De lo contrario, este no es el caso. En su lugar, Asio simplemente dice que el comportamiento es "indefinido" cuando llama operaciones en un único objeto de servicio (es decir, el socket, no el io_service) concurrentemente.
Eso parece negar el punto de permitir que múltiples hilos invoquen ejecutar en primer lugar.
No, a menos que el procesamiento lleve una cantidad considerable de tiempo.
Los primeros párrafos de la introducción de la muestra de Timer.5 parecen una buena exposición sobre su tema.
Sesión
Para separar los datos específicos de la solicitud (búfer y punto final), quiere alguna noción de una sesión. Un mecanismo popular en Asio es shared_ptr
compartido o una clase de sesión shared-from-this (la vinculación boost admite el enlace a las instancias boost :: shared_ptr directamente).
Hebra
Para evitar el acceso concurrente y no sincronizado a los miembros de m_socket
, puede agregar bloqueos o utilizar el enfoque de strand
como se documenta en la muestra de Timer.5 vinculada anteriormente.
Manifestación
Aquí, para su disfrute, se encuentra el servidor daytime UDP asincrónico Daytime.6 , modificado para funcionar con muchos hilos IO del servicio.
Tenga en cuenta que, lógicamente, todavía hay un único hilo IO (el strand
) por lo que no violamos la seguridad de hilos documentada de la clase socket.
Sin embargo, a diferencia de la muestra oficial, las respuestas pueden ponerse en cola fuera de orden, dependiendo del tiempo que tome el procesamiento real en udp_session::handle_request
.
Nota la
- una clase
udp_session
para contener los búferes y el punto final remoto por solicitud - un grupo de subprocesos, que pueden escalar la carga del procesamiento real (no el IO) en múltiples núcleos.
#include <ctime>
#include <iostream>
#include <string>
#include <boost/array.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/make_shared.hpp>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
using namespace boost;
using asio::ip::udp;
using system::error_code;
std::string make_daytime_string()
{
using namespace std; // For time_t, time and ctime;
time_t now = time(0);
return ctime(&now);
}
class udp_server; // forward declaration
struct udp_session : enable_shared_from_this<udp_session> {
udp_session(udp_server* server) : server_(server) {}
void handle_request(const error_code& error);
void handle_sent(const error_code& ec, std::size_t) {
// here response has been sent
if (ec) {
std::cout << "Error sending response to " << remote_endpoint_ << ": " << ec.message() << "/n";
}
}
udp::endpoint remote_endpoint_;
array<char, 100> recv_buffer_;
std::string message;
udp_server* server_;
};
class udp_server
{
typedef shared_ptr<udp_session> shared_session;
public:
udp_server(asio::io_service& io_service)
: socket_(io_service, udp::endpoint(udp::v4(), 1313)),
strand_(io_service)
{
receive_session();
}
private:
void receive_session()
{
// our session to hold the buffer + endpoint
auto session = make_shared<udp_session>(this);
socket_.async_receive_from(
asio::buffer(session->recv_buffer_),
session->remote_endpoint_,
strand_.wrap(
bind(&udp_server::handle_receive, this,
session, // keep-alive of buffer/endpoint
asio::placeholders::error,
asio::placeholders::bytes_transferred)));
}
void handle_receive(shared_session session, const error_code& ec, std::size_t /*bytes_transferred*/) {
// now, handle the current session on any available pool thread
socket_.get_io_service().post(bind(&udp_session::handle_request, session, ec));
// immediately accept new datagrams
receive_session();
}
void enqueue_response(shared_session const& session) {
socket_.async_send_to(asio::buffer(session->message), session->remote_endpoint_,
strand_.wrap(bind(&udp_session::handle_sent,
session, // keep-alive of buffer/endpoint
asio::placeholders::error,
asio::placeholders::bytes_transferred)));
}
udp::socket socket_;
asio::strand strand_;
friend struct udp_session;
};
void udp_session::handle_request(const error_code& error)
{
if (!error || error == asio::error::message_size)
{
message = make_daytime_string(); // let''s assume this might be slow
// let the server coordinate actual IO
server_->enqueue_response(shared_from_this());
}
}
int main()
{
try {
asio::io_service io_service;
udp_server server(io_service);
thread_group group;
for (unsigned i = 0; i < thread::hardware_concurrency(); ++i)
group.create_thread(bind(&asio::io_service::run, ref(io_service)));
group.join_all();
}
catch (std::exception& e) {
std::cerr << e.what() << std::endl;
}
}
Pensamientos de cierre
Curiosamente, en la mayoría de los casos, verá que la versión de subproceso único funciona igual de bien, y no hay ninguna razón para complicar el diseño.
Alternativamente, puede usar un io_service
subproceso io_service
dedicado al IO y usar un pool de trabajadores anticuado para hacer el procesamiento en segundo plano de las solicitudes si esta es realmente la parte intensiva de CPU. En primer lugar, esto simplifica el diseño, en segundo lugar, esto podría mejorar el rendimiento en las tareas de IO porque ya no es necesario coordinar las tareas publicadas en el capítulo.