Tengo una aplicación en $ work donde tengo que moverme entre dos subprocesos en tiempo real que están programados en diferentes frecuencias. (La programación real está fuera de mi control). La aplicación es difícil en tiempo real (uno de los subprocesos tiene que controlar una interfaz de hardware), por lo que la transferencia de datos entre los subprocesos debe estar libre de bloqueos y de espera para el en la medida de lo posible.

Es importante tener en cuenta que solo es necesario transferir un bloque de datos: debido a que los dos subprocesos se ejecutan a velocidades diferentes, habrá ocasiones en que se completen dos iteraciones del subproceso más rápido entre dos activaciones del subproceso más lento; en este caso, está bien sobrescribir los datos en el búfer de escritura para que el subproceso más lento solo obtenga los datos más recientes.

En otras palabras, en lugar de una cola, una solución con doble búfer es suficiente. Los dos buffers se asignan durante la inicialización, y el lector y los hilos de escritura pueden llamar a los métodos de la clase para obtener punteros a uno de estos buffers.

Código C ++:

#include <mutex> template <typename T> class ProducerConsumerDoubleBuffer { public: ProducerConsumerDoubleBuffer() { m_write_busy = false; m_read_idx = m_write_idx = 0; } ~ProducerConsumerDoubleBuffer() { } // The writer thread using this class must call // start_writing() at the start of its iteration // before doing anything else to get the pointer // to the current write buffer. T * start_writing(void) { std::lock_guard<std::mutex> lock(m_mutex); m_write_busy = true; m_write_idx = 1 - m_read_idx; return &m_buf[m_write_idx]; } // The writer thread must call end_writing() // as the last thing it does // to release the write busy flag. void end_writing(void) { std::lock_guard<std::mutex> lock(m_mutex); m_write_busy = false; } // The reader thread must call start_reading() // at the start of its iteration to get the pointer // to the current read buffer. // If the write thread is not active at this time, // the read buffer pointer will be set to the // (previous) write buffer - so the reader gets the latest data. // If the write buffer is busy, the read pointer is not changed. // In this case the read buffer may contain stale data, // it is up to the user to deal with this case. T * start_reading(void) { std::lock_guard<std::mutex> lock(m_mutex); if (!m_write_busy) { m_read_idx = m_write_idx; } return &m_buf[m_read_idx]; } // The reader thread must call end_reading() // at the end of its iteration. void end_reading(void) { std::lock_guard<std::mutex> lock(m_mutex); m_read_idx = m_write_idx; } private: T m_buf[2]; bool m_write_busy; unsigned int m_read_idx, m_write_idx; std::mutex m_mutex; };

Para evitar datos obsoletos en el subproceso del lector, la estructura de la carga útil está versionada. Para facilitar la transferencia de datos bidireccional entre los hilos, se utilizan dos instancias de la monstruosidad anterior, en direcciones opuestas.


  • ¿Es este esquema seguro para hilos? Si está roto, ¿dónde?
  • ¿Se puede hacer sin el mutex? Tal vez con solo barreras de memoria o instrucciones CAS?
  • ¿Se puede hacer mejor?

Aquí una versión que usa InterlockedExchangePointer() y SLISTs.

Esta solución no admite la re-lectura del último búfer. Pero si es necesario, se puede hacer en el lado del lector por medio de una copia y un if( NULL == doubleBuffer.beginReader(...) ) { use backup copy ... } .
Esto no se hace porque es difícil de agregar, pero porque no es muy realista. Imagine que su último valor conocido se vuelve más y más viejo: segundos, días, semanas. Es poco probable que la aplicación aún quiera usarla. Por lo tanto, tener en cuenta la funcionalidad de releer en el doble búfer le quita flexibilidad a la aplicación.

El búfer doble tiene 1 miembro puntero de lectura. Cuando se llama a beginRead (), este valor se devuelve y se reemplaza atómicamente con NULL. Piense en ello como "El lector TOMA el búfer".
Con endRead() , el lector devuelve el búfer y se agrega a la SLIST, que contiene los búferes disponibles para las operaciones de escritura.

Inicialmente, ambos buffers se agregan a la SLIST, el puntero de lectura es NULL.

beginWrite() el siguiente búfer disponible de la SLIST. Y este valor nunca puede ser NULL, debido a la forma en que se implementa endWrite() .

Por último, no menos importante, endWrite() intercambia de forma atómica el puntero de lectura con el búfer devuelto y recién escrito y, si el puntero de lectura no era NULL, lo empuja hacia la SLIST.

Entonces, incluso si el lado del lector nunca lee, el lado del escritor nunca se queda sin búferes. Cuando el lector lee, obtiene el último valor conocido (¡una vez!).

Contra lo que esta implementación no es segura es si hay varios lectores o escritores simultáneos. Pero ese no era el objetivo en primer lugar.

