switch sentencia resueltos que programacion lenguaje else ejercicios ejemplos caracteres cadenas cadena c++ multithreading c++11 synchronization condition-variable

sentencia - ¿Cuál es la mejor manera de esperar en múltiples variables de condición en C++ 11?



sentencia if en c++ (3)

Recientemente resolví este problema con la ayuda de una variable de condición única y una variable booleana independiente para cada productor / trabajador. El predicado dentro de la función de espera en el subproceso del consumidor puede verificar estas banderas y tomar la decisión de qué productor / trabajador ha cumplido la condición.

Primero, un poco de contexto : estoy en el proceso de aprender acerca de subprocesos en C ++ 11 y para este propósito, estoy tratando de construir una clase de pequeños actor , esencialmente (dejé fuera la materia de manejo y propagación de excepciones) así como :

class actor { private: std::atomic<bool> stop; private: std::condition_variable interrupt; private: std::thread actor_thread; private: message_queue incoming_msgs; public: actor() : stop(false), actor_thread([&]{ run_actor(); }) {} public: virtual ~actor() { // if the actor is destroyed, we must ensure the thread dies too stop = true; // to this end, we have to interrupt the actor thread which is most probably // waiting on the incoming_msgs queue: interrupt.notify_all(); actor_thread.join(); } private: virtual void run_actor() { try { while(!stop) // wait for new message and process it // but interrupt the waiting process if interrupt is signaled: process(incoming_msgs.wait_and_pop(interrupt)); } catch(interrupted_exception) { // ... } }; private: virtual void process(const message&) = 0; // ... };

Cada actor se ejecuta en su propio actor_thread , espera un nuevo mensaje incoming_msgs en incoming_msgs y, cuando llega un mensaje, lo procesa.

El actor_thread se crea junto con el actor y debe morir junto con él, por lo que necesito algún tipo de mecanismo de interrupción en el message_queue::wait_and_pop(std::condition_variable interrupt) .

Esencialmente, requiero que wait_and_pop bloquee hasta que a) llegue un nuevo message o b) hasta que se dispare la interrupt , en cuyo caso, idealmente, se debe lanzar una interrupted_exception .

La llegada de un nuevo mensaje en el message_queue se modela actualmente también por una std::condition_variable new_msg_notification :

// ... // in class message_queue: message wait_and_pop(std::condition_variable& interrupt) { std::unique_lock<std::mutex> lock(mutex); // How to interrupt the following, when interrupt fires?? new_msg_notification.wait(lock,[&]{ return !queue.empty(); }); auto msg(std::move(queue.front())); queue.pop(); return msg; }

Para resumir la historia larga, la pregunta es: ¿Cómo interrumpo la espera de un nuevo mensaje en new_msg_notification.wait(...) cuando se activa la interrupt (sin introducir un tiempo de espera)?

Alternativamente, la pregunta puede leerse como: ¿Cómo espero hasta que se señale cualquiera de dos std::condition_variable s?

Un enfoque ingenuo parece ser no usar std::condition_variable en absoluto para la interrupción y, en su lugar, simplemente usar una bandera atómica std::atomic<bool> interrupted y luego ocupada esperar en new_msg_notification con un tiempo de espera muy pequeño hasta que new_msg_notification una nueva Ha llegado el mensaje o hasta que se true==interrupted . Sin embargo, me gustaría mucho evitar la espera ocupada.

EDITAR:

A partir de los comentarios y la respuesta de pilcrow, parece que básicamente hay dos enfoques posibles.

  1. Poner en cola un mensaje especial de "Terminar", tal como lo propusieron Alan, mukunda y pilcrow. Decidí no optar a esta opción porque no tengo idea del tamaño de la cola en el momento en que quiero que el actor termine. Puede muy bien ser (ya que es principalmente el caso cuando quiero que algo termine rápidamente) que hay miles de mensajes pendientes de procesar en la cola y parece inaceptable esperar a que se procesen hasta que finalmente el mensaje de finalización llegue a su fin. giro.
  2. Implemente una versión personalizada de una variable de condición, que puede ser interrumpida por otro hilo reenviando la notificación a la variable de condición en la que el primer hilo está esperando. Opté por este enfoque.

Para aquellos de ustedes interesados, mi implementación es la siguiente. La variable de condición en mi caso es en realidad un semaphore (porque me gustan más y porque me gustó el ejercicio de hacerlo). He equipado este semáforo con una interrupt asociada que se puede obtener del semáforo a través del semaphore::get_interrupt() . Si ahora un hilo bloquea en semaphore::wait() , otro thread tiene la posibilidad de llamar a semaphore::interrupt::trigger() en la interrupción del semáforo, lo que causa que el primer hilo desbloquee y propague una interrupt_exception .

