¿Existe una cola de bloqueo de un solo consumidor de múltiples productores para c++?
synchronization lock-free (4)
Cuanto más leo, más confuso me vuelvo ... Me hubiera parecido trivial encontrar una cola de mpsc formalmente correcta implementada en c ++.
Cada vez que encuentro otra puñalada, una investigación adicional parece sugerir que hay problemas como el ABA u otras condiciones de carrera sutiles.
Muchos hablan de la necesidad de recolección de basura. Esto es algo que quiero evitar.
¿Hay una implementación de código abierto correcta aceptada por ahí?
A continuación se muestra la técnica que utilicé para mi biblioteca cooperativa multitarea / multihilo (MACE) http://bytemaster.github.com/mace/ . Tiene la ventaja de estar libre de bloqueo, excepto cuando la cola está vacía.
struct task {
boost::function<void()> func;
task* next;
};
boost::mutex task_ready_mutex;
boost::condition_variable task_ready;
boost::atomic<task*> task_in_queue;
// this can be called from any thread
void thread::post_task( task* t ) {
// atomically post the task to the queue.
task* stale_head = task_in_queue.load(boost::memory_order_relaxed);
do { t->next = stale_head;
} while( !task_in_queue.compare_exchange_weak( stale_head, t, boost::memory_order_release ) );
// Because only one thread can post the ''first task'', only that thread will attempt
// to aquire the lock and therefore there should be no contention on this lock except
// when *this thread is about to block on a wait condition.
if( !stale_head ) {
boost::unique_lock<boost::mutex> lock(task_ready_mutex);
task_ready.notify_one();
}
}
// this is the consumer thread.
void process_tasks() {
while( !done ) {
// this will atomically pop everything that has been posted so far.
pending = task_in_queue.exchange(0,boost::memory_order_consume);
// pending is a linked list in ''reverse post order'', so process them
// from tail to head if you want to maintain order.
if( !pending ) { // lock scope
boost::unique_lock<boost::mutex> lock(task_ready_mutex);
// check one last time while holding the lock before blocking.
if( !task_in_queue ) task_ready.wait( lock );
}
}
Es posible que desee comprobar disruptor; está disponible en C ++ aquí: http://lmax-exchange.github.io/disruptor/
También puede encontrar una explicación de cómo funciona aquí en . Básicamente es un búfer circular sin bloqueo, optimizado para pasar mensajes FIFO entre subprocesos en ranuras de tamaño fijo.
Aquí hay dos implementaciones que me parecieron útiles: Cola de multi-productores multi-productor sin bloqueo en Ring Buffer @ NatSys Lab. Blog y
Otra implementación más de una cola de matriz circular sin bloqueo @ CodeProject
NOTA: el código de abajo es incorrecto, lo dejo solo como un ejemplo de lo difíciles que pueden ser estas cosas.
Si no te gusta la complejidad de la versión de Google, aquí hay algo similar a mí: es mucho más simple, pero lo dejo como un ejercicio para que el lector lo haga funcionar (es parte de un proyecto más grande, no portátil en este momento) . La idea es mantener un búfer circular para los datos y un pequeño conjunto de contadores para identificar las ranuras para escribir / escribir y leer / leer. Dado que cada contador está en su propia línea de caché, y (normalmente) cada uno solo se actualiza de forma atómica una vez en la vida de un mensaje, todos se pueden leer sin ninguna sincronización. Existe un posible punto de contención entre las post_done
escritura en post_done
, se requiere para la garantía FIFO. Los contadores (head_, wrtn_, rdng_, tail_) se seleccionaron para garantizar la corrección y FIFO, por lo que la eliminación de FIFO también requeriría un cambio de contadores (y eso podría ser difícil de hacer sin sacrificar la corrección). Es posible mejorar ligeramente el rendimiento para escenarios con un consumidor, pero no me molestaría, tendría que deshacerlo si se encuentran otros casos de uso con varios lectores.
En mi máquina, la latencia se ve como la siguiente (percentil a la izquierda, media dentro de este percentil a la derecha, la unidad es de microsegundos, medida por rdtsc):
total=1000000 samples, avg=0.24us
50%=0.214us, avg=0.093us
90%=0.23us, avg=0.151us
99%=0.322us, avg=0.159us
99.9%=15.566us, avg=0.173us
Estos resultados son para un solo consumidor de sondeo, es decir, el subproceso de trabajo que llama wheel.read () en un circuito cerrado y se comprueba si no está vacío (por ejemplo, desplácese hacia abajo). Los consumidores en espera (una utilización de la CPU mucho menor) esperarían en el evento (una de las funciones de acquire...
), esto agrega aproximadamente 1-2us a la latencia promedio debido al cambio de contexto.
Dado que hay muy poca contención en la lectura, los consumidores escalan muy bien con el número de subprocesos de trabajo, por ejemplo, para 3 subprocesos en mi máquina:
total=1500000 samples, avg=0.07us
50%=0us, avg=0us
90%=0.155us, avg=0.016us
99%=0.361us, avg=0.038us
99.9%=8.723us, avg=0.044us
Los parches serán bienvenidos :)
// Copyright (c) 2011-2012, Bronislaw (Bronek) Kozicki
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#pragma once
#include <core/api.hxx>
#include <core/wheel/exception.hxx>
#include <boost/noncopyable.hpp>
#include <boost/type_traits.hpp>
#include <boost/lexical_cast.hpp>
#include <typeinfo>
namespace core { namespace wheel
{
struct bad_size : core::exception
{
template<typename T> explicit bad_size(const T&, size_t m)
: core::exception(std::string("Slot capacity exceeded, sizeof(")
+ typeid(T).name()
+ ") = "
+ boost::lexical_cast<std::string>(sizeof(T))
+ ", capacity = "
+ boost::lexical_cast<std::string>(m)
)
{}
};
// inspired by Disruptor
template <typename Header>
class wheel : boost::noncopyable
{
__declspec(align(64))
struct slot_detail
{
// slot write: (memory barrier in wheel) > post_done > (memory barrier in wheel)
// slot read: (memory barrier in wheel) > read_done > (memory barrier in wheel)
// done writing or reading, must update wrtn_ or tail_ in wheel, as appropriate
template <bool Writing>
void done(wheel* w)
{
if (Writing)
w->post_done(sequence);
else
w->read_done();
}
// cache line for sequence number and header
long long sequence;
Header header;
// there is no such thing as data type with variable size, but we need it to avoid thrashing
// cache - so we invent one. The memory is reserved in runtime and we simply go beyond last element.
// This is well into UB territory! Using template parameter for this is not good, since it
// results in this small implementation detail leaking to all possible user interfaces.
__declspec(align(8))
char data[8];
};
// use this as a storage space for slot_detail, to guarantee 64 byte alignment
_declspec(align(64))
struct slot_block { long long padding[8]; };
public:
// wrap slot data to outside world
template <bool Writable>
class slot
{
template<typename> friend class wheel;
slot& operator=(const slot&); // moveable but non-assignable
// may only be constructed by wheel
slot(slot_detail* impl, wheel<Header>* w, size_t c)
: slot_(impl) , wheel_(w) , capacity_(c)
{}
public:
slot(slot&& s)
: slot_(s.slot_) , wheel_(s.wheel_) , capacity_(s.capacity_)
{
s.slot_ = NULL;
}
~slot()
{
if (slot_)
{
slot_->done<Writable>(wheel_);
}
}
// slot accessors - use Header to store information on what type is actually stored in data
bool empty() const { return !slot_; }
long long sequence() const { return slot_->sequence; }
Header& header() { return slot_->header; }
char* data() { return slot_->data; }
template <typename T> T& cast()
{
static_assert(boost::is_pod<T>::value, "Data type must be POD");
if (sizeof(T) > capacity_)
throw bad_size(T(), capacity_);
if (empty())
throw no_data();
return *((T*) data());
}
private:
slot_detail* slot_;
wheel<Header>* wheel_;
const size_t capacity_;
};
private:
// dynamic size of slot, with extra capacity, expressed in 64 byte blocks
static size_t sizeof_slot(size_t s)
{
size_t m = sizeof(slot_detail);
// add capacity less 8 bytes already within sizeof(slot_detail)
m += max(8, s) - 8;
// round up to 64 bytes, i.e. alignment of slot_detail
size_t r = m & ~(unsigned int)63;
if (r < m)
r += 64;
r /= 64;
return r;
}
// calculate actual slot capacity back from number of 64 byte blocks
static size_t slot_capacity(size_t s)
{
return s*64 - sizeof(slot_detail) + 8;
}
// round up to power of 2
static size_t round_size(size_t s)
{
// enfore minimum size
if (s <= min_size)
return min_size;
// find rounded value
--s;
size_t r = 1;
while (s)
{
s >>= 1;
r <<= 1;
};
return r;
}
slot_detail& at(long long sequence)
{
// find index from sequence number and return slot at found index of the wheel
return *((slot_detail*) &wheel_[(sequence & (size_ - 1)) * blocks_]);
}
public:
wheel(size_t capacity, size_t size)
: head_(0) , wrtn_(0) , rdng_(0) , tail_(0) , event_()
, blocks_(sizeof_slot(capacity)) , capacity_(slot_capacity(blocks_)) , size_(round_size(size))
{
static_assert(boost::is_pod<Header>::value, "Header type must be POD");
static_assert(sizeof(slot_block) == 64, "This was unexpected");
wheel_ = new slot_block[size_ * blocks_];
// all slots must be initialised to 0
memset(wheel_, 0, size_ * 64 * blocks_);
active_ = 1;
}
~wheel()
{
stop();
delete[] wheel_;
}
// all accessors needed
size_t capacity() const { return capacity_; } // capacity of a single slot
size_t size() const { return size_; } // number of slots available
size_t queue() const { return (size_t)head_ - (size_t)tail_; }
bool active() const { return active_ == 1; }
// enough to call it just once, to fine tune slot capacity
template <typename T>
void check() const
{
static_assert(boost::is_pod<T>::value, "Data type must be POD");
if (sizeof(T) > capacity_)
throw bad_size(T(), capacity_);
}
// stop the wheel - safe to execute many times
size_t stop()
{
InterlockedExchange(&active_, 0);
// must wait for current read to complete
while (rdng_ != tail_)
Sleep(10);
return size_t(head_ - tail_);
}
// return first available slot for write
slot<true> post()
{
if (!active_)
throw stopped();
// the only memory barrier on head seq. number we need, if not overflowing
long long h = InterlockedIncrement64(&head_);
while(h - (long long) size_ > tail_)
{
if (InterlockedDecrement64(&head_) == h - 1)
throw overflowing();
// protection against case of race condition when we are overflowing
// and two or more threads try to post and two or more messages are read,
// all at the same time. If this happens we must re-try, otherwise we
// could have skipped a sequence number - causing infinite wait in post_done
Sleep(0);
h = InterlockedIncrement64(&head_);
}
slot_detail& r = at(h);
r.sequence = h;
// wrap in writeable slot
return slot<true>(&r, this, capacity_);
}
// return first available slot for write, nothrow variant
slot<true> post(std::nothrow_t)
{
if (!active_)
return slot<true>(NULL, this, capacity_);
// the only memory barrier on head seq. number we need, if not overflowing
long long h = InterlockedIncrement64(&head_);
while(h - (long long) size_ > tail_)
{
if (InterlockedDecrement64(&head_) == h - 1)
return slot<true>(NULL, this, capacity_);
// must retry if race condition described above
Sleep(0);
h = InterlockedIncrement64(&head_);
}
slot_detail& r = at(h);
r.sequence = h;
// wrap in writeable slot
return slot<true>(&r, this, capacity_);
}
// read first available slot for read
slot<false> read()
{
slot_detail* r = NULL;
// compare rdng_ and wrtn_ early to avoid unnecessary memory barrier
if (active_ && rdng_ < wrtn_)
{
// the only memory barrier on reading seq. number we need
const long long h = InterlockedIncrement64(&rdng_);
// check if this slot has been written, step back if not
if (h > wrtn_)
InterlockedDecrement64(&rdng_);
else
r = &at(h);
}
// wrap in readable slot
return slot<false>(r , this, capacity_);
}
// waiting for new post, to be used by non-polling clients
void acquire()
{
event_.acquire();
}
bool try_acquire()
{
return event_.try_acquire();
}
bool try_acquire(unsigned long timeout)
{
return event_.try_acquire(timeout);
}
void release()
{}
private:
void post_done(long long sequence)
{
const long long t = sequence - 1;
// the only memory barrier on written seq. number we need
while(InterlockedCompareExchange64(&wrtn_, sequence, t) != t)
Sleep(0);
// this is outside of critical path for polling clients
event_.set();
}
void read_done()
{
// the only memory barrier on tail seq. number we need
InterlockedIncrement64(&tail_);
}
// each in its own cache line
// head_ - wrtn_ = no. of messages being written at this moment
// rdng_ - tail_ = no. of messages being read at the moment
// head_ - tail_ = no. of messages to read (including those being written and read)
// wrtn_ - rdng_ = no. of messages to read (excluding those being written or read)
__declspec(align(64)) volatile long long head_; // currently writing or written
__declspec(align(64)) volatile long long wrtn_; // written
__declspec(align(64)) volatile long long rdng_; // currently reading or read
__declspec(align(64)) volatile long long tail_; // read
__declspec(align(64)) volatile long active_; // flag switched to 0 when stopped
__declspec(align(64))
api::event event_; // set when new message is posted
const size_t blocks_; // number of 64-byte blocks in a single slot_detail
const size_t capacity_; // capacity of data() section per single slot. Initialisation depends on blocks_
const size_t size_; // number of slots available, always power of 2
slot_block* wheel_;
};
}}
Aquí está lo que puede verse el hilo del trabajador consumidor de encuestas:
while (wheel.active())
{
core::wheel::wheel<int>::slot<false> slot = wheel.read();
if (!slot.empty())
{
Data& d = slot.cast<Data>();
// do work
}
// uncomment below for waiting consumer, saving CPU cycles
// else
// wheel.try_acquire(10);
}
Editado ejemplo de consumidor agregado
La implementación más adecuada depende de las propiedades deseadas de una cola. ¿Debería ser ilimitado o acotado uno está bien? ¿Debería ser linearizable , o requisitos menos estrictos estarían bien? ¿Qué tan fuerte te garantiza FIFO que necesitas? ¿Está dispuesto a pagar el costo de revertir la lista por parte del consumidor (existe una implementación muy simple en la que el consumidor toma la cola de una lista con un solo enlace, obteniendo de una vez todos los elementos puestos por los productores hasta el momento)? ¿Debería garantizar que nunca se bloquee un hilo, o hay pocas posibilidades de que se bloquee un hilo? Y etc.
Algunos enlaces útiles:
¿Es posible un productor único, un único consumidor en un entorno sin bloqueo?
http://www.1024cores.net/home/lock-free-algorithms/queues
http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
https://groups.google.com/group/comp.programming.threads/browse_frm/thread/33f79c75146582f3
Espero que ayude.
Supongo que no existe tal cosa, y si lo hace, o no es portátil o no es de código abierto.
Conceptualmente, está intentando controlar dos punteros simultáneamente: el puntero de tail
y el tail->next
puntero de tail
. En general, eso no se puede hacer con primitivos libres de bloqueo.