para matriz matrices estructura dispersas dispersa datos codigo aplicaciones c++ optimization simd sparse-array avx2

c++ - matrices - Compresión de matriz dispersa usando SIMD(AVX2)



matrices dispersas java (3)

Aunque el conjunto de instrucciones AVX2 tiene muchas instrucciones GATHER, pero su rendimiento es demasiado lento. Y la forma más efectiva de hacerlo: procesar una matriz manualmente.

Tengo una matriz dispersa a (principalmente ceros):

unsigned char a[1000000];

y me gustaría crear una matriz b de índices para elementos distintos de cero usando instrucciones SIMD en la arquitectura Intel x64 con AVX2. Estoy buscando consejos sobre cómo hacerlo de manera eficiente. Específicamente, ¿hay instrucción (es) SIMD para obtener posiciones de elementos consecutivos distintos de cero en el registro SIMD, dispuestos de forma contigua?


Cinco métodos para calcular los índices de los nonzeros son:

  • Bucle semiautomático: cargue un vector SIMD con caracteres, compare con cero y aplique una máscara de movimiento. Use un pequeño bucle escalar si alguno de los caracteres no es cero (también sugerido por @stgatilov ). Esto funciona bien para matrices muy dispersas. La función arr2ind_movmsk en el siguiente código usa instrucciones BMI1 para el bucle escalar.

  • Bucle vectorizado: los procesadores Intel Haswell y los nuevos soportan los conjuntos de instrucciones BMI1 y BMI2. BMI2 contiene la instrucción pext ( extracto de Parallel bits, ver el enlace de wikipedia ), que resulta útil aquí. Ver arr2ind_pext en el siguiente código.

  • Bucle escalar clásico con instrucción if: arr2ind_if .

  • Bucle escalar sin ramas: arr2ind_cmov .

  • Tabla de búsqueda: @stgatilov muestra que es posible usar una tabla de búsqueda en lugar del pdep y otras instrucciones enteras. Sin embargo, esto podría funcionar bien, la tabla de búsqueda es bastante grande: no cabe en la memoria caché L1. No probado aquí. Ver también la discusión aquí .


