matriz matrices formato dispersas dispersa c++ performance optimization sparse-matrix

c++ - matriz - formato csr matrices dispersas



¿Cómo acelerar mi solución de matriz dispersa? (7)

Estoy escribiendo un solucionador de matriz dispersa utilizando el método de Gauss-Seidel. Al perfilar, he determinado que aproximadamente la mitad del tiempo de mi programa se gasta dentro del solucionador. La parte crítica del rendimiento es la siguiente:

size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1; for (size_t y = 1; y < d_ny - 1; ++y) { for (size_t x = 1; x < d_nx - 1; ++x) { d_x[ic] = d_b[ic] - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie] - d_s[ic] * d_x[is] - d_n[ic] * d_x[in]; ++ic; ++iw; ++ie; ++is; ++in; } ic += 2; iw += 2; ie += 2; is += 2; in += 2; }

Todas las matrices involucradas son de tipo float . En realidad, no son matrices, sino objetos con un operador [] sobrecargado, que (creo) debería optimizarse, pero se define de la siguiente manera:

inline float &operator[](size_t i) { return d_cells[i]; } inline float const &operator[](size_t i) const { return d_cells[i]; }

Para d_nx = d_ny = 128 , esto puede ejecutarse aproximadamente 3500 veces por segundo en un Intel i7 920. Esto significa que el cuerpo del bucle interno funciona 3500 * 128 * 128 = 57 millones de veces por segundo. Como solo está involucrada alguna aritmética simple, eso me parece un número bajo para un procesador de 2.66 GHz.

Tal vez no esté limitado por el poder de la CPU, sino por el ancho de banda de la memoria? Bueno, una matriz float 128 * 128 float 65 kB, por lo que las 6 matrices deberían caber fácilmente en el caché L3 de la CPU (que es de 8 MB). Suponiendo que no hay nada almacenado en caché en los registros, cuento 15 accesos de memoria en el cuerpo del bucle interno. En un sistema de 64 bits, esto es 120 bytes por iteración, por lo que 57 millones * 120 bytes = 6.8 GB / s. El caché L3 se ejecuta a 2.66 GHz, por lo que es el mismo orden de magnitud. Mi conjetura es que la memoria es de hecho el cuello de botella.

Para acelerar esto, he intentado lo siguiente:

  • Compilar con g++ -O3 . (Bueno, había estado haciendo esto desde el principio.)

  • Paralelizando sobre 4 núcleos utilizando pragmas OpenMP. Tengo que cambiar al algoritmo de Jacobi para evitar lecturas y escrituras en la misma matriz. Esto requiere que haga el doble de iteraciones, lo que lleva a un resultado neto de aproximadamente la misma velocidad.

  • Jugando con los detalles de implementación del cuerpo del bucle, como el uso de punteros en lugar de índices. Sin efecto.

¿Cuál es el mejor enfoque para acelerar a este tipo? ¿Ayudaría a reescribir el cuerpo interno en ensamblaje (tendría que aprender eso primero)? ¿Debería ejecutar esto en la GPU en su lugar (lo que sé cómo hacerlo, pero es una molestia)? ¿Alguna otra idea brillante?

(NB: tomo "no" por respuesta, como en: "no se puede hacer mucho más rápido, porque ...")

Actualización : según lo solicitado, aquí hay un programa completo:

#include <iostream> #include <cstdlib> #include <cstring> using namespace std; size_t d_nx = 128, d_ny = 128; float *d_x, *d_b, *d_w, *d_e, *d_s, *d_n; void step() { size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1; for (size_t y = 1; y < d_ny - 1; ++y) { for (size_t x = 1; x < d_nx - 1; ++x) { d_x[ic] = d_b[ic] - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie] - d_s[ic] * d_x[is] - d_n[ic] * d_x[in]; ++ic; ++iw; ++ie; ++is; ++in; } ic += 2; iw += 2; ie += 2; is += 2; in += 2; } } void solve(size_t iters) { for (size_t i = 0; i < iters; ++i) { step(); } } void clear(float *a) { memset(a, 0, d_nx * d_ny * sizeof(float)); } int main(int argc, char **argv) { size_t n = d_nx * d_ny; d_x = new float[n]; clear(d_x); d_b = new float[n]; clear(d_b); d_w = new float[n]; clear(d_w); d_e = new float[n]; clear(d_e); d_s = new float[n]; clear(d_s); d_n = new float[n]; clear(d_n); solve(atoi(argv[1])); cout << d_x[0] << endl; // prevent the thing from being optimized away }

