c++ - Spinlock de ensamblaje en línea más rápido
assembly x86 (5)
Estoy escribiendo una aplicación multiproceso en C ++, donde el rendimiento es crítico. Necesito usar mucho bloqueo mientras copio estructuras pequeñas entre hilos, para esto he elegido usar spinlocks.
Hice algunas investigaciones y pruebas de velocidad en esto y encontré que la mayoría de las implementaciones son más o menos igualmente rápidas:
- Microsofts CRITICAL_SECTION, con SpinCount establecido en 1000, obtiene aproximadamente 140 unidades de tiempo
- Implementando este algoritmo con Microsofts InterlockedCompareExchange puntajes de aproximadamente 95 unidades de tiempo
- También intenté usar un ensamblaje en línea con
__asm {}
usando algo como este código y puntúa unas 70 unidades de tiempo, pero no estoy seguro de que se haya creado una barrera de memoria adecuada.
Editar: Los tiempos indicados aquí son el tiempo que tardan 2 hilos para bloquear y desbloquear el bloqueo de giro 1,000,000 de veces.
Sé que esto no es mucha diferencia, pero como un spinlock es un objeto muy utilizado, uno pensaría que los programadores habrían acordado la forma más rápida posible de hacer un spinlock. Sin embargo, buscar en Google conduce a muchos enfoques diferentes. Creo que este método antes mencionado sería el más rápido si se implementa utilizando el ensamblaje en línea y usando la instrucción CMPXCHG8B
lugar de comparar los registros de 32 bits. Además, se deben tener en cuenta las barreras de memoria, esto podría hacerse con LOCK CMPXHG8B (¿en serio?) , Que garantiza "derechos exclusivos" a la memoria compartida entre núcleos. Por último [algunos sugieren] que para las esperas ocupadas debería ir acompañado de NOP: REP que permitiría a los procesadores Hyper-threading cambiar a otro hilo, pero no estoy seguro de si esto es cierto o no.
De mi prueba de rendimiento de diferentes spinlocks, se ve que no hay mucha diferencia, pero con fines puramente académicos, me gustaría saber cuál es el más rápido. Sin embargo, como tengo una experiencia extremadamente limitada en el lenguaje ensamblador y con barreras de memoria, me gustaría que alguien pudiera escribir el código ensamblador para el último ejemplo que proporcioné con LOCK CMPXCHG8B y barreras de memoria adecuadas en la siguiente plantilla:
__asm
{
spin_lock:
;locking code.
spin_unlock:
;unlocking code.
}
Aunque ya hay una respuesta aceptada, hay algunas cosas que no se pudieron utilizar para mejorar todas las respuestas, tomadas de este artículo de Intel, todas las anteriores implementaciones de bloqueo rápido :
- Girar en una lectura volátil, no en una instrucción atómica, esto evita el bloqueo innecesario del bus, especialmente en bloqueos altamente disputados.
- Use retroceso para bloqueos altamente disputados
- Inline the lock, preferiblemente con intrínsecos para los compiladores donde el asm en línea es perjudicial (básicamente MSVC).
Generalmente no soy de los que se quejan de que alguien se esfuerce por lograr un código rápido: generalmente es un ejercicio muy bueno que resulta en una mejor comprensión de la programación y un código más rápido.
No me quejaré aquí tampoco, pero puedo afirmar inequívocamente que la cuestión de un spinlock rápido 3 instrucciones largas o algunas más es, al menos en la arquitectura x86, una búsqueda fútil.
Este es el por qué:
Invocar un spinlock con una secuencia de código típica
lock_variable DW 0 ; 0 <=> free
mov ebx,offset lock_variable
mov eax,1
xchg eax,[ebx]
; if eax contains 0 (no one owned it) you own the lock,
; if eax contains 1 (someone already does) you don''t
Liberar un spinlock es trivial
mov ebx,offset lock_variable
mov dword ptr [ebx],0
La instrucción xchg levanta el pasador de bloqueo en el procesador, lo que en realidad significa que quiero el bus durante los siguientes ciclos de reloj. Esta señal recorre los cachés y desciende hasta el dispositivo de masterización de bus más lento, que generalmente es el bus PCI. Cuando cada dispositivo de busmastering ha completado la señal locka (lock acknowledge) se envía de regreso. Entonces se produce el intercambio real. El problema es que la secuencia de bloqueo / bloqueo lleva MUCHO tiempo. El bus PCI puede funcionar a 33 MHz con varios ciclos de latencia. En una CPU de 3.3 GHz eso significa que cada ciclo de bus PCI toma cien ciclos de CPU.
Como regla general, supongo que un bloqueo requerirá entre 300 y 3000 ciclos de CPU para completarse y, al final, no sé si seré el propietario del bloqueo. Por lo tanto, los pocos ciclos que puede guardar mediante un spinlock "rápido" serán un espejismo porque ningún bloqueo es como el siguiente, sino que dependerá de la situación de su autobús durante ese breve período de tiempo.
________________EDITAR________________
Acabo de leer que el spinlock es un "objeto muy usado". Bueno, obviamente no entiendes que un spinlock consume una enorme cantidad de ciclos de CPU cada vez que se invoca. O, para decirlo de otra manera, cada vez que lo invoca pierde una cantidad significativa de su capacidad de procesamiento.
El truco al usar spinlocks (o su hermano mayor, la sección crítica) es usarlos con la menor cantidad posible mientras se logra la función prevista del programa. Usarlos por todos lados es fácil y terminará con un rendimiento mediocre como resultado.
No se trata solo de escribir código rápido, sino también de organizar sus datos. Cuando escriba "copiar estructuras pequeñas entre hilos", debe tener en cuenta que el bloqueo puede tardar cientos de veces más en completarse que la copia real.
________________EDITAR________________
Cuando calcule un tiempo promedio de bloqueo, probablemente diga muy poco, ya que se mide en su máquina, que puede no ser el objetivo deseado (que puede tener características de uso del bus completamente diferentes). Para su máquina, el promedio estará compuesto de tiempos individuales muy rápidos (cuando las actividades de masterización del bus no interfirieron) hasta tiempos muy lentos (cuando la interferencia de masterización del bus fue significativa).
Podría introducir un código que determine los casos más rápidos y más lentos, y calcule el cociente para ver hasta qué punto pueden variar los tiempos de los bloqueos.
________________EDITAR________________
Actualización de mayo de 2016
Peter Cordes promovió la idea de que "ajustar un bloqueo en el caso no contencioso puede tener sentido" y que los tiempos de bloqueo de muchos cientos de ciclos de reloj no ocurren en CPU modernas excepto en situaciones donde la variable de bloqueo está desalineada. Comencé a preguntarme si mi programa de prueba anterior , escrito en Watcom C de 32 bits, podría verse obstaculizado por WOW64 ya que se estaba ejecutando en un sistema operativo de 64 bits: Windows 7.
Así que escribí un programa de 64 bits y lo compilé con TDG gcc 5.3. El programa utiliza la variante de instrucción "XCHG r, m" para el bloqueo y una asignación simple "MOV m, r" para el desbloqueo. En algunas variantes de bloqueo, la variable de bloqueo se probó previamente para determinar si era factible incluso intentar un bloqueo (usando una comparación simple "CMP r, m", probablemente sin aventurarse fuera de L3). Aquí está:
// compiler flags used:
// -O1 -m64 -mthreads -mtune=k8 -march=k8 -fwhole-program -freorder-blocks -fschedule-insns -falign-functions=32 -g3 -Wall -c -fmessage-length=0
#define CLASSIC_BUS_LOCK
#define WHILE_PRETEST
//#define SINGLE_THREAD
typedef unsigned char u1;
typedef unsigned short u2;
typedef unsigned long u4;
typedef unsigned int ud;
typedef unsigned long long u8;
typedef signed char i1;
typedef short i2;
typedef long i4;
typedef int id;
typedef long long i8;
typedef float f4;
typedef double f8;
#define usizeof(a) ((ud)sizeof(a))
#define LOOPS 25000000
#include <stdio.h>
#include <windows.h>
#ifndef bool
typedef signed char bool;
#endif
u8 CPU_rdtsc (void)
{
ud tickl, tickh;
__asm__ __volatile__("rdtsc":"=a"(tickl),"=d"(tickh));
return ((u8)tickh << 32)|tickl;
}
volatile u8 bus_lock (volatile u8 * block, u8 value)
{
__asm__ __volatile__( "xchgq %1,%0" : "=r" (value) : "m" (*block), "0" (value) : "memory");
return value;
}
void bus_unlock (volatile u8 * block, u8 value)
{
__asm__ __volatile__( "movq %0,%1" : "=r" (value) : "m" (*block), "0" (value) : "memory");
}
void rfence (void)
{
__asm__ __volatile__( "lfence" : : : "memory");
}
void rwfence (void)
{
__asm__ __volatile__( "mfence" : : : "memory");
}
void wfence (void)
{
__asm__ __volatile__( "sfence" : : : "memory");
}
volatile bool LOCK_spinlockPreTestIfFree (const volatile u8 *lockVariablePointer)
{
return (bool)(*lockVariablePointer == 0ull);
}
volatile bool LOCK_spinlockFailed (volatile u8 *lockVariablePointer)
{
return (bool)(bus_lock (lockVariablePointer, 1ull) != 0ull);
}
void LOCK_spinlockLeave (volatile u8 *lockVariablePointer)
{
*lockVariablePointer = 0ull;
}
static volatile u8 lockVariable = 0ull,
lockCounter = 0ull;
static volatile i8 threadHold = 1;
static u8 tstr[4][32]; /* 32*8=256 bytes for each thread''s parameters should result in them residing in different cache lines */
struct LOCKING_THREAD_STRUCTURE
{
u8 numberOfFailures, numberOfPreTests;
f8 clocksPerLock, failuresPerLock, preTestsPerLock;
u8 threadId;
HANDLE threadHandle;
ud idx;
} *lts[4] = {(void *)tstr[0], (void *)tstr[1], (void *)tstr[2], (void *)tstr[3]};
DWORD WINAPI locking_thread (struct LOCKING_THREAD_STRUCTURE *ltsp)
{
ud n = LOOPS;
u8 clockCycles;
SetThreadAffinityMask (ltsp->threadHandle, 1ull<<ltsp->idx);
while (threadHold) {}
clockCycles = CPU_rdtsc ();
while (n)
{
Sleep (0);
#ifdef CLASSIC_BUS_LOCK
while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;}
#else
#ifdef WHILE_PRETEST
while (1)
{
do
{
++ltsp->numberOfPreTests;
} while (!LOCK_spinlockPreTestIfFree (&lockVariable));
if (!LOCK_spinlockFailed (&lockVariable)) break;
++ltsp->numberOfFailures;
}
#else
while (1)
{
++ltsp->numberOfPreTests;
if (LOCK_spinlockPreTestIfFree (&lockVariable))
{
if (!LOCK_spinlockFailed (&lockVariable)) break;
++ltsp->numberOfFailures;
}
}
#endif
#endif
++lockCounter;
LOCK_spinlockLeave (&lockVariable);
#ifdef CLASSIC_BUS_LOCK
while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;}
#else
#ifdef WHILE_PRETEST
while (1)
{
do
{
++ltsp->numberOfPreTests;
} while (!LOCK_spinlockPreTestIfFree (&lockVariable));
if (!LOCK_spinlockFailed (&lockVariable)) break;
++ltsp->numberOfFailures;
}
#else
while (1)
{
++ltsp->numberOfPreTests;
if (LOCK_spinlockPreTestIfFree (&lockVariable))
{
if (!LOCK_spinlockFailed (&lockVariable)) break;
++ltsp->numberOfFailures;
}
}
#endif
#endif
--lockCounter;
LOCK_spinlockLeave (&lockVariable);
n-=2;
}
clockCycles = CPU_rdtsc ()-clockCycles;
ltsp->clocksPerLock = (f8)clockCycles/ (f8)LOOPS;
ltsp->failuresPerLock = (f8)ltsp->numberOfFailures/(f8)LOOPS;
ltsp->preTestsPerLock = (f8)ltsp->numberOfPreTests/(f8)LOOPS;
//rwfence ();
ltsp->idx = 4u;
ExitThread (0);
return 0;
}
int main (int argc, char *argv[])
{
u8 processAffinityMask, systemAffinityMask;
memset (tstr, 0u, usizeof(tstr));
lts[0]->idx = 3;
lts[1]->idx = 2;
lts[2]->idx = 1;
lts[3]->idx = 0;
GetProcessAffinityMask (GetCurrentProcess(), &processAffinityMask, &systemAffinityMask);
SetPriorityClass (GetCurrentProcess(), HIGH_PRIORITY_CLASS);
SetThreadAffinityMask (GetCurrentThread (), 1ull);
lts[0]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[0], 0, (void *)<s[0]->threadId);
#ifndef SINGLE_THREAD
lts[1]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[1], 0, (void *)<s[1]->threadId);
lts[2]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[2], 0, (void *)<s[2]->threadId);
lts[3]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[3], 0, (void *)<s[3]->threadId);
#endif
SetThreadAffinityMask (GetCurrentThread (), processAffinityMask);
threadHold = 0;
#ifdef SINGLE_THREAD
while (lts[0]->idx<4u) {Sleep (1);}
#else
while (lts[0]->idx+lts[1]->idx+lts[2]->idx+lts[3]->idx<16u) {Sleep (1);}
#endif
printf ("T0:%1.1f,%1.1f,%1.1f/n", lts[0]->clocksPerLock, lts[0]->failuresPerLock, lts[0]->preTestsPerLock);
printf ("T1:%1.1f,%1.1f,%1.1f/n", lts[1]->clocksPerLock, lts[1]->failuresPerLock, lts[1]->preTestsPerLock);
printf ("T2:%1.1f,%1.1f,%1.1f/n", lts[2]->clocksPerLock, lts[2]->failuresPerLock, lts[2]->preTestsPerLock);
printf ("T3:%1.1f,%1.1f,%1.1f/n", lts[3]->clocksPerLock, lts[3]->failuresPerLock, lts[3]->preTestsPerLock);
printf ("T*:%1.1f,%1.1f,%1.1f/n", (lts[0]->clocksPerLock+ lts[1]->clocksPerLock+ lts[2]->clocksPerLock+ lts[3]->clocksPerLock)/ 4.,
(lts[0]->failuresPerLock+lts[1]->failuresPerLock+lts[2]->failuresPerLock+lts[3]->failuresPerLock)/4.,
(lts[0]->preTestsPerLock+lts[1]->preTestsPerLock+lts[2]->preTestsPerLock+lts[3]->preTestsPerLock)/4.);
printf ("LC:%u/n", (ud)lockCounter);
return 0;
}
El programa se ejecutó en una computadora DELL i5-4310U con DDR3-800, 2 núcleos / 2 HT capaces de 2.7GHz y un caché L3 común.
Para empezar, parece que el impacto de WOW64 fue insignificante.
Un único hilo que realiza bloqueos / desbloqueos no controlados fue capaz de hacerlo una vez cada 110 ciclos. Ajustar el bloqueo no deseado es inútil: cualquier código agregado para mejorar la instrucción XCHG solo lo hará más lento.
Con cuatro HT que bombardean la variable de bloqueo con intentos de bloqueo, la situación cambia radicalmente. El tiempo requerido para lograr un bloqueo exitoso salta a 994 ciclos de los cuales una parte significativa se puede atribuir a los 2.2 intentos fallidos de bloqueo. Para decirlo de otra manera, en una situación de alta contención, se deben intentar 3.2 bloqueos promedio para lograr un bloqueo exitoso. Obviamente, los 110 ciclos no se han convertido en 110 * 3.2, sino más cerca de 110 * 9. Entonces, otros mecanismos están en juego, al igual que con las pruebas en la máquina más antigua. Además, el promedio de 994 ciclos abarca un rango entre 716 y 1157
Las variantes de bloqueo que implementan la prueba previa requirieron aproximadamente el 95% de los ciclos reutilizados por la variante más simple (XCHG). En promedio, realizarían 17 CMP para encontrar factible intentar 1.75 bloqueos de los cuales 1 fue exitoso. Recomiendo usar la prueba previa no solo porque es más rápida: impone menos tensión en el mecanismo de bloqueo del bus (3.2-1.75 = 1.45 menos intentos de bloqueo) a pesar de que aumenta ligeramente la complejidad.
Sólo preguntaba:
Antes de profundizar en el spinlock y las estructuras de datos casi sin bloqueo:
¿Usted, en sus puntos de referencia y su aplicación, se aseguró de que los hilos de la competencia estén garantizados para ejecutarse en diferentes núcleos?
Si no, puede terminar con un programa que funciona bien en su máquina de desarrollo, pero que chupa / falla mucho en el campo porque un hilo debe ser tanto el casillero como el desbloqueador de su spinlock.
Para darle una cifra: en Windows tiene una porción de tiempo estándar de 10 milisegundos. Si no te aseguras de que dos hilos físicos están involucrados en el bloqueo / desbloqueo terminarás con alrededor de 500 bloqueos / desbloqueos por segundo, y este resultado será muy poco
Solo mira aquí: spinlock x86 usando cmpxchg
Y gracias a Cory Nelson
__asm{
spin_lock:
xorl %ecx, %ecx
incl %ecx
spin_lock_retry:
xorl %eax, %eax
lock; cmpxchgl %ecx, (lock_addr)
jnz spin_lock_retry
ret
spin_unlock:
movl $0 (lock_addr)
ret
}
Y otra fuente dice: http://www.geoffchappell.com/studies/windows/km/cpu/cx8.htm
lock cmpxchg8b qword ptr [esi]
is replaceable with the following sequence
try:
lock bts dword ptr [edi],0
jnb acquired
wait:
test dword ptr [edi],1
je try
pause ; if available
jmp wait
acquired:
cmp eax,[esi]
jne fail
cmp edx,[esi+4]
je exchange
fail:
mov eax,[esi]
mov edx,[esi+4]
jmp done
exchange:
mov [esi],ebx
mov [esi+4],ecx
done:
mov byte ptr [edi],0
Y aquí hay una discusión sobre las implementaciones lock-free vs lock: http://newsgroups.derkeiler.com/Archive/Comp/comp.programming.threads/2011-10/msg00009.html
Wikipedia tiene un buen artículo sobre spinlocks, aquí está la implementación x86
http://en.wikipedia.org/wiki/Spinlock#Example_implementation
Observe que su implementación no usa el prefijo "lock", porque es redundante en x86 para la instrucción "xchg"; implícitamente tiene semántica de bloqueo, como se discutió en esta discusión de :
En un multinúcleo x86, ¿es necesario un BLOQUEO como prefijo de XCHG?
El REP: NOP es un alias para la instrucción PAUSE, puede obtener más información al respecto aquí
¿Cómo funciona la instrucción pausa x86 en spinlock * y * puede usarse en otros escenarios?
Sobre el tema de las barreras de memoria, aquí hay todo lo que tal vez quieras saber
Barreras de memoria: una vista de hardware para piratas informáticos por Paul E. McKenney
http://irl.cs.ucla.edu/~yingdi/paperreading/whymb.2010.06.07c.pdf