/* gcc -O3 -Wall -m64 -mavx2 -fopenmp -march=broadwell -std=c99 -falign-loops=16 sprs_char2ind.c example: Test different methods with an array a of size 20000 and approximate 25/1024*100%=2.4% nonzeros: ./a.out 20000 25 */ #include <stdio.h> #include <immintrin.h> #include <stdint.h> #include <omp.h> #include <string.h> __attribute__ ((noinline)) int arr2ind_movmsk(const unsigned char * restrict a, int n, int * restrict ind, int * m){ int i, m0, k; __m256i msk; m0=0; for (i=0;i<n;i=i+32){ /* Load 32 bytes and compare with zero: */ msk=_mm256_cmpeq_epi8(_mm256_load_si256((__m256i *)&a[i]),_mm256_setzero_si256()); k=_mm256_movemask_epi8(msk); k=~k; /* Search for nonzero bits instead of zero bits. */ while (k){ ind[m0]=i+_tzcnt_u32(k); /* Count the number of trailing zero bits in k. */ m0++; k=_blsr_u32(k); /* Clear the lowest set bit in k. */ } } *m=m0; return 0; } __attribute__ ((noinline)) int arr2ind_pext(const unsigned char * restrict a, int n, int * restrict ind, int * m){ int i, m0; uint64_t cntr_const = 0xFEDCBA9876543210; __m256i shft = _mm256_set_epi64x(0x04,0x00,0x04,0x00); __m256i vmsk = _mm256_set1_epi8(0x0F); __m256i cnst16 = _mm256_set1_epi32(16); __m256i shf_lo = _mm256_set_epi8(0x80,0x80,0x80,0x0B, 0x80,0x80,0x80,0x03, 0x80,0x80,0x80,0x0A, 0x80,0x80,0x80,0x02, 0x80,0x80,0x80,0x09, 0x80,0x80,0x80,0x01, 0x80,0x80,0x80,0x08, 0x80,0x80,0x80,0x00); __m256i shf_hi = _mm256_set_epi8(0x80,0x80,0x80,0x0F, 0x80,0x80,0x80,0x07, 0x80,0x80,0x80,0x0E, 0x80,0x80,0x80,0x06, 0x80,0x80,0x80,0x0D, 0x80,0x80,0x80,0x05, 0x80,0x80,0x80,0x0C, 0x80,0x80,0x80,0x04); __m128i pshufbcnst = _mm_set_epi8(0x80,0x80,0x80,0x80,0x80,0x80,0x80,0x80, 0x0E,0x0C,0x0A,0x08,0x06,0x04,0x02,0x00); __m256i i_vec = _mm256_setzero_si256(); m0=0; for (i=0;i<n;i=i+16){ __m128i v = _mm_load_si128((__m128i *)&a[i]); /* Load 16 bytes. */ __m128i msk = _mm_cmpeq_epi8(v,_mm_setzero_si128()); /* Generate 16x8 bit mask. */ msk = _mm_srli_epi64(msk,4); /* Pack 16x8 bit mask to 16x4 bit mask. */ msk = _mm_shuffle_epi8(msk,pshufbcnst); /* Pack 16x8 bit mask to 16x4 bit mask. */ msk = _mm_xor_si128(msk,_mm_set1_epi32(-1)); /* Invert 16x4 mask. */ uint64_t msk64 = _mm_cvtsi128_si64x(msk); /* _mm_popcnt_u64 and _pext_u64 work on 64-bit general-purpose registers, not on simd registers.*/ int p = _mm_popcnt_u64(msk64)>>2; /* p is the number of nonzeros in 16 bytes of a. */ uint64_t cntr = _pext_u64(cntr_const,msk64); /* parallel bits extract. cntr contains p 4-bit integers. The 16 4-bit integers in cntr_const are shuffled to the p 4-bit integers that we want */ /* The next 7 intrinsics unpack these p 4-bit integers to p 32-bit integers. */ __m256i cntr256 = _mm256_set1_epi64x(cntr); cntr256 = _mm256_srlv_epi64(cntr256,shft); cntr256 = _mm256_and_si256(cntr256,vmsk); __m256i cntr256_lo = _mm256_shuffle_epi8(cntr256,shf_lo); __m256i cntr256_hi = _mm256_shuffle_epi8(cntr256,shf_hi); cntr256_lo = _mm256_add_epi32(i_vec,cntr256_lo); cntr256_hi = _mm256_add_epi32(i_vec,cntr256_hi); _mm256_storeu_si256((__m256i *)&ind[m0],cntr256_lo); /* Note that the stores of iteration i and i+16 may overlap. */ _mm256_storeu_si256((__m256i *)&ind[m0+8],cntr256_hi); /* Array ind has to be large enough to avoid segfaults. At most 16 integers are written more than strictly necessary */ m0 = m0+p; i_vec = _mm256_add_epi32(i_vec,cnst16); } *m=m0; return 0; } __attribute__ ((noinline)) int arr2ind_if(const unsigned char * restrict a, int n, int * restrict ind, int * m){ int i, m0; m0=0; for (i=0;i<n;i++){ if (a[i]!=0){ ind[m0]=i; m0=m0+1; } } *m=m0; return 0; } __attribute__((noinline)) int arr2ind_cmov(const unsigned char * restrict a, int n, int * restrict ind, int * m){ int i, m0; m0=0; for (i=0;i<n;i++){ ind[m0]=i; m0=(a[i]==0)? m0 : m0+1; /* Compiles to cmov instruction. */ } *m=m0; return 0; } __attribute__ ((noinline)) int print_nonz(const unsigned char * restrict a, const int * restrict ind, const int m){ int i; for (i=0;i<m;i++) printf("i=%d, ind[i]=%d a[ind[i]]=%u/n",i,ind[i],a[ind[i]]); printf("/n"); fflush( stdout ); return 0; } __attribute__ ((noinline)) int print_chk(const unsigned char * restrict a, const int * restrict ind, const int m){ int i; /* Compute a hash to compare the results of different methods. */ unsigned int chk=0; for (i=0;i<m;i++){ chk=((chk<<1)|(chk>>31))^(ind[i]); } printf("chk = %10X/n",chk); return 0; } int main(int argc, char **argv){ int n, i, m; unsigned int j, k, d; unsigned char *a; int *ind; double t0,t1; int meth, nrep; char txt[30]; sscanf(argv[1],"%d",&n); /* Length of array a. */ n=n>>5; /* Adjust n to a multiple of 32. */ n=n<<5; sscanf(argv[2],"%u",&d); /* The approximate fraction of nonzeros in a is: d/1024 */ printf("n=%d, d=%u/n",n,d); a=_mm_malloc(n*sizeof(char),32); ind=_mm_malloc(n*sizeof(int),32); /* Generate a pseudo random array a. */ j=73659343; for (i=0;i<n;i++){ j=j*653+1; k=(j & 0x3FF00)>>8; /* k is a pseudo random number between 0 and 1023 */ if (k<d){ a[i] = (j&0xFE)+1; /* Set a[i] to nonzero. */ }else{ a[i] = 0; } } /* for (i=0;i<n;i++){if (a[i]!=0){printf("i=%d, a[i]=%u/n",i,a[i]);}} printf("/n"); */ /* Uncomment this line to print the nonzeros in a. */ char txt0[]="arr2ind_movmsk: "; char txt1[]="arr2ind_pext: "; char txt2[]="arr2ind_if: "; char txt3[]="arr2ind_cmov: "; nrep=10000; /* Repeat a function nrep times to make relatively accurate timings possible. */ /* With nrep=1000000: ./a.out 10016 4 ; ./a.out 10016 48 ; ./a.out 10016 519 */ /* With nrep=10000: ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 513 */ printf("nrep = /%d /n/n",nrep); arr2ind_movmsk(a,n,ind,&m); /* Make sure that the arrays a and ind are read and/or written at least one time before benchmarking. */ for (meth=0;meth<4;meth++){ t0=omp_get_wtime(); switch (meth){ case 0: for(i=0;i<nrep;i++) arr2ind_movmsk(a,n,ind,&m); strcpy(txt,txt0); break; case 1: for(i=0;i<nrep;i++) arr2ind_pext(a,n,ind,&m); strcpy(txt,txt1); break; case 2: for(i=0;i<nrep;i++) arr2ind_if(a,n,ind,&m); strcpy(txt,txt2); break; case 3: for(i=0;i<nrep;i++) arr2ind_cmov(a,n,ind,&m); strcpy(txt,txt3); break; default: ; } t1=omp_get_wtime(); printf("method = %s ",txt); /* print_chk(a,ind,m); */ printf(" elapsed time = %6.2f/n",t1-t0); } print_nonz(a, ind, 2); /* Do something with the results */ printf("density = %f %% /n/n",((double)m)/((double)n)*100); /* Actual nonzero density of array a. */ /* print_nonz(a, ind, m); */ /* Uncomment this line to print the indices of the nonzeros. */ return 0; } /* With nrep=1000000: ./a.out 10016 4 ; ./a.out 10016 4 ; ./a.out 10016 48 ; ./a.out 10016 48 ; ./a.out 10016 519 ; ./a.out 10016 519 With nrep=10000: ./a.out 1000000 5 ; ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 52 ; ./a.out 1000000 513 ; ./a.out 1000000 513 */


