c multithreading concurrency atomic compare-and-swap

Incrementar atómicamente dos enteros con CAS



multithreading concurrency (3)

Hazme pensar en una secuencia de bloqueo. No es muy preciso (poner esto de memoria) pero algo así como:

sean x, y y s los enteros de 64 bits.

Para incrementar:

atomic s++ (me refiero al incremento atómico usando la operación CAS de 64 bits)

memory barrier atomic x++ atomic y++ atomic s++ memory barrier

Leer:

do { S1 = load s X = load x Y = load y memory barrier S2 = load s } while (S1 != S2)

También vea https://en.wikipedia.org/wiki/Seqlock

Aparentemente, es posible incrementar atómicamente dos enteros con instrucciones de comparar y cambiar. Esta charla afirma que tal algoritmo existe, pero no detalla cómo se ve.

¿Cómo se puede hacer esto?

(Tenga en cuenta que la solución obvia de incrementar los enteros uno tras otro no es atómica. Además, el relleno de múltiples enteros en una palabra de máquina no cuenta porque restringiría el rango posible).


Si sse2 está disponible, puede usar paddq para agregar 2 enteros de 64 bits a otros dos enteros de 64 bits en una sola instrucción.

#include "emmintrin.h" //initialize your values somewhere: //const __m128i ones = _mm_set1_epi64x(1); //volatile register __m128i vars = // _mm_set_epi64x(24,7); static inline __m128i inc_both(__m128i vars, __m128i ones){ return _mm_add_epi64(vars,ones); }

Esto debería compilar para

paddq %xmm0, %xmm1

Sin embargo, como está en línea estática, puede usar otros registros xmm. Si hay una presión de registro significativa, los operandos pueden convertirse en unos (℅rip)

Nota: esto se puede usar para agregar valores que no sean 1 y hay operaciones similares para la mayoría de las otras matemáticas, bit a bit y instrucciones de comparación, si las necesita.

Por lo tanto, puede usar el prefijo de bloqueo y convertirlo en una macro asm en línea

#define inc64x2(vars) asm volatile( / "paddq %0, %1/n":"+x"(vars):"x"(ones) / );

El equivalente de neón del brazo es algo como: vaddq_s64 (...), pero aquí hay un gran artículo sobre los equivalentes de arm / x86.


Tengo una solución que he probado. Aquí se encuentra un programa de prueba de concepto de sopa a nueces.

El algoritmo es un "uso de puerta de ID de subproceso CAS" como el 3er entero. Vi el video hablar dos veces, y creo que esto califica. Puede que no sea el algoritmo en el que el presentador estaba pensando, pero funciona.

Los valores de X e Y pueden estar en cualquier lugar de la memoria y el programa los coloca lo suficientemente alejados entre sí como para que estén en diferentes líneas de caché. Realmente no importa

Una descripción rápida del algoritmo:

Cada hilo tiene un unique id number o tid (distinto de cero), tomado de la fuente favorita: pthead_t , getpid , gettid , gettid make one up by whatever means you want . En el programa, los asigna secuencialmente a partir de 1.

Cada hilo llamará a la función de incremento con este número.

La función de incremento girará en una variable de gate global utilizando CAS con un valor antiguo de 0 y un nuevo valor de tid .

Cuando el CAS tiene éxito, el hilo ahora "posee" cosas. En otras palabras, si la gate es cero, está en juego. Un valor distinto de cero es el tid del propietario y la gate está bloqueada.

Ahora, el propietario es libre de incrementar los valores de X e Y con x += 1 simple y y += 1 .

Después de eso, la función de incremento se libera al hacer un almacenamiento de 0 en la gate .

Aquí está el programa de diagnóstico / prueba de concepto con todo. El algoritmo en no tiene restricciones, pero lo codifiqué para mi máquina.