Lo compilo y lo ejecuto de la siguiente manera:

$ g++ -o gstest -O3 gstest.cpp $ time ./gstest 8000 0 real 0m1.052s user 0m1.050s sys 0m0.010s

(Hace 8000 en lugar de 3500 iteraciones por segundo porque mi programa "real" también hace muchas otras cosas. Pero es representativo.)

Actualización 2 : Me han dicho que los valores unitarios pueden no ser representativos porque los valores de NaN e Inf pueden ralentizar las cosas. Ahora borrando la memoria en el código de ejemplo. Sin embargo, para mí no hay diferencia en la velocidad de ejecución.


Creo que logré optimizarlo, aquí hay un código, crear un nuevo proyecto en VC ++, agregar este código y simplemente compilar bajo "Lanzamiento".

#include <iostream> #include <cstdlib> #include <cstring> #define _WIN32_WINNT 0x0400 #define WIN32_LEAN_AND_MEAN #include <windows.h> #include <conio.h> using namespace std; size_t d_nx = 128, d_ny = 128; float *d_x, *d_b, *d_w, *d_e, *d_s, *d_n; void step_original() { size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1; for (size_t y = 1; y < d_ny - 1; ++y) { for (size_t x = 1; x < d_nx - 1; ++x) { d_x[ic] = d_b[ic] - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie] - d_s[ic] * d_x[is] - d_n[ic] * d_x[in]; ++ic; ++iw; ++ie; ++is; ++in; } ic += 2; iw += 2; ie += 2; is += 2; in += 2; } } void step_new() { //size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1; float *d_b_ic, *d_w_ic, *d_e_ic, *d_x_ic, *d_x_iw, *d_x_ie, *d_x_is, *d_x_in, *d_n_ic, *d_s_ic; d_b_ic = d_b; d_w_ic = d_w; d_e_ic = d_e; d_x_ic = d_x; d_x_iw = d_x; d_x_ie = d_x; d_x_is = d_x; d_x_in = d_x; d_n_ic = d_n; d_s_ic = d_s; for (size_t y = 1; y < d_ny - 1; ++y) { for (size_t x = 1; x < d_nx - 1; ++x) { /*d_x[ic] = d_b[ic] - d_w[ic] * d_x[iw] - d_e[ic] * d_x[ie] - d_s[ic] * d_x[is] - d_n[ic] * d_x[in];*/ *d_x_ic = *d_b_ic - *d_w_ic * *d_x_iw - *d_e_ic * *d_x_ie - *d_s_ic * *d_x_is - *d_n_ic * *d_x_in; //++ic; ++iw; ++ie; ++is; ++in; d_b_ic++; d_w_ic++; d_e_ic++; d_x_ic++; d_x_iw++; d_x_ie++; d_x_is++; d_x_in++; d_n_ic++; d_s_ic++; } //ic += 2; iw += 2; ie += 2; is += 2; in += 2; d_b_ic += 2; d_w_ic += 2; d_e_ic += 2; d_x_ic += 2; d_x_iw += 2; d_x_ie += 2; d_x_is += 2; d_x_in += 2; d_n_ic += 2; d_s_ic += 2; } } void solve_original(size_t iters) { for (size_t i = 0; i < iters; ++i) { step_original(); } } void solve_new(size_t iters) { for (size_t i = 0; i < iters; ++i) { step_new(); } } void clear(float *a) { memset(a, 0, d_nx * d_ny * sizeof(float)); } int main(int argc, char **argv) { size_t n = d_nx * d_ny; d_x = new float[n]; clear(d_x); d_b = new float[n]; clear(d_b); d_w = new float[n]; clear(d_w); d_e = new float[n]; clear(d_e); d_s = new float[n]; clear(d_s); d_n = new float[n]; clear(d_n); if(argc < 3) printf("app.exe (x)iters (o/n)algo/n"); bool bOriginalStep = (argv[2][0] == ''o''); size_t iters = atoi(argv[1]); /*printf("Press any key to start!"); _getch(); printf(" Running speed test../n");*/ __int64 freq, start, end, diff; if(!::QueryPerformanceFrequency((LARGE_INTEGER*)&freq)) throw "Not supported!"; freq /= 1000000; // microseconds! { ::QueryPerformanceCounter((LARGE_INTEGER*)&start); if(bOriginalStep) solve_original(iters); else solve_new(iters); ::QueryPerformanceCounter((LARGE_INTEGER*)&end); diff = (end - start) / freq; } printf("Speed (%s)/t/t: %u/n", (bOriginalStep ? "original" : "new"), diff); //_getch(); //cout << d_x[0] << endl; // prevent the thing from being optimized away }