En el lado feo, los Buffers deben ser estructuras con algún miembro de SLIST_HEADER en la parte superior.

Aquí, el código, pero ten en cuenta que no es mi culpa si tu Mars Rover aterriza en Venus.

const size_t MAX_DATA_SIZE = 512; typedef //__declspec(align(MEMORY_ALLOCATION_ALIGNMENT)) struct DataItem_tag { SLIST_ENTRY listNode; uint8_t data[MAX_DATA_SIZE]; size_t length; } DataItem_t; class CDoubleBuffer { SLIST_HEADER m_writePointers; DataItem_t m_buffers[2]; volatile DataItem_t *m_readPointer; public: CDoubleBuffer() : m_writePointers() , m_buffers() , m_readPointer(NULL) { InitializeSListHead(&m_writePointers); InterlockedPushEntrySList(&m_writePointers, &m_buffers[0].listNode); InterlockedPushEntrySList(&m_writePointers, &m_buffers[1].listNode); } DataItem_t *beginRead() { DataItem_t *result = reinterpret_cast<DataItem_t*>(InterlockedExchangePointer((volatile PVOID*)&m_readPointer, NULL)); return result; } void endRead(DataItem_t *dataItem) { if (NULL != dataItem) { InterlockedPushEntrySList(&m_writePointers, &dataItem->listNode); } } DataItem_t *beginWrite() { DataItem_t *result = reinterpret_cast<DataItem_t*>(InterlockedPopEntrySList(&m_writePointers)); return result; } void endWrite(DataItem_t *dataItem) { DataItem_t *oldReadPointer = reinterpret_cast<DataItem_t*>(InterlockedExchangePointer((volatile PVOID*)&m_readPointer, dataItem)); if (NULL != oldReadPointer) { InterlockedPushEntrySList(&m_writePointers, &oldReadPointer->listNode); } } };

Y aquí el código de prueba para ello. (Para ambos, el código anterior y el código de prueba que necesita <windows.h> y <assert.h>.)

CDoubleBuffer doubleBuffer; DataItem_t *readValue; DataItem_t *writeValue; // nothing to read yet. Make sure NULL is returned. assert(NULL == doubleBuffer.beginRead()); doubleBuffer.endRead(NULL); // we got nothing, we return nothing. // First write without read writeValue = doubleBuffer.beginWrite(); assert(NULL != writeValue); // if we get NULL here it is a bug. writeValue->length = 0; doubleBuffer.endWrite(writeValue); // Second write without read writeValue = doubleBuffer.beginWrite(); assert(NULL != writeValue); // if we get NULL here it is a bug. writeValue->length = 1; doubleBuffer.endWrite(writeValue); // Third write without read - works because it reuses the old buffer for the new write. writeValue = doubleBuffer.beginWrite(); assert(NULL != writeValue); // if we get NULL here it is a bug. writeValue->length = 2; doubleBuffer.endWrite(writeValue); readValue = doubleBuffer.beginRead(); assert(NULL != readValue); // NULL would obviously be a terrible bug. assert(2 == readValue->length); // We got the latest and greatest? doubleBuffer.endRead(readValue); readValue = doubleBuffer.beginRead(); assert(NULL == readValue); // We expect NULL here. Re-reading is not a feature of this implementation! doubleBuffer.endRead(readValue);

Problema muy interesante! Mucho más complicado de lo que pensé al principio :-) Me gustan las soluciones sin bloqueo, por lo que he intentado trabajar una a continuación.

Hay muchas maneras de pensar acerca de este sistema. Puede modelarlo como un búfer / cola circular de tamaño fijo (con dos entradas), pero luego pierde la capacidad de actualizar el siguiente valor disponible para el consumo, ya que no sabe si el consumidor ha comenzado a leer el último. Valor publicado o todavía está (potencialmente) leyendo el anterior. Por lo tanto, se necesita un estado adicional más allá del búfer de anillo estándar para alcanzar una solución más óptima.

En primer lugar, tenga en cuenta que siempre hay una celda en la que el productor puede escribir de manera segura en cualquier momento dado; si el consumidor lee una celda, la otra puede escribirse en. Llamemos a la celda que puede escribirse de manera segura en la celda "activa" (la celda que puede leerse potencialmente desde cualquier celda no es la activa). La celda activa solo se puede cambiar si la otra celda no se está leyendo.

A diferencia de la celda activa, en la que siempre se puede escribir, la celda no activa solo se puede leer si contiene un valor; Una vez que se consume ese valor, se va. (Esto significa que Livelock se evita en el caso de un productor agresivo; en algún momento, el consumidor habrá vaciado una celda y dejará de tocar las celdas. Una vez que eso suceda, el productor definitivamente puede publicar un valor, mientras que antes de ese punto, solo puede publicar un valor (cambiar la celda activa) si el consumidor no está en medio de una lectura).