Algunas advertencias:

  • Se asume gcc / clang
  • Se asume un arco x86_64 64 bits.
  • Esto se codificó usando nada más que asm en línea y no necesita [ni utiliza ningún] soporte atomic compilador para mayor claridad, simplicidad y transparencia.
  • Esto se construyó bajo Linux, pero debería funcionar en cualquier máquina / sistema operativo x86 "razonable" (por ejemplo, BSD, OSX debería estar bien, probablemente cygwin y mingw)
  • Otros arcos están bien si son compatibles con CAS, yo no los codifiqué (por ejemplo, el arm podría funcionar si se codifica el CAS con pares ldex/stex )
  • Hay suficientes primitivas abstractas que esto sería / debería ser fácil.
  • No intente la compatibilidad con Windows [si lo desea, haga su propio puerto, pero no me envíe lágrimas, o comentarios :-)].
  • El makefile y el programa han sido predeterminados a los mejores valores
  • Es posible que algunas CPU x86 necesiten usar diferentes valores predeterminados (por ejemplo, necesitan instrucciones de cercado). Ver el makefile.

De todos modos, aquí está:

// caslock -- prove cas lock algorithm #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <time.h> #include <pthread.h> #define systls __thread // repeat the madness only once #ifdef __clang__ #define inline_common inline #else #define inline_common static inline #endif #define inline_always inline_common __attribute__((__always_inline__)) #define inline_never __attribute__((__noinline__)) // WARNING: inline CAS fails for gcc but works for clang! #if _USE_CASINLINE_ #define inline_cas inline_always #else #define inline_cas inline_never #endif typedef unsigned int u32; typedef unsigned long long u64; #ifndef LOOPMAX #define LOOPMAX 1000000 #endif #ifndef TIDMAX #define TIDMAX 20 #endif #if _USE_VPTR_ typedef volatile u32 *xptr32_p; typedef volatile u64 *xptr64_p; #else typedef u32 *xptr32_p; typedef u64 *xptr64_p; #endif #if _USE_TID64_ typedef u64 tid_t; #define tidload(_xptr) loadu64(_xptr) #define tidcas(_xptr,_oval,_nval) casu64(_xptr,_oval,_nval) #define tidstore(_xptr,_nval) storeu64(_xptr,_nval) #else typedef u32 tid_t; #define tidload(_xptr) loadu32(_xptr) #define tidcas(_xptr,_oval,_nval) casu32(_xptr,_oval,_nval) #define tidstore(_xptr,_nval) storeu32(_xptr,_nval) #endif tid_t tidgate; // gate control tid_t readycnt; // number of threads ready tid_t donecnt; // number of threads complete // ensure that the variables are nowhere near each other u64 ary[100]; #define kickoff ary[32] // sync to fire threads #define xval ary[31] // the X value #define yval ary[87] // the Y value int inctype; // increment algorithm to use tid_t tidmax; // maximum number of tasks u64 loopmax; // loop maximum for each task // task control struct tsk { tid_t tsk_tid; // task id u32 tsk_casmiss; // cas miss count }; typedef struct tsk tsk_t; tsk_t *tsklist; // task list systls tsk_t *tskcur; // current task block // show progress #define PGR(_pgr) / do { / fputs(_pgr,stdout); / fflush(stdout); / } while (0) // NOTE: some x86 arches need fence instructions // 0 -- no fence instructions // 1 -- use mfence // 2 -- use lfence/sfence #if _USE_BARRIER_ == 0 #define BARRIER_RELEASE "" #define BARRIER_ACQUIRE "" #define BARRIER_ALL "" #elif _USE_BARRIER_ == 1 #define BARRIER_ACQUIRE "/tmfence/n" #define BARRIER_RELEASE "/tmfence/n" #define BARRIER_ALL "/tmfence/n" #elif _USE_BARRIER_ == 2 #define BARRIER_ACQUIRE "/tlfence/n" #define BARRIER_RELEASE "/tsfence/n" #define BARRIER_ALL "/tmfence/n" #else #error caslock: unknown barrier type #endif // barrier_acquire -- acquire barrier inline_always void barrier_acquire(void) { __asm__ __volatile__ ( BARRIER_ACQUIRE : : : "memory"); } // barrier_release -- release barrier inline_always void barrier_release(void) { __asm__ __volatile__ ( BARRIER_RELEASE : : : "memory"); } // barrier -- barrier inline_always void barrier(void) { __asm__ __volatile__ ( BARRIER_ALL : : : "memory"); } // casu32 -- compare and exchange four bytes // RETURNS: 1=ok, 0=fail inline_cas int casu32(xptr32_p xptr,u32 oldval,u32 newval) { char ok; __asm__ __volatile__ ( " lock/n" " cmpxchg %[newval],%[xptr]/n" " sete %[ok]/n" : [ok] "=r" (ok), [xptr] "=m" (*xptr) : "a" (oldval), [newval] "r" (newval) : "memory"); return ok; } // casu64 -- compare and exchange eight bytes // RETURNS: 1=ok, 0=fail inline_cas int casu64(xptr64_p xptr,u64 oldval,u64 newval) { char ok; __asm__ __volatile__ ( " lock/n" " cmpxchg %[newval],%[xptr]/n" " sete %[ok]/n" : [ok] "=r" (ok), [xptr] "=m" (*xptr) : "a" (oldval), [newval] "r" (newval) : "memory"); return ok; } // loadu32 -- load value with barrier // RETURNS: loaded value inline_always u32 loadu32(const xptr32_p xptr) { u32 val; barrier_acquire(); val = *xptr; return val; } // loadu64 -- load value with barrier // RETURNS: loaded value inline_always u64 loadu64(const xptr64_p xptr) { u64 val; barrier_acquire(); val = *xptr; return val; } // storeu32 -- store value with barrier inline_always void storeu32(xptr32_p xptr,u32 val) { *xptr = val; barrier_release(); } // storeu64 -- store value with barrier inline_always void storeu64(xptr64_p xptr,u64 val) { *xptr = val; barrier_release(); } // qsleep -- do a quick sleep inline_always void qsleep(int bigflg) { struct timespec ts; if (bigflg) { ts.tv_sec = 1; ts.tv_nsec = 0; } else { ts.tv_sec = 0; ts.tv_nsec = 1000; } nanosleep(&ts,NULL); } // incby_tidgate -- increment by using thread id gate void incby_tidgate(tid_t tid) // tid -- unique id for accessing entity (e.g. thread id) { tid_t *gptr; tid_t oval; gptr = &tidgate; // acquire the gate while (1) { oval = 0; // test mode -- just do a nop instead of CAS to prove diagnostic #if _USE_CASOFF_ *gptr = oval; break; #else if (tidcas(gptr,oval,tid)) break; #endif ++tskcur->tsk_casmiss; } #if _USE_INCBARRIER_ barrier_acquire(); #endif // increment the values xval += 1; yval += 1; #if _USE_INCBARRIER_ barrier_release(); #endif // release the gate // NOTE: CAS will always provide a barrier #if _USE_CASPOST_ && (_USE_CASOFF_ == 0) oval = tidcas(gptr,tid,0); #else tidstore(gptr,0); #endif } // tskcld -- child task void * tskcld(void *arg) { tid_t tid; tid_t oval; u64 loopcur; tskcur = arg; tid = tskcur->tsk_tid; // tell master thread that we''re fully ready while (1) { oval = tidload(&readycnt); if (tidcas(&readycnt,oval,oval + 1)) break; } // wait until we''re given the starting gun while (1) { if (loadu64(&kickoff)) break; qsleep(0); } // do the increments for (loopcur = loopmax; loopcur > 0; --loopcur) incby_tidgate(tid); barrier(); // tell master thread that we''re fully complete while (1) { oval = tidload(&donecnt); if (tidcas(&donecnt,oval,oval + 1)) break; } return (void *) 0; } // tskstart -- start a child task void tskstart(tid_t tid) { pthread_attr_t attr; pthread_t thr; int err; tsk_t *tsk; tsk = tsklist + tid; tsk->tsk_tid = tid; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr,1); err = pthread_create(&thr,&attr,tskcld,tsk); pthread_attr_destroy(&attr); if (err) printf("tskstart: error -- err=%d/n",err); } // tskall -- run a single test void tskall(void) { tid_t tidcur; tsk_t *tsk; u64 incmax; u64 val; int err; xval = 0; yval = 0; kickoff = 0; readycnt = 0; donecnt = 0; tidgate = 0; // prealloc the task blocks tsklist = calloc(tidmax + 1,sizeof(tsk_t)); // start all tasks PGR(" St"); for (tidcur = 1; tidcur <= tidmax; ++tidcur) tskstart(tidcur); // wait for all tasks to be fully ready PGR(" Sw"); while (1) { if (tidload(&readycnt) == tidmax) break; qsleep(1); } // the starting gun -- all tasks are waiting for this PGR(" Ko"); storeu64(&kickoff,1); // wait for all tasks to be fully done PGR(" Wd"); while (1) { if (tidload(&donecnt) == tidmax) break; qsleep(1); } PGR(" Done/n"); // check the final count incmax = loopmax * tidmax; // show per-task statistics for (tidcur = 1; tidcur <= tidmax; ++tidcur) { tsk = tsklist + tidcur; printf("tskall: tsk=%llu tsk_casmiss=%d (%.3f%%)/n", (u64) tidcur,tsk->tsk_casmiss,(double) tsk->tsk_casmiss / loopmax); } err = 0; // check for failure val = loadu64(&xval); if (val != incmax) { printf("tskall: xval fault -- xval=%lld incmax=%lld/n",val,incmax); err = 1; } // check for failure val = loadu64(&yval); if (val != incmax) { printf("tskall: yval fault -- yval=%lld incmax=%lld/n",val,incmax); err = 1; } if (! err) printf("tskall: SUCCESS/n"); free(tsklist); } // main -- master control int main(void) { loopmax = LOOPMAX; tidmax = TIDMAX; inctype = 0; tskall(); return 0; }