Ejecutalo de esta manera:

app.exe 10000 o

app.exe 10000 n

"o" significa código antiguo, el tuyo.

"n" es mía, la nueva.

Mis resultados: Velocidad (original):

1515028

1523171

1495988

Velocidad (nueva):

966012

984110

1006045

Mejora de alrededor del 30%.

La lógica subyacente: ha estado utilizando contadores de índice para acceder / manipular. Yo uso punteros. Mientras se ejecuta, el punto de interrupción en una determinada línea de código de cálculo en el depurador de VC ++ y presione F8. Obtendrás la ventana del desensamblador. Verás los códigos de operación producidos (código de ensamblaje).

De todos modos, mira:

int * x = ...;

x [3] = 123;

Esto le dice a la PC que ponga el puntero x en un registro (digamos EAX). El agregarlo (3 * sizeof (int)). Sólo entonces, establezca el valor en 123.

El enfoque de los punteros es mucho mejor como puede entender, porque cortamos el proceso de adición, en realidad lo manejamos nosotros mismos, por lo que podemos optimizar según sea necesario.

Espero que esto ayude.

Nota al personal de .com: ¡Excelente sitio web, espero haber escuchado sobre esto hace mucho tiempo!


Creo que tienes razón en que la memoria es un cuello de botella. Es un bucle bastante simple con solo una aritmética simple por iteración. ic, iw, ie, is, y en los índices parecen estar en lados opuestos de la matriz, así que supongo que hay un montón de caché que faltan allí.


La respuesta de Poni es la correcta para mí.

Solo quiero señalar que en este tipo de problema, a menudo obtiene beneficios de la localidad de la memoria. En este momento, las matrices b,w,e,s,n están en ubicaciones separadas en la memoria. Si no pudiera encajar el problema en el caché L3 (principalmente en L2), entonces esto sería malo, y una solución de este tipo sería útil:

