means c++ performance optimization

c++ - K-means sin ramas(u otras optimizaciones)



k means c++ (6)

Nota: agradecería más una guía sobre cómo abordar y proponer este tipo de soluciones en lugar de la solución en sí.

Tengo una función muy crítica para el rendimiento en mi sistema que aparece como un punto de acceso de perfiles número uno en contextos específicos. Está en el medio de una iteración k-means (ya multihilo usando un paralelo para procesar sub-rangos de puntos en cada hilo de trabajo).

ClusterPoint& pt = points[j]; pt.min_index = -1; pt.min_dist = numeric_limits<float>::max(); for (int i=0; i < num_centroids; ++i) { const ClusterCentroid& cent = centroids[i]; const float dist = ...; if (dist < pt.min_dist) // <-- #1 hotspot { pt.min_dist = dist; pt.min_index = i; } }

Cualquier ahorro en el tiempo requerido para procesar esta sección del código cuenta sustancialmente, por lo que a menudo he estado jugando mucho con él. Podría valer la pena poner el bucle centroide afuera, por ejemplo, e iterar a través de los puntos en paralelo para un centroide dado. El número de puntos de clúster aquí se extiende por millones, mientras que el número de centroides se extiende por miles. El algoritmo se aplica para un puñado de iteraciones (a menudo menores de 10). No busca una convergencia / estabilidad perfecta, solo una aproximación ''razonable''.

Se agradece cualquier idea, pero lo que realmente estoy ansioso por descubrir es si este código se puede hacer sin ramas, ya que permitiría una versión SIMD. Realmente no he desarrollado el tipo de habilidad mental para comprender fácilmente cómo llegar a soluciones sin ramificaciones: mi cerebro falla allí como lo hizo cuando estuve expuesto por primera vez a la recursión en los primeros días, así que una guía sobre cómo escribir sin ramificaciones código y cómo desarrollar la mentalidad adecuada para ello también sería útil.

En resumen, estoy buscando guías, sugerencias y sugerencias (no necesariamente soluciones) sobre cómo micro-optimizar este código. Lo más probable es que tenga espacio para mejoras algorítmicas, pero mi punto ciego siempre ha estado en soluciones de microoptimización (y tengo curiosidad por aprender cómo aplicarlas de manera más efectiva sin exagerar). Ya está fuertemente multiproceso con un paralelo grueso para la lógica, por lo que estoy bastante empujado a la esquina de la microoptimización como una de las cosas más rápidas de probar sin un algoritmo más inteligente. Somos completamente libres de cambiar el diseño de la memoria.

En respuesta a sugerencias algorítmicas

Acerca de ver todo esto mal al tratar de micro-optimizar un algoritmo O (knm) que claramente podría mejorarse a nivel algorítmico, estoy totalmente de acuerdo. Esto empuja esta pregunta específica a un ámbito algo académico y poco práctico. Sin embargo, si se me permitiera una anécdota, vengo de una experiencia original de programación de alto nivel: gran énfasis en un punto de vista amplio, a gran escala, seguridad y muy poco en los detalles de implementación de bajo nivel. Recientemente cambié los proyectos a un tipo muy diferente de sabor moderno y estoy aprendiendo todo tipo de nuevos trucos de mis pares de eficiencia de caché, GPGPU, técnicas sin ramificación, SIMD, asignadores de memoria para propósitos especiales que realmente superan a Malloc ( pero para escenarios específicos), etc.

Es donde estoy tratando de ponerme al día con las últimas tendencias de rendimiento, y sorprendentemente descubrí que esas estructuras de datos antiguas que a menudo prefería durante los años 90, que a menudo estaban vinculadas / estructuras de tipo árbol, en realidad están siendo superadas por mucho más ingenuas. , código brutalizado, micro-optimizado, paralelo, aplicando instrucciones sintonizadas sobre bloques de memoria contiguos. Es algo decepcionante al mismo tiempo, ya que siento que ahora estamos ajustando más los algoritmos a la máquina y reduciendo las posibilidades de esta manera (especialmente con GPGPU).

Lo más divertido es que encuentro este tipo de código micro-optimizado y rápido de procesamiento de matriz mucho más fácil de mantener que los sofisticados algoritmos y estructuras de datos que estaba usando antes. Para empezar, son más fáciles de generalizar. Además, mis compañeros a menudo pueden recibir una queja de un cliente sobre una desaceleración específica en un área, simplemente abofetear un paralelo y posiblemente algo de SIMD y llamarlo con una velocidad decente. Las mejoras algorítmicas a menudo pueden ofrecer mucho más, pero la velocidad y la no intrusión a las que se pueden aplicar estas microoptimizaciones me dan ganas de aprender más en esa área, ya que leer documentos sobre mejores algoritmos puede llevar algo de tiempo (además de requerir más cambios extensos). Así que últimamente me he subido a ese carro de micro-optimización un poco más, y tal vez demasiado en este caso específico, pero mi curiosidad es más sobre expandir mi rango de posibles soluciones para cualquier escenario.

Desmontaje

Nota: Soy muy, muy malo en el ensamblaje, por lo que a menudo he ajustado las cosas más de una manera de prueba y error, llegando a conjeturas un tanto educadas sobre por qué un punto de acceso que se muestra en vtune podría ser el cuello de botella y luego probar las cosas para ver si los tiempos mejoran, suponiendo que las conjeturas tengan algún indicio de verdad si los tiempos mejoran, o si no lo hacen, se pierden por completo.

000007FEEE3FB8A1 jl thread_partition+70h (7FEEE3FB780h) { ClusterPoint& pt = points[j]; pt.min_index = -1; pt.min_dist = numeric_limits<float>::max(); for (int i = 0; i < num_centroids; ++i) 000007FEEE3FB8A7 cmp ecx,r10d 000007FEEE3FB8AA jge thread_partition+1F4h (7FEEE3FB904h) 000007FEEE3FB8AC lea rax,[rbx+rbx*2] 000007FEEE3FB8B0 add rax,rax 000007FEEE3FB8B3 lea r8,[rbp+rax*8+8] { const ClusterCentroid& cent = centroids[i]; const float x = pt.pos[0] - cent.pos[0]; const float y = pt.pos[1] - cent.pos[1]; 000007FEEE3FB8B8 movss xmm0,dword ptr [rdx] const float z = pt.pos[2] - cent.pos[2]; 000007FEEE3FB8BC movss xmm2,dword ptr [rdx+4] 000007FEEE3FB8C1 movss xmm1,dword ptr [rdx-4] 000007FEEE3FB8C6 subss xmm2,dword ptr [r8] 000007FEEE3FB8CB subss xmm0,dword ptr [r8-4] 000007FEEE3FB8D1 subss xmm1,dword ptr [r8-8] const float dist = x*x + y*y + z*z; 000007FEEE3FB8D7 mulss xmm2,xmm2 000007FEEE3FB8DB mulss xmm0,xmm0 000007FEEE3FB8DF mulss xmm1,xmm1 000007FEEE3FB8E3 addss xmm2,xmm0 000007FEEE3FB8E7 addss xmm2,xmm1 if (dist < pt.min_dist) // VTUNE HOTSPOT 000007FEEE3FB8EB comiss xmm2,dword ptr [rdx-8] 000007FEEE3FB8EF jae thread_partition+1E9h (7FEEE3FB8F9h) { pt.min_dist = dist; 000007FEEE3FB8F1 movss dword ptr [rdx-8],xmm2 pt.min_index = i; 000007FEEE3FB8F6 mov dword ptr [rdx-10h],ecx 000007FEEE3FB8F9 inc ecx 000007FEEE3FB8FB add r8,30h 000007FEEE3FB8FF cmp ecx,r10d 000007FEEE3FB902 jl thread_partition+1A8h (7FEEE3FB8B8h) for (int j = *irange.first; j < *irange.last; ++j) 000007FEEE3FB904 inc edi 000007FEEE3FB906 add rdx,20h 000007FEEE3FB90A cmp edi,dword ptr [rsi+4] 000007FEEE3FB90D jl thread_partition+31h (7FEEE3FB741h) 000007FEEE3FB913 mov rbx,qword ptr [irange] } } } }

Nos vemos obligados a apuntar a SSE 2, un poco atrasados ​​en nuestros tiempos, pero la base de usuarios realmente se disparó una vez cuando asumimos que incluso SSE 4 estaba bien como requisito mínimo (el usuario tenía un prototipo de máquina Intel).

Actualización con prueba independiente: ~ 5.6 segundos

¡Aprecio mucho toda la ayuda que se me ofrece! Debido a que la base de código es bastante extensa y las condiciones para activar ese código son complejas (eventos del sistema activados en varios subprocesos), es un poco difícil de realizar cambios experimentales y perfilarlos cada vez. Así que configuré una prueba superficial como aplicación independiente que otros también pueden ejecutar y probar para que pueda experimentar con todas estas soluciones ofrecidas con gracia.

#define _SECURE_SCL 0 #include <iostream> #include <fstream> #include <vector> #include <limits> #include <ctime> #if defined(_MSC_VER) #define ALIGN16 __declspec(align(16)) #else #include <malloc.h> #define ALIGN16 __attribute__((aligned(16))) #endif using namespace std; // Aligned memory allocation (for SIMD). static void* malloc16(size_t amount) { #ifdef _MSC_VER return _aligned_malloc(amount, 16); #else void* mem = 0; posix_memalign(&mem, 16, amount); return mem; #endif } template <class T> static T* malloc16_t(size_t num_elements) { return static_cast<T*>(malloc16(num_elements * sizeof(T))); } // Aligned free. static void free16(void* mem) { #ifdef _MSC_VER return _aligned_free(mem); #else free(mem); #endif } // Test parameters. enum {num_centroids = 512}; enum {num_points = num_centroids * 2000}; enum {num_iterations = 5}; static const float range = 10.0f; class Points { public: Points(): data(malloc16_t<Point>(num_points)) { for (int p=0; p < num_points; ++p) { const float xyz[3] = { range * static_cast<float>(rand()) / RAND_MAX, range * static_cast<float>(rand()) / RAND_MAX, range * static_cast<float>(rand()) / RAND_MAX }; init(p, xyz); } } ~Points() { free16(data); } void init(int n, const float* xyz) { data[n].centroid = -1; data[n].xyz[0] = xyz[0]; data[n].xyz[1] = xyz[1]; data[n].xyz[2] = xyz[2]; } void associate(int n, int new_centroid) { data[n].centroid = new_centroid; } int centroid(int n) const { return data[n].centroid; } float* operator[](int n) { return data[n].xyz; } private: Points(const Points&); Points& operator=(const Points&); struct Point { int centroid; float xyz[3]; }; Point* data; }; class Centroids { public: Centroids(Points& points): data(malloc16_t<Centroid>(num_centroids)) { // Naive initial selection algorithm, but outside the // current area of interest. for (int c=0; c < num_centroids; ++c) init(c, points[c]); } ~Centroids() { free16(data); } void init(int n, const float* xyz) { data[n].count = 0; data[n].xyz[0] = xyz[0]; data[n].xyz[1] = xyz[1]; data[n].xyz[2] = xyz[2]; } void reset(int n) { data[n].count = 0; data[n].xyz[0] = 0.0f; data[n].xyz[1] = 0.0f; data[n].xyz[2] = 0.0f; } void sum(int n, const float* pt_xyz) { data[n].xyz[0] += pt_xyz[0]; data[n].xyz[1] += pt_xyz[1]; data[n].xyz[2] += pt_xyz[2]; ++data[n].count; } void average(int n) { if (data[n].count > 0) { const float inv_count = 1.0f / data[n].count; data[n].xyz[0] *= inv_count; data[n].xyz[1] *= inv_count; data[n].xyz[2] *= inv_count; } } float* operator[](int n) { return data[n].xyz; } int find_nearest(const float* pt_xyz) const { float min_dist_squared = numeric_limits<float>::max(); int min_centroid = -1; for (int c=0; c < num_centroids; ++c) { const float* cen_xyz = data[c].xyz; const float x = pt_xyz[0] - cen_xyz[0]; const float y = pt_xyz[1] - cen_xyz[1]; const float z = pt_xyz[2] - cen_xyz[2]; const float dist_squared = x*x + y*y * z*z; if (min_dist_squared > dist_squared) { min_dist_squared = dist_squared; min_centroid = c; } } return min_centroid; } private: Centroids(const Centroids&); Centroids& operator=(const Centroids&); struct Centroid { int count; float xyz[3]; }; Centroid* data; }; // A high-precision real timer would be nice, but we lack C++11 and // the coarseness of the testing here should allow this to suffice. static double sys_time() { return static_cast<double>(clock()) / CLOCKS_PER_SEC; } static void k_means(Points& points, Centroids& centroids) { // Find the closest centroid for each point. for (int p=0; p < num_points; ++p) { const float* pt_xyz = points[p]; points.associate(p, centroids.find_nearest(pt_xyz)); } // Reset the data of each centroid. for (int c=0; c < num_centroids; ++c) centroids.reset(c); // Compute new position sum of each centroid. for (int p=0; p < num_points; ++p) centroids.sum(points.centroid(p), points[p]); // Compute average position of each centroid. for (int c=0; c < num_centroids; ++c) centroids.average(c); } int main() { Points points; Centroids centroids(points); cout << "Starting simulation..." << endl; double start_time = sys_time(); for (int i=0; i < num_iterations; ++i) k_means(points, centroids); cout << "Time passed: " << (sys_time() - start_time) << " secs" << endl; cout << "# Points: " << num_points << endl; cout << "# Centroids: " << num_centroids << endl; // Write the centroids to a file to give us some crude verification // of consistency as we make changes. ofstream out("centroids.txt"); for (int c=0; c < num_centroids; ++c) out << "Centroid " << c << ": " << centroids[c][0] << "," << centroids[c][1] << "," << centroids[c][2] << endl; }

Soy consciente de los peligros de las pruebas superficiales, pero como ya se considera un punto de acceso de sesiones anteriores del mundo real, espero que sea excusable. También me interesan las técnicas generales asociadas con la microoptimización de dicho código.

Obtuve resultados ligeramente diferentes al perfilar este. Los tiempos están un poco más uniformemente dispersos dentro del ciclo aquí, y no estoy seguro de por qué. Quizás sea porque los datos son más pequeños (omití miembros y min_dist miembro min_dist y lo convertí en una variable local). La relación exacta entre los centroides y los puntos también es un poco diferente, pero con suerte lo suficientemente cerca como para traducir las mejoras aquí al código original. También es de un solo subproceso en esta prueba superficial, y el desmontaje se ve bastante diferente, por lo que puedo arriesgarme a optimizar esta prueba superficial sin el original (un riesgo que estoy dispuesto a asumir por ahora, ya que estoy más interesado en expandir mi conocimiento de técnicas que podrían optimizar estos casos en lugar de una solución para este caso exacto).

Actualización con la sugerencia de Yochai Timmer - ~ 12.5 segundos

Ah, me enfrento a los problemas de la microoptimización sin comprender muy bien el ensamblaje. Reemplacé esto:

-if (min_dist_squared > dist_squared) -{ - min_dist_squared = dist_squared; - pt.centroid = c; -}

Con este:

+const bool found_closer = min_dist_squared > dist_squared; +pt.centroid = bitselect(found_closer, c, pt.centroid); +min_dist_squared = bitselect(found_closer, dist_squared, min_dist_squared);

.. solo para encontrar los tiempos escalados de ~ 5.6 segundos a ~ 12.5 segundos. Sin embargo, eso no es su culpa ni le quita el valor de su solución, eso es mío por no entender lo que realmente está sucediendo a nivel de máquina y apuñalar en la oscuridad. Ese aparentemente falló, y aparentemente no fui víctima de la predicción errónea de la rama como inicialmente pensé. Sin embargo, su solución propuesta es una función maravillosa y generalizada para probar en tales casos, y estoy agradecido de agregarla a mi caja de herramientas de consejos y trucos. Ahora para la ronda 2.

Solución SIMD de Harold - 2.496 segundos (ver advertencia)

Esta solución puede ser asombrosa. ¡Después de convertir el representante del clúster a SoA, obtengo tiempos de ~ 2.5 segundos con este! Desafortunadamente, parece haber un problema técnico de algún tipo. Estoy obteniendo resultados muy diferentes para el resultado final que sugiere más que ligeras diferencias de precisión, incluidos algunos centroides hacia el final con valores de 0 (lo que implica que no se encontraron en la búsqueda). He estado tratando de pasar por la lógica SIMD con el depurador para ver qué podría estar pasando, podría ser simplemente un error de transcripción de mi parte, pero aquí está el código en caso de que alguien pueda detectar el error.

Si el error pudiera corregirse sin ralentizar los resultados, ¡esta mejora de velocidad es más de lo que alguna vez imaginé de una microoptimización pura!

// New version of Centroids::find_nearest (from harold''s solution): int find_nearest(const float* pt_xyz) const { __m128i min_index = _mm_set_epi32(3, 2, 1, 0); __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x)); __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y)); __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z)); __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)), _mm_mul_ps(zdif, zdif)); __m128i index = min_index; for (int i=4; i < num_centroids; i += 4) { xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x + i)); ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y + i)); zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z + i)); __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)), _mm_mul_ps(zdif, zdif)); __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist)); min_dist = _mm_min_ps(min_dist, dist); min_index = _mm_or_si128(_mm_and_si128(index, mask), _mm_andnot_si128(mask, min_index)); index = _mm_add_epi32(index, _mm_set1_epi32(4)); } ALIGN16 float mdist[4]; ALIGN16 uint32_t mindex[4]; _mm_store_ps(mdist, min_dist); _mm_store_si128((__m128i*)mindex, min_index); float closest = mdist[0]; int closest_i = mindex[0]; for (int i=1; i < 4; i++) { if (mdist[i] < closest) { closest = mdist[i]; closest_i = mindex[i]; } } return closest_i; }

Solución SIMD de Harold (corregida) - ~ 2.5 segundos

¡Después de aplicar las correcciones y probarlas, los resultados están intactos y funcionan correctamente con mejoras similares a la base de código original!

Dado que esto llega al santo grial del conocimiento que buscaba comprender mejor (SIMD sin ramas), voy a otorgarle a la solución algunos accesorios adicionales por más que duplicar la velocidad de la operación. Tengo mi tarea cortada al tratar de entenderlo, ya que mi objetivo no era simplemente mitigar este punto crítico, sino ampliar mi comprensión personal de las posibles soluciones para tratar con ellos.

Sin embargo, estoy agradecido por todas las contribuciones aquí, desde las sugerencias algorítmicas hasta el genial truco de selección de bitselect . Desearía poder aceptar todas las respuestas. Puede que termine probándolos todos en algún momento, pero por ahora tengo mi tarea cortada para comprender algunas de estas operaciones SIMD no aritméticas.

int find_nearest_simd(const float* pt_xyz) const { __m128i min_index = _mm_set_epi32(3, 2, 1, 0); __m128 pt_xxxx = _mm_set1_ps(pt_xyz[0]); __m128 pt_yyyy = _mm_set1_ps(pt_xyz[1]); __m128 pt_zzzz = _mm_set1_ps(pt_xyz[2]); __m128 xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x)); __m128 ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y)); __m128 zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z)); __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)), _mm_mul_ps(zdif, zdif)); __m128i index = min_index; for (int i=4; i < num_centroids; i += 4) { xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x + i)); ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y + i)); zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z + i)); __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)), _mm_mul_ps(zdif, zdif)); index = _mm_add_epi32(index, _mm_set1_epi32(4)); __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist)); min_dist = _mm_min_ps(min_dist, dist); min_index = _mm_or_si128(_mm_and_si128(index, mask), _mm_andnot_si128(mask, min_index)); } ALIGN16 float mdist[4]; ALIGN16 uint32_t mindex[4]; _mm_store_ps(mdist, min_dist); _mm_store_si128((__m128i*)mindex, min_index); float closest = mdist[0]; int closest_i = mindex[0]; for (int i=1; i < 4; i++) { if (mdist[i] < closest) { closest = mdist[i]; closest_i = mindex[i]; } } return closest_i; }


