c++ linux macos fork boost-asio

c++ - ¿Cómo manejo correctamente fork() con boost:: asio en un programa multiproceso?



linux macos (2)

Estoy teniendo algunos problemas para entender cómo manejar correctamente la creación de un proceso secundario desde un programa de multiproceso que utiliza Boost Asio de una manera de multiproceso.

Si entiendo correctamente, la forma de iniciar un proceso secundario en el mundo Unix es llamar a fork() seguido de un exec*() . Además, si entiendo correctamente, llamar a fork() duplicará todos los descriptores de archivo y así sucesivamente, y estos deben cerrarse en el proceso hijo a menos que se marque como FD_CLOEXEC (y, por lo tanto, se cierre de forma atómica al llamar a exec*() ).

Boost Asio requiere que se le notifique cuando se llame a fork() para que funcione correctamente llamando a notify_fork() . Sin embargo, en un programa multiproceso esto crea varios problemas:

  1. Los sockets son heredados de forma predeterminada por los procesos secundarios si lo entiendo correctamente. Se pueden establecer en SOCK_CLOEXEC , pero no directamente en la creación *, lo que lleva a una ventana de tiempo si se crea un proceso secundario desde otro hilo.

  2. notify_fork() requiere que ningún otro subproceso llame a ninguna otra función io_service , ni a ninguna función en ningún otro objeto de E / S asociado con el io_service . Esto realmente no parece ser factible, después de todo, el programa es multihilo por una razón.

  3. Si entiendo correctamente, cualquier llamada de función realizada entre fork() y exec*() debe ser segura para la señal asíncrona (consulte la documentación de fork() ). No hay documentación de que la llamada a notify_fork() sea ​​segura para la señal asíncrona. De hecho, si miro el código fuente de Boost Asio (al menos en la versión 1.54), puede haber llamadas a pthread_mutex_lock , que no son seguras para las señales asíncronas si las entiendo correctamente (vea Conceptos de señales , también se están realizando otras llamadas). que no están en la lista blanca).

Problema # 1 Probablemente pueda solucionar el problema al separar la creación de procesos secundarios y sockets + archivos para asegurar que no se cree ningún proceso secundario en la ventana entre un socket que se está creando y la configuración de SOCK_CLOEXEC . El problema # 2 es más complicado, probablemente necesito asegurarme de que todos los subprocesos de los manejadores de asio estén detenidos, hacer el fork y luego recrearlos de nuevo, lo que es tideous en el mejor de los casos, y realmente muy malo en el peor de los casos (¿qué pasa con mis temporizadores pendientes? ). El problema # 3 parece hacer que sea completamente imposible usar esto correctamente.

¿Cómo uso correctamente Boost Asio en un programa multiproceso junto con fork() + exec*() ? ... o estoy "bifurcado"?

Por favor, avíseme si he entendido mal algunos conceptos fundamentales (me planteo en la programación de Windows, no * nix ...).

Edición: * - En realidad, es posible crear sockets con SOCK_CLOEXEC configurado directamente en Linux, disponible desde 2.6.27 (consulte la documentación de socket() ). En Windows, el indicador correspondiente WSA_FLAG_NO_HANDLE_INHERIT está disponible desde Windows 7 SP 1 / Windows Server 2008 R2 SP 1 (consulte la documentación de WSASocket() ). OS X no parece apoyar esto sin embargo.


Considera lo siguiente:

  • fork() crea solo un hilo en el proceso hijo. Necesitarías recrear los otros hilos.
  • Los mutex retenidos por otros subprocesos en el proceso principal permanecen bloqueados para siempre en el proceso secundario porque los subprocesos propietarios no sobreviven a fork() . Las devoluciones de llamadas registradas con pthread_atfork() podrían liberar los mutex, pero la mayoría de las bibliotecas nunca se molestan en usar pthread_atfork() . En otras palabras, su proceso hijo podría bloquearse para siempre al llamar a malloc() o new porque el asignador de almacenamiento dinámico estándar utiliza mutexes.