size_t d_nx = 128, d_ny = 128; float *d_x; struct D { float b,w,e,s,n; }; D *d; void step() { size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1; for (size_t y = 1; y < d_ny - 1; ++y) { for (size_t x = 1; x < d_nx - 1; ++x) { d_x[ic] = d[ic].b - d[ic].w * d_x[iw] - d[ic].e * d_x[ie] - d[ic].s * d_x[is] - d[ic].n * d_x[in]; ++ic; ++iw; ++ie; ++is; ++in; } ic += 2; iw += 2; ie += 2; is += 2; in += 2; } } void solve(size_t iters) { for (size_t i = 0; i < iters; ++i) step(); } void clear(float *a) { memset(a, 0, d_nx * d_ny * sizeof(float)); } int main(int argc, char **argv) { size_t n = d_nx * d_ny; d_x = new float[n]; clear(d_x); d = new D[n]; memset(d,0,n * sizeof(D)); solve(atoi(argv[1])); cout << d_x[0] << endl; // prevent the thing from being optimized away }

Por ejemplo, esta solución a 1280x1280 es un poco menos de 2 veces más rápida que la solución de Poni (13s vs 23s en mi prueba - su implementación original es 22s), mientras que a 128x128 es 30% más lenta (7s vs. 10s - su original es 10s).

(Las iteraciones se escalaron hasta 80000 para el caso base y 800 para el caso más grande de 1280x1280).


No soy un experto en el tema, pero he visto que hay varios artículos académicos sobre cómo mejorar el uso del caché del método de Gauss-Seidel.

Otra posible optimización es el uso de la variante rojo-negro, donde los puntos se actualizan en dos barridos en un patrón similar al tablero de ajedrez. De esta manera, todas las actualizaciones en un barrido son independientes y se pueden paralelizar.


Por un lado, parece que hay un problema de canalización aquí. El bucle se lee del valor en d_x en el que se acaba de escribir, pero aparentemente tiene que esperar a que se complete esa escritura. Solo reorganizar el orden de los cálculos, hacer algo útil mientras está esperando, hace que sea casi el doble de rápido:

d_x[ic] = d_b[ic] - d_e[ic] * d_x[ie] - d_s[ic] * d_x[is] - d_n[ic] * d_x[in] - d_w[ic] * d_x[iw] /* d_x[iw] has just been written to, process this last */;

Fue Eamon Nerbonne quien descubrió esto. ¡Muchos upvotes a él! Nunca lo hubiera adivinado.


Sugiero poner algunas declaraciones de búsqueda previa y también investigar "diseño orientado a datos":

void step_original() { size_t ic = d_ny + 1, iw = d_ny, ie = d_ny + 2, is = 1, in = 2 * d_ny + 1; float dw_ic, dx_ic, db_ic, de_ic, dn_ic, ds_ic; float dx_iw, dx_is, dx_ie, dx_in, de_ic, db_ic; for (size_t y = 1; y < d_ny - 1; ++y) { for (size_t x = 1; x < d_nx - 1; ++x) { // Perform the prefetch // Sorting these statements by array may increase speed; // although sorting by index name may increase speed too. db_ic = d_b[ic]; dw_ic = d_w[ic]; dx_iw = d_x[iw]; de_ic = d_e[ic]; dx_ie = d_x[ie]; ds_ic = d_s[ic]; dx_is = d_x[is]; dn_ic = d_n[ic]; dx_in = d_x[in]; // Calculate d_x[ic] = db_ic - dw_ic * dx_iw - de_ic * dx_ie - ds_ic * dx_is - dn_ic * dx_in; ++ic; ++iw; ++ie; ++is; ++in; } ic += 2; iw += 2; ie += 2; is += 2; in += 2; } }

Esto difiere de su segundo método, ya que los valores se copian en variables temporales locales antes de realizar el cálculo.


Un par de ideas:

  1. Utilice SIMD. Puede cargar 4 flotantes a la vez desde cada matriz en un registro SIMD (por ejemplo, SSE en Intel, VMX en PowerPC). La desventaja de esto es que algunos de los valores d_x serán "obsoletos", por lo que su tasa de convergencia sufrirá (pero no es tan mala como una iteración de jacobi); Es difícil decir si la aceleración lo compensa.

  2. Utilice SOR . Es simple, no agrega mucho cómputo y puede mejorar su tasa de convergencia bastante bien, incluso para un valor de relajación relativamente conservador (por ejemplo, 1.5).

  3. Utilizar gradiente conjugado. Si esto es para el paso de proyección de una simulación fluida (es decir, hacer cumplir la no compresibilidad), debería poder aplicar CG y obtener una tasa de convergencia mucho mejor. Un buen preacondicionador ayuda aún más.

  4. Utilice un solucionador especializado. Si el sistema lineal surge de la ecuación de Poisson , puede hacerlo incluso mejor que el gradiente conjugado utilizando métodos basados ​​en FFT.

Si puedes explicar más sobre cómo se ve el sistema que estás tratando de resolver, probablemente te pueda dar más consejos sobre el # 3 y el # 4.