Aquí está el Makefile. Lo siento por la placa extra:

# caslock/Makefile -- make file for caslock # # options: # LOOPMAX -- maximum loops / thread # # TIDMAX -- maximum number of threads # # BARRIER -- generate fence/barrier instructions # 0 -- none # 1 -- use mfence everywhere # 2 -- use lfence for acquire, sfence for release # # CASOFF -- disable CAS to prove diagnostic works # 0 -- normal mode # 1 -- inhibit CAS during X/Y increment # # CASINLINE -- inline the CAS functions # 0 -- do _not_ inline # 1 -- inline them (WARNING: this fails for gcc but works for clang!) # # CASPOST -- increment gate release mode # 0 -- use fenced store # 1 -- use CAS store (NOTE: not really required) # # INCBARRIER -- use extra barriers around increments # 0 -- rely on CAS for barrier # 1 -- add extra safety barriers immediately before increment of X/Y # # TID64 -- use 64 bit thread "id"s # 0 -- use 32 bit # 1 -- use 64 bit # # VPTR -- use volatile pointers in function definitions # 0 -- use ordinary pointers # 1 -- use volatile pointers (NOTE: not really required) ifndef _CASLOCK_MK_ _CASLOCK_MK_ = 1 OLIST += caslock.o ifndef LOOPMAX LOOPMAX = 1000000 endif ifndef TIDMAX TIDMAX = 20 endif ifndef BARRIER BARRIER = 0 endif ifndef CASINLINE CASINLINE = 0 endif ifndef CASOFF CASOFF = 0 endif ifndef CASPOST CASPOST = 0 endif ifndef INCBARRIER INCBARRIER = 0 endif ifndef TID64 TID64 = 0 endif ifndef VPTR VPTR = 0 endif CFLAGS += -DLOOPMAX=$(LOOPMAX) CFLAGS += -DTIDMAX=$(TIDMAX) CFLAGS += -D_USE_BARRIER_=$(BARRIER) CFLAGS += -D_USE_CASINLINE_=$(CASINLINE) CFLAGS += -D_USE_CASOFF_=$(CASOFF) CFLAGS += -D_USE_CASPOST_=$(CASPOST) CFLAGS += -D_USE_INCBARRIER_=$(INCBARRIER) CFLAGS += -D_USE_TID64_=$(TID64) CFLAGS += -D_USE_VPTR_=$(VPTR) STDLIB += -lpthread ALL += caslock CLEAN += caslock OVRPUB := 1 ifndef OVRTOP OVRTOP := $(shell pwd) OVRTOP := $(dir $(OVRTOP)) endif endif # ovrlib/rules.mk -- rules control # # options: # GDB -- enable debug symbols # 0 -- normal # 1 -- use -O0 and define _USE_GDB_=1 # # CLANG -- use clang instead of gcc # 0 -- use gcc # 1 -- use clang # # BNC -- enable benchmarks # 0 -- normal mode # 1 -- enable benchmarks for function enter/exit pairs ifdef OVRPUB ifndef SDIR SDIR := $(shell pwd) STAIL := $(notdir $(SDIR)) endif ifndef GENTOP GENTOP := $(dir $(SDIR)) endif ifndef GENDIR GENDIR := $(GENTOP)/$(STAIL) endif ifndef ODIR ODIR := $(GENDIR) endif PROTOLST := true PROTOGEN := @true endif ifndef SDIR $(error rules: SDIR not defined) endif ifndef ODIR $(error rules: ODIR not defined) endif ifndef GENDIR $(error rules: GENDIR not defined) endif ifndef GENTOP $(error rules: GENTOP not defined) endif ifndef _RULES_MK_ _RULES_MK_ = 1 CLEAN += *.proto CLEAN += *.a CLEAN += *.o CLEAN += *.i CLEAN += *.dis CLEAN += *.TMP QPROTO := $(shell $(PROTOLST) -i -l -O$(GENTOP) $(SDIR)/*.c $(CPROTO)) HDEP += $(QPROTO) ###VPATH += $(GENDIR) ###VPATH += $(SDIR) ifdef INCLUDE_MK -include $(INCLUDE_MK) endif ifdef GSYM CFLAGS += -gdwarf-2 endif ifdef GDB CFLAGS += -gdwarf-2 DFLAGS += -D_USE_GDB_ else CFLAGS += -O2 endif ifndef ZPRT DFLAGS += -D_USE_ZPRT_=0 endif ifdef BNC DFLAGS += -D_USE_BNC_=1 endif ifdef CLANG CC := clang endif DFLAGS += -I$(GENTOP) DFLAGS += -I$(OVRTOP) CFLAGS += -Wall -Werror CFLAGS += -Wno-unknown-pragmas CFLAGS += -Wempty-body CFLAGS += -fno-diagnostics-color # NOTE: we now need this to prevent inlining (enabled at -O2) ifndef CLANG CFLAGS += -fno-inline-small-functions endif # NOTE: we now need this to prevent inlining (enabled at -O3) CFLAGS += -fno-inline-functions CFLAGS += $(DFLAGS) endif all: $(PREP) proto $(ALL) %.o: %.c $(HDEP) $(CC) $(CFLAGS) -c -o $*.o $< %.i: %.c cpp $(DFLAGS) -P $*.c > $*.i %.s: %.c $(CC) $(CFLAGS) -S -o $*.s $< # build a library (type (2) build) $(LIBNAME):: $(OLIST) ar rv $@ $(OLIST) .PHONY: proto proto:: $(PROTOGEN) -i -v -O$(GENTOP) $(SDIR)/*.c $(CPROTO) .PHONY: clean clean:: rm -f $(CLEAN) .PHONY: help help:: egrep ''^#'' Makefile caslock:: $(OLIST) $(LIBLIST) $(STDLIB) $(CC) $(CFLAGS) -o caslock $(OLIST) $(LIBLIST) $(STDLIB)