En primer lugar, sugeriría que antes de intentar cualquier cambio de código, observe el desensamblaje en una compilación optimizada. Idealmente, desea ver los datos del generador de perfiles a nivel de ensamblaje. Esto puede mostrar varias cosas, por ejemplo:

  1. Es posible que el compilador no haya generado una instrucción de bifurcación real.
  2. La línea de código que tiene el cuello de botella puede tener muchas más instrucciones asociadas de las que podría pensar, por ejemplo, el cálculo de dist.

Además de eso, existe el truco estándar de que cuando se habla de distancias, calcularlas a menudo requiere una raíz cuadrada. Debe hacer esa raíz cuadrada al final del proceso en el valor mínimo cuadrado.

SSE puede procesar cuatro valores a la vez, sin ramificaciones, utilizando _mm_min_ps . Si realmente necesita velocidad, entonces quiere usar intrínsecos SSE (o AVX). Aquí hay un ejemplo básico:

float MinimumDistance(const float *values, int count) { __m128 min = _mm_set_ps(FLT_MAX, FLT_MAX, FLT_MAX, FLT_MAX); int i=0; for (; i < count - 3; i+=4) { __m128 distances = _mm_loadu_ps(&values[i]); min = _mm_min_ps(min, distances); } // Combine the four separate minimums to a single value min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(2, 3, 0, 1))); min = _mm_min_ps(min, _mm_shuffle_ps(min, min, _MM_SHUFFLE(1, 0, 3, 2))); // Deal with the last 0-3 elements the slow way float result = FLT_MAX; if (count > 3) _mm_store_ss(&result, min); for (; i < count; i++) { result = min(values[i], result); } return result; }

Para obtener el mejor rendimiento de SSE, debe asegurarse de que las cargas sucedan en direcciones alineadas. Puede manejar los primeros elementos desalineados de la misma manera que los últimos en el código anterior si es necesario.

La otra cosa a tener en cuenta es el ancho de banda de memoria. Si hay varios miembros de la estructura ClusterCentroid que no usa durante ese ciclo, entonces leerá mucha más información de la que realmente necesita, ya que la memoria se lee en fragmentos de tamaño de línea de caché, que son 64 bytes cada uno.


Esto podría ir en ambos sentidos, pero probaría la siguiente estructura:

std::vector<float> centDists(num_centroids); //<-- one for each thread. for (size_t p=0; p<num_points; ++p) { Point& pt = points[p]; for (size_t c=0; c<num_centroids; ++c) { const float dist = ...; centDists[c]=dist; } pt.min_idx it= min_element(centDists.begin(),centDists.end())-centDists.begin(); }

Obviamente, ahora tiene que iterar dos veces sobre la memoria, lo que probablemente perjudica la proporción de aciertos / errores de caché (también podría dividirla en sub rangos), pero por otro lado, cada uno de los bucles internos debería ser fácil de vectorizar y desenrollar. así que solo tienes que medir si vale la pena.

E incluso si se apega a su versión, trataría de usar variables locales para realizar un seguimiento del índice mínimo y la distancia y aplicar los resultados para señalar al final.
Lo racional es que cada lectura o escritura pt.min_dist se realiza de manera efectiva a través de un puntero que, dependiendo de las optimizaciones del compilador, puede o no disminuir su rendimiento.

Otra cosa que es importante para las vectorizaciones es convertir una matriz de estructuras (en este caso, cententroides) en una estructura de matrices (por ejemplo, una matriz para cada coordenada de los puntos), porque de esa manera no necesita instrucciones de recolección adicionales en Para cargar los datos para su uso con instrucciones SIMD. Vea la charla de Eric Brumer para obtener más información sobre ese tema.

EDITAR: Algunos números para mi sistema (haswell, clang 3.5):
Hice una prueba corta con su punto de referencia y en mi sistema, el código anterior desaceleró el algoritmo en aproximadamente un 10%, esencialmente, no se pudo vectorizar nada.

Sin embargo, al aplicar la transformación de AoS a SoA para sus centroides, el cálculo de la distancia se vectorizó, lo que condujo a una reducción del tiempo de ejecución general de aproximadamente el 40% en comparación con su estructura original con la transformación de AoS a SoA aplicada.


Una posible micro optimizaciones: Almacene min_dist y min_index en variables locales. Es posible que el compilador tenga que escribir en la memoria con mayor frecuencia de la forma en que lo ha escrito; En algunas arquitecturas esto puede tener un gran impacto en el rendimiento. Vea mi respuesta aquí para otro ejemplo.

La sugerencia de Adams de hacer 4 comparaciones a la vez también es buena.

Sin embargo, su mejor aceleración vendrá reduciendo la cantidad de centroides que debe verificar. Idealmente, construya un árbol kd (o similar) alrededor de los centroides, luego consulte para encontrar el punto más cercano.

Si no tiene ningún código de construcción de árboles, aquí está mi búsqueda de punto más cercana "pobre":

Sort the points by one coordinate, e.g. cent.pos[0] Pick a starting index for the query point (pt) Iterate forwards through the candidate points until you reach the end, OR when abs(pt.pos[0] - cent.pos[0]) > min_dist Repeat the previous step going the opposite direction.

La condición de detención adicional para la búsqueda significa que debe omitir una buena cantidad de puntos; También tiene la garantía de no omitir ningún punto más cercano que el mejor que ya ha encontrado.

Entonces, para su código, esto se parece a

// sort centroid by x coordinate. min_index = -1; min_dist = numeric_limits<float>::max(); // pick the start index. This works well if the points are evenly distributed. float min_x = centroids[0].pos[0]; float max_x = centroids[num_centroids-1].pos[0]; float cur_x = pt.pos[0]; float t = (max_x - cur_x) / (max_x - min_x); // TODO clamp t between 0 and 1 int start_index = int(t * float(num_centroids)) // Forward search for (int i=start_index ; i < num_centroids; ++i) { const ClusterCentroid& cent = centroids[i]; if (fabs(cent.pos[0] - pt.pos[0]) > min_i) // Everything to the right of this must be further min_dist, so break. // This is where the savings comes from! break; const float dist = ...; if (dist < min_dist) { min_dist = dist; min_index = i; } } // Backwards search for (int i=start_index ; i >= 0; --i) { // same as above } pt.min_dist = min_dist pt.min_index = min_index

(Tenga en cuenta que esto supone que está calculando la distancia entre puntos, pero su ensamblaje indica que es la distancia al cuadrado. Ajuste la condición de ruptura en consecuencia).

Hay una ligera sobrecarga para construir el árbol o clasificar los centroides, pero esto debe compensarse haciendo que los cálculos sean más rápidos en el bucle más grande (sobre el número de puntos).


C ++ es un lenguaje de alto nivel. Su suposición de que el flujo de control en el código fuente de C ++ se traduce en instrucciones de ramificación es errónea. No tengo la definición de algunos tipos de su ejemplo, por lo que hice un programa de prueba simple con asignaciones condicionales similares:

int g(int, int); int f(const int *arr) { int min = 10000, minIndex = -1; for ( int i = 0; i < 1000; ++i ) { if ( arr[i] < min ) { min = arr[i]; minIndex = i; } } return g(min, minIndex); }

Tenga en cuenta que el uso de la "g" indefinida es simplemente para evitar que el optimizador elimine todo. Traduje esto con G ++ 4.9.2 con -O3 y -S al ensamblado x86_64 (sin siquiera tener que cambiar el valor predeterminado para -march) y el resultado (no es demasiado sorprendente) es que el cuerpo del bucle no contiene ramas

movl (%rdi,%rax,4), %ecx movl %edx, %r8d cmpl %edx, %ecx cmovle %ecx, %r8d cmovl %eax, %esi addq $1, %rax

Aparte de eso, la suposición de que sin ramas es necesariamente más rápida también puede ser defectuosa porque la probabilidad de que una nueva distancia "venza" a la anterior está disminuyendo cuantos más elementos haya observado. No es un lanzamiento de moneda. El truco de "selección de bits" se inventó cuando los compiladores eran mucho menos agresivos en la generación de ensamblaje "como si" de lo que son hoy en día. Prefiero sugerir que eche un vistazo al tipo de ensamblaje que su compilador realmente está generando antes de intentar reelaborar el código para que el compilador pueda optimizarlo mejor o tomar el resultado como base para el ensamblaje escrito a mano. Si desea analizar SIMD, le sugiero que intente un enfoque de "mínimo de mínimos" con dependencias de datos reducidas (en mi ejemplo, las dependencias de "min" son probablemente un cuello de botella).


Lástima que no podamos usar SSE4.1, pero muy bien, SSE2 lo es. No he probado esto, solo lo compilé para ver si había errores de sintaxis y para ver si el ensamblaje tenía sentido (está bien, aunque GCC derrama min_index incluso con algunos registros xmm no utilizados, no estoy seguro de por qué sucede eso)

int find_closest(float *x, float *y, float *z, float pt_x, float pt_y, float pt_z, int n) { __m128i min_index = _mm_set_epi32(3, 2, 1, 0); __m128 xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x)); __m128 ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y)); __m128 zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z)); __m128 min_dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)), _mm_mul_ps(zdif, zdif)); __m128i index = min_index; for (int i = 4; i < n; i += 4) { xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x + i)); ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y + i)); zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z + i)); __m128 dist = _mm_add_ps(_mm_add_ps(_mm_mul_ps(xdif, xdif), _mm_mul_ps(ydif, ydif)), _mm_mul_ps(zdif, zdif)); index = _mm_add_epi32(index, _mm_set1_epi32(4)); __m128i mask = _mm_castps_si128(_mm_cmplt_ps(dist, min_dist)); min_dist = _mm_min_ps(min_dist, dist); min_index = _mm_or_si128(_mm_and_si128(index, mask), _mm_andnot_si128(mask, min_index)); } float mdist[4]; _mm_store_ps(mdist, min_dist); uint32_t mindex[4]; _mm_store_si128((__m128i*)mindex, min_index); float closest = mdist[0]; int closest_i = mindex[0]; for (int i = 1; i < 4; i++) { if (mdist[i] < closest) { closest = mdist[i]; closest_i = mindex[i]; } } return closest_i; }