Si hay un valor que está listo para ser consumido, solo el consumidor puede cambiar ese hecho (para la celda no activa, de todos modos); las producciones posteriores pueden cambiar qué celda está activa y el valor publicado, pero un valor siempre estará listo para ser leído hasta que se consuma.

Una vez que el productor termina de escribir en la celda activa, puede "publicar" este valor cambiando qué celda es la activa (intercambiando el índice), siempre que el consumidor no esté leyendo la otra celda. Si el consumidor está en el medio de leer la otra celda, el intercambio no puede ocurrir, pero en ese caso el consumidor puede cambiar después de que haya terminado de leer el valor, siempre que el productor no esté en medio de una escritura (y si es así, El productor cambiará una vez que esté hecho. De hecho, en general, el consumidor siempre puede intercambiar después de que termine la lectura (si es la única que accede al sistema) porque los intercambios falsos por parte del consumidor son benignos: si hay algo en la otra celda, entonces el intercambio hará que se lea. A continuación, y si no lo hay, el intercambio no afecta a nada.

Entonces, necesitamos una variable compartida para rastrear qué es la celda activa, y también necesitamos una forma para que tanto el productor como el consumidor indiquen si están en medio de una operación. Podemos almacenar estos tres estados de estado en una variable atómica para poder afectarlos todos a la vez (atómicamente). También necesitamos una forma para que el consumidor verifique si hay algo en la celda no activa en primer lugar, y para que ambos subprocesos modifiquen ese estado según corresponda. Intenté algunos otros enfoques, pero al final, lo más fácil fue incluir esta información en la otra variable atómica. Esto simplifica mucho las cosas, ya que todos los cambios de estado en el sistema son atómicos de esta manera.

He creado una implementación sin esperas (sin bloqueo, y todas las operaciones se completan en un número limitado de instrucciones).

Código de tiempo!

#include <atomic> #include <cstdint> template <typename T> class ProducerConsumerDoubleBuffer { public: ProducerConsumerDoubleBuffer() : m_state(0) { } ~ProducerConsumerDoubleBuffer() { } // Never returns nullptr T* start_writing() { // Increment active users; once we do this, no one // can swap the active cell on us until we''re done auto state = m_state.fetch_add(0x2, std::memory_order_relaxed); return &m_buf[state & 1]; } void end_writing() { // We want to swap the active cell, but only if we were the last // ones concurrently accessing the data (otherwise the consumer // will do it for us when *it''s* done accessing the data) auto state = m_state.load(std::memory_order_relaxed); std::uint32_t flag = (8 << (state & 1)) ^ (state & (8 << (state & 1))); state = m_state.fetch_add(flag - 0x2, std::memory_order_release) + flag - 0x2; if ((state & 0x6) == 0) { // The consumer wasn''t in the middle of a read, we should // swap (unless the consumer has since started a read or // already swapped or read a value and is about to swap). // If we swap, we also want to clear the full flag on what // will become the active cell, otherwise the consumer could // eventually read two values out of order (it reads a new // value, then swaps and reads the old value while the // producer is idle). m_state.compare_exchange_strong(state, (state ^ 0x1) & ~(0x10 >> (state & 1)), std::memory_order_release); } } // Returns nullptr if there appears to be no more data to read yet T* start_reading() { m_readState = m_state.load(std::memory_order_relaxed); if ((m_readState & (0x10 >> (m_readState & 1))) == 0) { // Nothing to read here! return nullptr; } // At this point, there is guaranteed to be something to // read, because the full flag is never turned off by the // producer thread once it''s on; the only thing that could // happen is that the active cell changes, but that can // only happen after the producer wrote a value into it, // in which case there''s still a value to read, just in a // different cell. m_readState = m_state.fetch_add(0x2, std::memory_order_acquire) + 0x2; // Now that we''ve incremented the user count, nobody can swap until // we decrement it return &m_buf[(m_readState & 1) ^ 1]; } void end_reading() { if ((m_readState & (0x10 >> (m_readState & 1))) == 0) { // There was nothing to read; shame to repeat this // check, but if these functions are inlined it might // not matter. Otherwise the API could be changed. // Or just don''t call this method if start_reading() // returns nullptr -- then you could also get rid // of m_readState. return; } // Alright, at this point the active cell cannot change on // us, but the active cell''s flag could change and the user // count could change. We want to release our user count // and remove the flag on the value we read. auto state = m_state.load(std::memory_order_relaxed); std::uint32_t sub = (0x10 >> (state & 1)) | 0x2; state = m_state.fetch_sub(sub, std::memory_order_relaxed) - sub; if ((state & 0x6) == 0 && (state & (0x8 << (state & 1))) == 1) { // Oi, we were the last ones accessing the data when we released our cell. // That means we should swap, but only if the producer isn''t in the middle // of producing something, and hasn''t already swapped, and hasn''t already // set the flag we just reset (which would mean they swapped an even number // of times). Note that we don''t bother swapping if there''s nothing to read // in the other cell. m_state.compare_exchange_strong(state, state ^ 0x1, std::memory_order_relaxed); } } private: T m_buf[2]; // The bottom (lowest) bit will be the active cell (the one for writing). // The active cell can only be switched if there''s at most one concurrent // user. The next two bits of state will be the number of concurrent users. // The fourth bit indicates if there''s a value available for reading // in m_buf[0], and the fifth bit has the same meaning but for m_buf[1]. std::atomic<std::uint32_t> m_state; std::uint32_t m_readState; };

Tenga en cuenta que la semántica es tal que el consumidor nunca puede leer un valor dado dos veces, y un valor que lee siempre es más nuevo que el último valor que leyó. También es bastante eficiente en el uso de la memoria (dos buffers, como su solución original). Evité los bucles CAS porque son generalmente menos eficientes que una sola operación atómica en disputa.

Si decide utilizar el código anterior, le sugiero que primero realice algunas pruebas unitarias integrales (con hilos). Y puntos de referencia adecuados. Lo probé, pero solo apenas. Déjame saber si encuentras algún error :-)