NOTA: Es posible que haya volado algunas de las restricciones de asm porque al realizar la función CAS como en línea, la compilación con gcc produce resultados incorrectos. Sin embargo, clang funciona bien con inline. Por lo tanto, el valor predeterminado es que la función CAS no está en línea. Por coherencia, no usé un valor predeterminado diferente para gcc / clang, aunque pude.

Aquí está el desmontaje de la función relevante con inline según lo construido por gcc (esto falla):

00000000004009c0 <incby_tidgate>: 4009c0: 31 c0 xor %eax,%eax 4009c2: f0 0f b1 3d 3a 1a 20 lock cmpxchg %edi,0x201a3a(%rip) # 602404 <tidgate> 4009c9: 00 4009ca: 0f 94 c2 sete %dl 4009cd: 84 d2 test %dl,%dl 4009cf: 75 23 jne 4009f4 <L01> 4009d1: 0f 1f 80 00 00 00 00 nopl 0x0(%rax) 4009d8:L00 64 48 8b 14 25 f8 ff mov %fs:0xfffffffffffffff8,%rdx 4009df: ff ff 4009e1: 83 42 04 01 addl $0x1,0x4(%rdx) 4009e5: f0 0f b1 3d 17 1a 20 lock cmpxchg %edi,0x201a17(%rip) # 602404 <tidgate> 4009ec: 00 4009ed: 0f 94 c2 sete %dl 4009f0: 84 d2 test %dl,%dl 4009f2: 74 e4 je 4009d8 <L00> 4009f4:L01 48 83 05 dc 17 20 00 addq $0x1,0x2017dc(%rip) # 6021d8 <ary+0xf8> 4009fb: 01 4009fc: 48 83 05 94 19 20 00 addq $0x1,0x201994(%rip) # 602398 <ary+0x2b8> 400a03: 01 400a04: c7 05 f6 19 20 00 00 movl $0x0,0x2019f6(%rip) # 602404 <tidgate> 400a0b: 00 00 00 400a0e: c3 retq

