Cola de seguridad de hilos C++ 11
multithreading c++11 (7)
Agregando a la respuesta aceptada, diría que la implementación de una correcta cola multi productores / consumidores múltiples es difícil (más fácil desde C ++ 11, sin embargo)
Le sugiero que pruebe la (muy buena) biblioteca de impulso libre de bloqueo , la estructura "queue" hará lo que quiera, con garantías de espera / bloqueo y sin la necesidad de un compilador C ++ 11 .
Estoy agregando esta respuesta ahora porque la biblioteca sin candado es bastante nueva para impulsar (desde 1.53 creo)
Un proyecto en el que estoy trabajando usa varios subprocesos para trabajar en una colección de archivos. Cada hilo puede agregar archivos a la lista de archivos que se procesarán, así que armé (lo que pensé que era) una cola segura para subprocesos. Las porciones relevantes siguen:
// qMutex is a std::mutex intended to guard the queue
// populatedNotifier is a std::condition_variable intended to
// notify waiting threads of a new item in the queue
void FileQueue::enqueue(std::string&& filename)
{
std::lock_guard<std::mutex> lock(qMutex);
q.push(std::move(filename));
// Notify anyone waiting for additional files that more have arrived
populatedNotifier.notify_one();
}
std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
std::unique_lock<std::mutex> lock(qMutex);
if (q.empty()) {
if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
std::string ret = q.front();
q.pop();
return ret;
}
else {
return std::string();
}
}
else {
std::string ret = q.front();
q.pop();
return ret;
}
}
Sin embargo, de vez en cuando estoy segfaulting dentro del bloque if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { }
, y la inspección en gdb indica que los valores predeterminados están ocurriendo porque la cola está vacía. ¿Cómo es esto posible? Comprendí que wait_for
solo devuelve cv_status::no_timeout
cuando se ha notificado, y esto solo debería ocurrir después de que FileQueue::enqueue
acaba de FileQueue::enqueue
un nuevo elemento en la cola.
De acuerdo con la condition_variables
estándar condition_variables
las variables pueden reactivarse espuriamente, incluso si el evento no se ha producido. En caso de cv_status::no_timeout
espuria, devolverá cv_status::no_timeout
(ya que se despertó en lugar de agotar el tiempo de espera), aunque no se haya notificado. La solución correcta para esto es, por supuesto, verificar si el despertador fue realmente legítimo antes de proceder.
Los detalles se especifican en el estándar §30.5.1 [thread.condition.condvar]:
-La función se desbloqueará cuando se la señalice mediante una llamada a notify_one (), una llamada a notify_all (), la expiración del tiempo de espera absoluto (30.2.4) especificado por abs_time, o de manera espuria.
...
Devuelve: cv_status :: timeout si el tiempo de espera absoluto (30.2.4) especificado por abs_time expiró, other-ise cv_status :: no_timeout.
Esta es probablemente la forma en que debes hacerlo:
void push(std::string&& filename)
{
{
std::lock_guard<std::mutex> lock(qMutex);
q.push(std::move(filename));
}
populatedNotifier.notify_one();
}
bool try_pop(std::string& filename, std::chrono::milliseconds timeout)
{
std::unique_lock<std::mutex> lock(qMutex);
if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); }))
return false;
filename = std::move(q.front());
q.pop();
return true;
}
Le puede gustar lfqueue, https://github.com/Taymindis/lfqueue . Es una cola simultánea sin bloqueo. Actualmente lo estoy usando para consumir la cola de múltiples llamadas entrantes y funciona como un encanto.
Reescribiría tu función de dequeue como:
std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
std::unique_lock<std::mutex> lock(qMutex);
while(q.empty()) {
if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout )
return std::string();
}
std::string ret = q.front();
q.pop();
return ret;
}
Es más corto y no tiene código duplicado como el tuyo. Solo problema, puede esperar más tiempo. Para evitar eso, debe recordar la hora de inicio antes del ciclo, verifique el tiempo de espera y ajuste el tiempo de espera en consecuencia. O especifica el tiempo absoluto en la condición de espera.
Solo mirándolo, cuando revisa una variable de condición, es mejor usar un ciclo while (de modo que si se despierta y aún no es inválido, vuelva a verificar). Acabo de escribir una plantilla para una cola asíncrona, espero que esto ayude.
#ifndef SAFE_QUEUE
#define SAFE_QUEUE
#include <queue>
#include <mutex>
#include <condition_variable>
// A threadsafe-queue.
template <class T>
class SafeQueue
{
public:
SafeQueue(void)
: q()
, m()
, c()
{}
~SafeQueue(void)
{}
// Add an element to the queue.
void enqueue(T t)
{
std::lock_guard<std::mutex> lock(m);
q.push(t);
c.notify_one();
}
// Get the "front"-element.
// If the queue is empty, wait till a element is avaiable.
T dequeue(void)
{
std::unique_lock<std::mutex> lock(m);
while(q.empty())
{
// release lock as long as the wait and reaquire it afterwards.
c.wait(lock);
}
T val = q.front();
q.pop();
return val;
}
private:
std::queue<T> q;
mutable std::mutex m;
std::condition_variable c;
};
#endif
También hay una solución GLib para este caso, aún no lo intenté, pero creo que es una buena solución. https://developer.gnome.org/glib/2.36/glib-Asynchronous-Queues.html#g-async-queue-new