Mi prueba de unidad:

ProducerConsumerDoubleBuffer<int> buf; std::thread producer([&]() { for (int i = 0; i != 500000; ++i) { int* item = buf.start_writing(); if (item != nullptr) { // Always true *item = i; } buf.end_writing(); } }); std::thread consumer([&]() { int prev = -1; for (int i = 0; i != 500000; ++i) { int* item = buf.start_reading(); if (item != nullptr) { assert(*item > prev); prev = *item; } buf.end_reading(); } }); producer.join(); consumer.join();

En cuanto a su implementación original, solo la observé con dificultad (es mucho más divertido diseñar cosas nuevas, jeje), pero la respuesta de david.pfx parece abordar esa parte de su pregunta.

Sí, creo que está roto.

Si el lector hace un inicio / final / inicio en sucesión, actualizará su índice de lectura al índice de escritura, y posiblemente leerá los datos del índice de escritura, incluso si la escritura está ocupada.

El problema esencialmente es que el escritor no sabe qué búfer utilizará el lector, por lo que el escritor debe asegurarse de que ambos búferes sean válidos en todo momento. No se puede hacer eso, si va a tomar algún tiempo escribir datos en un búfer [a menos que haya entendido mal algo de la lógica que no se muestra aquí]

Sí, creo que se puede hacer sin bloqueos, utilizando CAS o lógica equivalente. No voy a tratar de expresar un algoritmo en este espacio. Estoy seguro de que existe, pero no que pueda escribirlo correctamente la primera vez. Y un poco de búsqueda en la web resultó en algunos candidatos plausibles. El IPC sin espera que utiliza CAS parece ser un tema bastante interesante y el tema de algunas investigaciones.

Después de un pensamiento adicional, el algoritmo es el siguiente. Necesitas:

  • 3 buffers: uno para el escritor, uno para el lector, y otro extra. Los buffers están ordenados: forman un anillo (pero vea la nota).
  • Un estado para cada búfer: libre, completo, escritura, lectura.
  • Una función que puede inspeccionar el estado del búfer y cambiar condicionalmente el estado a un valor diferente en una sola operación atómica. Usaré CSET para eso.


Find the first buffer that is FREE or FULL Fail: assert (should never fail, reader can only use one buffer) CSET buffer to WRITING Write into the buffer CSET buffer to FULL


Find first buffer that is FULL Fail: wait (writer may be slow) CSET buffer to READING Read and consume buffer CSET buffer to FREE

Nota: este algoritmo no garantiza que los búferes se traten estrictamente en orden de llegada, y ningún simple cambio lo hará hacerlo. Si esto es importante, el algoritmo debe mejorarse con un número de secuencia en el búfer, establecido por el escritor para que el lector pueda elegir el búfer más reciente.

Dejo el código como detalle de implementación.

La función CSET no es trivial. Tiene que probar atómicamente que una ubicación de memoria compartida particular es igual a un valor esperado y, si es así, cambiarlo a un nuevo valor. Devuelve true si realizó correctamente el cambio y false en caso contrario. La implementación debe evitar las condiciones de carrera si dos subprocesos acceden a la misma ubicación al mismo tiempo (y posiblemente en diferentes procesadores).

La biblioteca de operaciones atómicas estándar de C ++ contiene un conjunto de funciones atomic_compare_exchange que deberían cumplir su función, si están disponibles.