Aquí está el desmontaje de la función relevante con inline según lo construido por clang (esto tiene éxito):

0000000000400990 <incby_tidgate>: 400990: 31 c0 xor %eax,%eax 400992: f0 0f b1 3d 3a 1a 20 lock cmpxchg %edi,0x201a3a(%rip) # 6023d4 <tidgate> 400999: 00 40099a: 0f 94 c0 sete %al 40099d: eb 1a jmp 4009b9 <L01> 40099f: 90 nop 4009a0:L00 64 48 8b 04 25 f8 ff mov %fs:0xfffffffffffffff8,%rax 4009a7: ff ff 4009a9: ff 40 04 incl 0x4(%rax) 4009ac: 31 c0 xor %eax,%eax 4009ae: f0 0f b1 3d 1e 1a 20 lock cmpxchg %edi,0x201a1e(%rip) # 6023d4 <tidgate> 4009b5: 00 4009b6: 0f 94 c0 sete %al 4009b9:L01 84 c0 test %al,%al 4009bb: 74 e3 je 4009a0 <L00> 4009bd: 48 ff 05 e4 17 20 00 incq 0x2017e4(%rip) # 6021a8 <ary+0xf8> 4009c4: 48 ff 05 9d 19 20 00 incq 0x20199d(%rip) # 602368 <ary+0x2b8> 4009cb: c7 05 ff 19 20 00 00 movl $0x0,0x2019ff(%rip) # 6023d4 <tidgate> 4009d2: 00 00 00 4009d5: c3 retq 4009d6: 66 2e 0f 1f 84 00 00 nopw %cs:0x0(%rax,%rax,1) 4009dd: 00 00 00