Como de costumbre, espera que los punteros estén alineados en 16. Además, el relleno debe estar con puntos en el infinito (para que nunca estén más cerca del objetivo).

SSE 4.1 le permitiría reemplazar esto

min_index = _mm_or_si128(_mm_and_si128(index, mask), _mm_andnot_si128(mask, min_index));

Por esto

min_index = _mm_blendv_epi8(min_index, index, mask);

Aquí hay una versión asm, hecha para vsyasm , probada un poco (parece funcionar)

bits 64 section .data align 16 centroid_four: dd 4, 4, 4, 4 centroid_index: dd 0, 1, 2, 3 section .text global find_closest proc_frame find_closest ; ; arguments: ; ecx: number of points (multiple of 4 and at least 4) ; rdx -> array of 3 pointers to floats (x, y, z) (the points) ; r8 -> array of 3 floats (the reference point) ; alloc_stack 0x58 save_xmm128 xmm6, 0 save_xmm128 xmm7, 16 save_xmm128 xmm8, 32 save_xmm128 xmm9, 48 [endprolog] movss xmm0, [r8] shufps xmm0, xmm0, 0 movss xmm1, [r8 + 4] shufps xmm1, xmm1, 0 movss xmm2, [r8 + 8] shufps xmm2, xmm2, 0 ; pointers to x, y, z in r8, r9, r10 mov r8, [rdx] mov r9, [rdx + 8] mov r10, [rdx + 16] ; reference point is in xmm0, xmm1, xmm2 (x, y, z) movdqa xmm3, [rel centroid_index] ; min_index movdqa xmm4, xmm3 ; current index movdqa xmm9, [rel centroid_four] ; index increment paddd xmm4, xmm9 ; calculate initial min_dist, xmm5 movaps xmm5, [r8] subps xmm5, xmm0 movaps xmm7, [r9] subps xmm7, xmm1 movaps xmm8, [r10] subps xmm8, xmm2 mulps xmm5, xmm5 mulps xmm7, xmm7 mulps xmm8, xmm8 addps xmm5, xmm7 addps xmm5, xmm8 add r8, 16 add r9, 16 add r10, 16 sub ecx, 4 jna _tail _loop: movaps xmm6, [r8] subps xmm6, xmm0 movaps xmm7, [r9] subps xmm7, xmm1 movaps xmm8, [r10] subps xmm8, xmm2 mulps xmm6, xmm6 mulps xmm7, xmm7 mulps xmm8, xmm8 addps xmm6, xmm7 addps xmm6, xmm8 add r8, 16 add r9, 16 add r10, 16 movaps xmm7, xmm6 cmpps xmm6, xmm5, 1 minps xmm5, xmm7 movdqa xmm7, xmm6 pand xmm6, xmm4 pandn xmm7, xmm3 por xmm6, xmm7 movdqa xmm3, xmm6 paddd xmm4, xmm9 sub ecx, 4 ja _loop _tail: ; calculate horizontal minumum pshufd xmm0, xmm5, 0xB1 minps xmm0, xmm5 pshufd xmm1, xmm0, 0x4E minps xmm0, xmm1 ; find index of the minimum cmpps xmm0, xmm5, 0 movmskps eax, xmm0 bsf eax, eax ; index into xmm3, sort of movaps [rsp + 64], xmm3 mov eax, [rsp + 64 + rax * 4] movaps xmm9, [rsp + 48] movaps xmm8, [rsp + 32] movaps xmm7, [rsp + 16] movaps xmm6, [rsp] add rsp, 0x58 ret endproc_frame