El código se probó con un tamaño de matriz de n = 10016 (los datos encajan en la memoria caché L1) yn = 1000000, con diferentes densidades distintas de cero de aproximadamente 0,5%, 5% y 50%. Para una sincronización precisa, las funciones se llamaron 1000000 y 10000 veces, respectivamente.


Time in seconds, size n=10016, 1e6 function calls. Intel core i5-6500 0.53% 5.1% 50.0% arr2ind_movmsk: 0.27 0.53 4.89 arr2ind_pext: 1.44 1.59 1.45 arr2ind_if: 5.93 8.95 33.82 arr2ind_cmov: 6.82 6.83 6.82 Time in seconds, size n=1000000, 1e4 function calls. 0.49% 5.1% 50.1% arr2ind_movmsk: 0.57 2.03 5.37 arr2ind_pext: 1.47 1.47 1.46 arr2ind_if: 5.88 8.98 38.59 arr2ind_cmov: 6.82 6.81 6.81


En estos ejemplos, los bucles vectorizados son más rápidos que los bucles escalares. El rendimiento de arr2ind_movmsk depende mucho de la densidad de a . Solo es más rápido que arr2ind_pext si la densidad es suficientemente pequeña. El punto de equilibrio también depende del tamaño de la matriz n . La función ''arr2ind_if'' claramente adolece de una predicción de bifurcación anómala al 50% de densidad distinta de cero.


Si espera que la cantidad de elementos distintos de cero sea muy baja (es decir, mucho menos del 1%), entonces puede simplemente verificar que cada fragmento de 16 bytes no sea cero:

int mask = _mm_movemask_epi8(_mm_cmpeq_epi8(reg, _mm_setzero_si128()); if (mask != 65535) { //store zero bits of mask with scalar code }

Si el porcentaje de elementos buenos es suficientemente pequeño, el costo de las ramas mal predichas y el costo del código escalar lento dentro de "si" sería insignificante.

En cuanto a una buena solución general, primero considere la implementación SSE de la compactación de la corriente. Elimina todos los elementos cero de la matriz de bytes (idea tomada de aquí ):

__m128i shuf [65536]; //must be precomputed char cnt [65536]; //must be precomputed int compress(const char *src, int len, char *dst) { char *ptr = dst; for (int i = 0; i < len; i += 16) { __m128i reg = _mm_load_si128((__m128i*)&src[i]); __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128()); int mask = _mm_movemask_epi8(zeroMask); __m128i compressed = _mm_shuffle_epi8(reg, shuf[mask]); _mm_storeu_si128((__m128i*)ptr, compressed); ptr += cnt[mask]; //alternative: ptr += 16-_mm_popcnt_u32(mask); } return ptr - dst; }

Como ve, ( _mm_shuffle_epi8 + tabla de búsqueda) puede hacer maravillas. No conozco otra forma de vectorizar códigos estructuralmente complejos como la compactación de corrientes.

Ahora, el único problema restante con su solicitud es que desea obtener índices. Cada índice debe almacenarse en un valor de 4 bytes, por lo que un fragmento de 16 bytes de entrada puede producir hasta 64 bytes de salida, que no se ajustan al registro único de SSE.

Una forma de manejar esto es honestamente desempaquetar la salida a 64 bytes. De modo que reemplaza reg con constante (0,1,2,3,4, ..., 15) en el código, luego descomprime el registro SSE en 4 registros y agrega un registro con cuatro valores i . Esto requeriría muchas más instrucciones: 6 instrucciones de desempaquetado, 4 adiciones y 3 tiendas (una ya está allí). En cuanto a mí, esa es una gran sobrecarga, especialmente si esperas menos del 25% de elementos distintos de cero.

Alternativamente, puede limitar el número de bytes distintos de cero procesados ​​por iteración de ciclo simple por 4, de modo que un registro sea siempre suficiente para la salida. Aquí está el código de ejemplo:

__m128i shufMask [65536]; //must be precomputed char srcMove [65536]; //must be precomputed char dstMove [65536]; //must be precomputed int compress_ids(const char *src, int len, int *dst) { const char *ptrSrc = src; int *ptrDst = dst; __m128i offsets = _mm_setr_epi8(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15); __m128i base = _mm_setzero_si128(); while (ptrSrc < src + len) { __m128i reg = _mm_loadu_si128((__m128i*)ptrSrc); __m128i zeroMask = _mm_cmpeq_epi8(reg, _mm_setzero_si128()); int mask = _mm_movemask_epi8(zeroMask); __m128i ids8 = _mm_shuffle_epi8(offsets, shufMask[mask]); __m128i ids32 = _mm_unpacklo_epi16(_mm_unpacklo_epi8(ids8, _mm_setzero_si128()), _mm_setzero_si128()); ids32 = _mm_add_epi32(ids32, base); _mm_storeu_si128((__m128i*)ptrDst, ids32); ptrDst += dstMove[mask]; //alternative: ptrDst += min(16-_mm_popcnt_u32(mask), 4); ptrSrc += srcMove[mask]; //no alternative without LUT base = _mm_add_epi32(base, _mm_set1_epi32(dstMove[mask])); } return ptrDst - dst; }

Una desventaja de este enfoque es que ahora cada iteración de ciclo subsiguiente no puede comenzar hasta que la línea ptrDst += dstMove[mask]; se ejecuta en la iteración anterior. Entonces la ruta crítica ha aumentado dramáticamente. Hardware hyperthreading o su emulación manual puede eliminar esta penalización.

Entonces, como puede ver, hay muchas variaciones de esta idea básica, todas las cuales resuelven su problema con diferente grado de eficiencia. También puede reducir el tamaño de la LUT si no le gusta (una vez más, a costa de reducir el rendimiento).

Este enfoque no puede extenderse completamente a registros más amplios (es decir, AVX2 y AVX-512), pero puede intentar combinar instrucciones de varias iteraciones consecutivas en instrucciones AVX2 o AVX-512 individuales, aumentando así ligeramente el rendimiento.

Nota: No probé ningún código (porque la precomputación de LUT requiere un esfuerzo notable).