struct interrupt_exception {}; class semaphore { public: class interrupt; private: mutable std::mutex mutex; // must be declared after our mutex due to construction order! private: interrupt* informed_by; private: std::atomic<long> counter; private: std::condition_variable cond; public: semaphore(); public: ~semaphore() throw(); public: void wait(); public: interrupt& get_interrupt() const { return *informed_by; } public: void post() { std::lock_guard<std::mutex> lock(mutex); counter++; cond.notify_one(); // never throws } public: unsigned long load () const { return counter.load(); } }; class semaphore::interrupt { private: semaphore *forward_posts_to; private: std::atomic<bool> triggered; public: interrupt(semaphore *forward_posts_to) : triggered(false), forward_posts_to(forward_posts_to) { assert(forward_posts_to); std::lock_guard<std::mutex> lock(forward_posts_to->mutex); forward_posts_to->informed_by = this; } public: void trigger() { assert(forward_posts_to); std::lock_guard<std::mutex>(forward_posts_to->mutex); triggered = true; forward_posts_to->cond.notify_one(); // never throws } public: bool is_triggered () const throw() { return triggered.load(); } public: void reset () throw() { return triggered.store(false); } }; semaphore::semaphore() : counter(0L), informed_by(new interrupt(this)) {} // must be declared here because otherwise semaphore::interrupt is an incomplete type semaphore::~semaphore() throw() { delete informed_by; } void semaphore::wait() { std::unique_lock<std::mutex> lock(mutex); if(0L==counter) { cond.wait(lock,[&]{ if(informed_by->is_triggered()) throw interrupt_exception(); return counter>0; }); } counter--; }

Usando este semaphore , la implementación de mi cola de mensajes ahora se ve así (usando el semáforo en lugar de la std::condition_variable podría deshacerme del std::mutex :

class message_queue { private: std::queue<message> queue; private: semaphore new_msg_notification; public: void push(message&& msg) { queue.push(std::move(msg)); new_msg_notification.post(); } public: const message wait_and_pop() { new_msg_notification.wait(); auto msg(std::move(queue.front())); queue.pop(); return msg; } public: semaphore::interrupt& get_interrupt() const { return new_msg_notification.get_interrupt(); } };

Mi actor , ahora es capaz de interrumpir su hilo con una latencia muy baja en su hilo. La implementación actualmente es así:

class actor { private: message_queue incoming_msgs; /// must be declared after incoming_msgs due to construction order! private: semaphore::interrupt& interrupt; private: std::thread my_thread; private: std::exception_ptr exception; public: actor() : interrupt(incoming_msgs.get_interrupt()), my_thread( [&]{ try { run_actor(); } catch(...) { exception = std::current_exception(); } }) {} private: virtual void run_actor() { while(!interrupt.is_triggered()) process(incoming_msgs.wait_and_pop()); }; private: virtual void process(const message&) = 0; public: void notify(message&& msg_in) { incoming_msgs.push(std::forward<message>(msg_in)); } public: virtual ~actor() throw (interrupt_exception) { interrupt.trigger(); my_thread.join(); if(exception) std::rethrow_exception(exception); } };


Tal vez esto pueda funcionar:

deshacerse de la interrupcion.

message wait_and_pop(std::condition_variable& interrupt) { std::unique_lock<std::mutex> lock(mutex); { new_msg_notification.wait(lock,[&]{ return !queue.empty() || stop; }); if( !stop ) { auto msg(std::move(queue.front())); queue.pop(); return msg; } else { return NULL; //or some ''terminate'' message } }

En el destructor, reemplaza interrupt.notify_all() con new_msg_notification.notify_all()


Usted pregunta,

¿Cuál es la mejor manera de esperar en múltiples variables de condición en C ++ 11?

No puedes, y debes rediseñar. Un subproceso puede esperar en una sola variable de condición (y su mutex asociado) a la vez. En este sentido, las facilidades de sincronización de Windows son más ricas que las de la familia de primitivos de sincronización "estilo POSIX".

El enfoque típico con colas seguras para subprocesos es poner en cola un especial "¡todo listo!" mensaje, o para diseñar una cola "rompible" (o "capaz de apagarse"). En este último caso, la variable de condición interna de la cola protege un predicado complejo: o hay un elemento disponible o la cola se ha roto.

En un comentario observas que

a Notify_all () no tendrá efecto si no hay nadie esperando

Eso es cierto pero probablemente no sea relevante. wait() en una variable de condición también implica verificar un predicado y verificarlo antes de bloquear realmente una notificación. Por lo tanto, un subproceso de trabajo ocupado procesa un elemento de la cola que "pierde" un notify_all() notará, la próxima vez que inspeccione la condición de la cola, que el predicado (un nuevo elemento esté disponible, o que la cola haya terminado) haya cambiado .