En C ++:

extern "C" int find_closest(int n, float** points, float* reference_point);


Podría usar un operador ternario sin ramificación, a veces llamado bitselect (condición? Verdadero: falso).
Solo úselo para los 2 miembros, por defecto para no hacer nada.
No se preocupe por las operaciones adicionales, no son nada en comparación con la ramificación de la declaración if.

implementación de bitselect:

inline static int bitselect(int condition, int truereturnvalue, int falsereturnvalue) { return (truereturnvalue & -condition) | (falsereturnvalue & ~(-condition)); //a when TRUE and b when FALSE } inline static float bitselect(int condition, float truereturnvalue, float falsereturnvalue) { //Reinterpret floats. Would work because it''s just a bit select, no matter the actual value int& at = reinterpret_cast<int&>(truereturnvalue); int& af = reinterpret_cast<int&>(falsereturnvalue); int res = (at & -condition) | (af & ~(-condition)); //a when TRUE and b when FALSE return reinterpret_cast<float&>(res); }

Y tu bucle debería verse así:

for (int i=0; i < num_centroids; ++i) { const ClusterCentroid& cent = centroids[i]; const float dist = ...; bool isSmaeller = dist < pt.min_dist; //use same value if not smaller pt.min_index = bitselect(isSmaeller, i, pt.min_index); pt.min_dist = bitselect(isSmaeller, dist, pt.min_dist); }