A la luz de lo anterior, la única opción robusta en un proceso de subprocesos múltiples es llamar a fork() y luego a exec() .

Tenga en cuenta que el proceso padre no se ve afectado por fork() siempre que no se utilicen los controladores pthread_atfork() .

Con respecto a forking and boost::asio , hay una función notify_fork() que debe llamarse antes de forking en el padre y después de forking tanto en el padre como en el hijo. Lo que hace depende en última instancia del reactor que se utiliza. Para los reactores Linux / UNIX select_reactor , select_reactor , epoll_reactor , dev_poll_reactor , esta función no hace nada al padre antes o después de la bifurcación, pero en el hijo recrea el estado del reactor y vuelve a registrar los descriptores de archivo. Aunque no estoy seguro de lo que hace en Windows.

Puede encontrar un ejemplo de su uso en process_per_connection.cpp , simplemente puede copiarlo:

void handle_accept(const boost::system::error_code& ec) { if (!ec) { // Inform the io_service that we are about to fork. The io_service cleans // up any internal resources, such as threads, that may interfere with // forking. io_service_.notify_fork(boost::asio::io_service::fork_prepare); if (fork() == 0) { // Inform the io_service that the fork is finished and that this is the // child process. The io_service uses this opportunity to create any // internal file descriptors that must be private to the new process. io_service_.notify_fork(boost::asio::io_service::fork_child); // The child won''t be accepting new connections, so we can close the // acceptor. It remains open in the parent. acceptor_.close(); // The child process is not interested in processing the SIGCHLD signal. signal_.cancel(); start_read(); } else { // Inform the io_service that the fork is finished (or failed) and that // this is the parent process. The io_service uses this opportunity to // recreate any internal resources that were cleaned up during // preparation for the fork. io_service_.notify_fork(boost::asio::io_service::fork_parent); socket_.close(); start_accept(); } } else { std::cerr << "Accept error: " << ec.message() << std::endl; start_accept(); } }


En un programa multihilo, no es seguro invocar notify_fork() en el hijo. Sin embargo, Boost.Asio espera que se llame de acuerdo con el soporte de fork() , ya que esto ocurre cuando el hijo cierra los descriptores de archivos internos anteriores del padre y crea otros nuevos. Mientras Boost.Asio enumera explícitamente las condiciones previas para invocar io_service::notify_fork() , garantizando el estado de sus componentes internos durante la fork() , un breve vistazo a la implementation indica que std::vector::push_back() puede asigne memoria de la tienda libre, y no se garantiza que la asignación sea async-signal-safe.

Dicho esto, una solución que puede valer la pena considerar es fork() del proceso cuando todavía está en un solo hilo. El proceso hijo seguirá siendo de un solo hilo y ejecutará fork() y exec() cuando se lo indique el proceso principal a través de la comunicación entre procesos. Esta separación simplifica el problema al eliminar la necesidad de administrar el estado de varios subprocesos al realizar fork() y exec() .

Este es un ejemplo completo que demuestra este enfoque, en el que el servidor multihebra recibirá los nombres de archivos a través de UDP y un proceso secundario ejecutará fork() y exec() para ejecutar /usr/bin/touch en el nombre del archivo. Con la esperanza de hacer el ejemplo un poco más legible, he optado por usar coroutines apilables .

#include <unistd.h> // execl, fork #include <iostream> #include <string> #include <boost/bind.hpp> #include <boost/asio.hpp> #include <boost/asio/spawn.hpp> #include <boost/make_shared.hpp> #include <boost/shared_ptr.hpp> #include <boost/thread.hpp> /// @brief launcher receives a command from inter-process communication, /// and will then fork, allowing the child process to return to /// the caller. class launcher { public: launcher(boost::asio::io_service& io_service, boost::asio::local::datagram_protocol::socket& socket, std::string& command) : io_service_(io_service), socket_(socket), command_(command) {} void operator()(boost::asio::yield_context yield) { std::vector<char> buffer; while (command_.empty()) { // Wait for server to write data. std::cout << "launcher is waiting for data" << std::endl; socket_.async_receive(boost::asio::null_buffers(), yield); // Resize buffer and read all data. buffer.resize(socket_.available()); socket_.receive(boost::asio::buffer(buffer)); io_service_.notify_fork(boost::asio::io_service::fork_prepare); if (fork() == 0) // child { io_service_.notify_fork(boost::asio::io_service::fork_child); command_.assign(buffer.begin(), buffer.end()); } else // parent { io_service_.notify_fork(boost::asio::io_service::fork_parent); } } } private: boost::asio::io_service& io_service_; boost::asio::local::datagram_protocol::socket& socket_; std::string& command_; }; using boost::asio::ip::udp; /// @brief server reads filenames from UDP and then uses /// inter-process communication to delegate forking and exec /// to the child launcher process. class server { public: server(boost::asio::io_service& io_service, boost::asio::local::datagram_protocol::socket& socket, short port) : io_service_(io_service), launcher_socket_(socket), socket_(boost::make_shared<udp::socket>( boost::ref(io_service), udp::endpoint(udp::v4(), port))) {} void operator()(boost::asio::yield_context yield) { udp::endpoint sender_endpoint; std::vector<char> buffer; for (;;) { std::cout << "server is waiting for data" << std::endl; // Wait for data to become available. socket_->async_receive_from(boost::asio::null_buffers(), sender_endpoint, yield); // Resize buffer and read all data. buffer.resize(socket_->available()); socket_->receive_from(boost::asio::buffer(buffer), sender_endpoint); std::cout << "server got data: "; std::cout.write(&buffer[0], buffer.size()); std::cout << std::endl; // Write filename to launcher. launcher_socket_.async_send(boost::asio::buffer(buffer), yield); } } private: boost::asio::io_service& io_service_; boost::asio::local::datagram_protocol::socket& launcher_socket_; // To be used as a coroutine, server must be copyable, so make socket_ // copyable. boost::shared_ptr<udp::socket> socket_; }; int main(int argc, char* argv[]) { std::string filename; // Try/catch provides exception handling, but also allows for the lifetime // of the io_service and its IO objects to be controlled. try { if (argc != 2) { std::cerr << "Usage: <port>/n"; return 1; } boost::thread_group threads; boost::asio::io_service io_service; // Create two connected sockets for inter-process communication. boost::asio::local::datagram_protocol::socket parent_socket(io_service); boost::asio::local::datagram_protocol::socket child_socket(io_service); boost::asio::local::connect_pair(parent_socket, child_socket); io_service.notify_fork(boost::asio::io_service::fork_prepare); if (fork() == 0) // child { io_service.notify_fork(boost::asio::io_service::fork_child); parent_socket.close(); boost::asio::spawn(io_service, launcher(io_service, child_socket, filename)); } else // parent { io_service.notify_fork(boost::asio::io_service::fork_parent); child_socket.close(); boost::asio::spawn(io_service, server(io_service, parent_socket, std::atoi(argv[1]))); // Spawn additional threads. for (std::size_t i = 0; i < 3; ++i) { threads.create_thread( boost::bind(&boost::asio::io_service::run, &io_service)); } } io_service.run(); threads.join_all(); } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << "/n"; } // Now that the io_service and IO objects have been destroyed, all internal // Boost.Asio file descriptors have been closed, so the execl should be // in a clean state. If the filename has been set, then exec touch. if (!filename.empty()) { std::cout << "creating file: " << filename << std::endl; execl("/usr/bin/touch", "touch", filename.c_str(), static_cast<char*>(0)); } }

Terminal 1:

$ ls a.out example.cpp $ ./a.out 12345 server is waiting for data launcher is waiting for data server got data: a server is waiting for data launcher is waiting for data creating file: a server got data: b server is waiting for data launcher is waiting for data creating file: b server got data: c server is waiting for data launcher is waiting for data creating file: c ctrl + c $ ls a a.out b c example.cpp

Terminal 2:

$ nc -u 127.0.0.1 12345 actrl + dbctrl + dcctrl + d