zeros para libreria funciones funcion elementos crear con arreglos arreglo array agregar python numpy tensorflow theano numba
data_csv.zip

para - funciones de numpy python 3



Python: reescribe una función matemática numpy en bucle para ejecutar en GPU (5)

¿Puede alguien ayudarme a reescribir esta función (la función doTheMath ) para hacer los cálculos en la GPU? Utilicé algunos buenos días ahora tratando de entenderlo, pero sin resultados. Me pregunto que tal vez alguien pueda ayudarme a reescribir esta función de la manera que parezca más adecuada para el registro, ya que da el mismo resultado al final. Intenté usar @jit desde numba pero por alguna razón, en realidad es mucho más lento que ejecutar el código como de costumbre. Con un tamaño de muestra enorme, el objetivo es disminuir considerablemente el tiempo de ejecución, por lo que, naturalmente, creo que la GPU es la forma más rápida de hacerlo.

Te explico un poco lo que está pasando Los datos reales, que parecen casi idénticos a los datos de muestra creados en el código a continuación, se dividen en tamaños de muestra de aproximadamente 5.000.000 filas cada muestra o alrededor de 150 MB por archivo. En total hay alrededor de 600.000.000 filas o 20GB de datos. Debo recorrer estos datos, muestra por muestra y luego fila por fila en cada muestra, tomar las últimas 2000 (u otra) filas a partir de cada línea y ejecutar la función doTheMath que devuelve un resultado. Ese resultado luego se guarda en el disco duro donde puedo hacer otras cosas con otro programa. Como puede ver a continuación, no necesito todos los resultados de todas las filas, solo aquellos mayores a una cantidad específica. Si ejecuto mi función tal como está ahora en Python, obtengo unos 62 segundos por cada 1.000.000 de filas. Este es un tiempo muy largo considerando todos los datos y la rapidez con la que se debe hacer.

Debo mencionar que subo el archivo de datos reales a la RAM con la ayuda de data = joblib.load(file) por lo que la carga de los datos no es el problema, ya que solo toma alrededor de 0.29 segundos por archivo. Una vez cargado corro el código completo a continuación. Lo que lleva más tiempo es la función doTheMath . Estoy dispuesto a dar todos mis 500 puntos de reputación que tengo en stackoverflow como recompensa por alguien dispuesto a ayudarme a volver a escribir este código simple para ejecutarlo en la GPU. Mi interés está específicamente en la GPU, realmente quiero ver cómo se hace en este problema en cuestión.

EDITAR / ACTUALIZAR 1: Aquí hay un enlace a una pequeña muestra de los datos reales: data_csv.zip Unas 102000 filas de datos reales1 y 2000 filas para datos reales2a y datos2b. Use minimumLimit = 400 en los datos de muestra reales

EDITAR / ACTUALIZAR 2: Para quienes siguen esta publicación, aquí hay un breve resumen de las respuestas a continuación. Hasta ahora tenemos 4 respuestas a la solución original. El ofrecido por @Divakar son solo ajustes al código original. De los dos ajustes, solo el primero es realmente aplicable a este problema, el segundo es un buen ajuste, pero no se aplica aquí. De las otras tres respuestas, dos de ellas son soluciones basadas en CPU y una prueba de tensorflow-GPU. El Tensorflow-GPU de Paul Panzer parece ser prometedor, pero cuando lo ejecuto en la GPU es más lento que el original, por lo que el código aún necesita mejoras.

Las otras dos soluciones basadas en CPU son enviadas por @PaulPanzer (una solución de números puros) y @MSeifert (una solución numba). Ambas soluciones dan muy buenos resultados y ambos procesan datos extremadamente rápido en comparación con el código original. De los dos, el presentado por Paul Panzer es más rápido. Procesa unas 1.000.000 filas en unos 3 segundos. El único problema es con tamaños de lote más pequeños, esto se puede superar cambiando a la solución numba ofrecida por MSeifert, o incluso al código original después de todos los ajustes que se han analizado a continuación.

Estoy muy feliz y agradecida a @PaulPanzer y @MSeifert por el trabajo que hicieron en sus respuestas. Sin embargo, dado que esta es una pregunta sobre una solución basada en GPU, estoy esperando a ver si alguien está dispuesto a probarlo en una versión de GPU y ver cuánto más rápido pueden procesarse los datos en la GPU en comparación con la CPU actual. soluciones Si no hay otras respuestas que superen a la solución numérica pura de @PellPanzer, aceptaré su respuesta como la correcta y obtendré la recompensa :)

EDITAR / ACTUALIZAR 3: @Divakar ha publicado una nueva respuesta con una solución para la GPU. Después de mis pruebas en datos reales, la velocidad ni siquiera es comparable con las soluciones de contraparte de la CPU. La GPU procesa unos 5.000.000 en unos 1,5 segundos. Esto es increíble :) Estoy muy entusiasmado con la solución de GPU y agradezco a @Divakar por publicarla. Además, agradezco a @PaulPanzer y @MSeifert por sus soluciones de CPU :) Ahora mi investigación continúa con una velocidad increíble debido a la GPU :)

import pandas as pd import numpy as np import time def doTheMath(tmpData1, data2a, data2b): A = tmpData1[:, 0] B = tmpData1[:,1] C = tmpData1[:,2] D = tmpData1[:,3] Bmax = B.max() Cmin = C.min() dif = (Bmax - Cmin) abcd = ((((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4) return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum() #Declare variables batchSize = 2000 sampleSize = 5000000 resultArray = [] minimumLimit = 490 #use 400 on the real sample data #Create Random Sample Data data1 = np.matrix(np.random.uniform(1, 100, (sampleSize + batchSize, 4))) data2a = np.matrix(np.random.uniform(0, 1, (batchSize, 1))) #upper limit data2b = np.matrix(np.random.uniform(0, 1, (batchSize, 1))) #lower limit #approx. half of data2a will be smaller than data2b, but that is only in the sample data because it is randomly generated, NOT the real data. The real data2a is always higher than data2b. #Loop through the data t0 = time.time() for rowNr in range(data1.shape[0]): tmp_df = data1[rowNr:rowNr + batchSize] #rolling window if(tmp_df.shape[0] == batchSize): result = doTheMath(tmp_df, data2a, data2b) if (result >= minimumLimit): resultArray.append([rowNr , result]) print(''Runtime:'', time.time() - t0) #Save data results resultArray = np.array(resultArray) print(resultArray[:,1].sum()) resultArray = pd.DataFrame({''index'':resultArray[:,0], ''result'':resultArray[:,1]}) resultArray.to_csv("Result Array.csv", sep='';'')

Las especificaciones de PC en las que estoy trabajando:

GTX970(4gb) video card; i7-4790K CPU 4.00Ghz; 16GB RAM; a SSD drive running Windows 7;

Como una pregunta adicional, ¿ayudaría una segunda tarjeta de video en SLI en este problema?


Antes de comenzar a ajustar el objetivo (GPU) o utilizar cualquier otra cosa (es decir, ejecuciones paralelas), es posible que desee considerar cómo mejorar el código ya existente. numba la numba numba, así que la numba para mejorar el código: primero operamos en matrices, no en matrices:

data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4))) data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit

Cada vez que llamas a doTheMath esperas que te doTheMath un entero, sin embargo, utilizas muchos arreglos y creas muchos arreglos intermedios:

abcd = ((((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4) return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

Esto crea una matriz intermedia en cada paso:

  • tmp1 = A-Cmin ,
  • tmp2 = tmp1 / dif ,
  • tmp3 = B - Cmin ,
  • tmp4 = tmp3 / dif
  • ... obtienes la esencia

Sin embargo, esta es una función de reducción (array -> integer), por lo que tener una gran cantidad de arreglos intermedios es un peso innecesario, simplemente calcule el valor de la "mosca".

import numba as nb @nb.njit def doTheMathNumba(tmpData, data2a, data2b): Bmax = np.max(tmpData[:, 1]) Cmin = np.min(tmpData[:, 2]) diff = (Bmax - Cmin) idiff = 1 / diff sum_ = 0 for i in range(tmpData.shape[0]): val = (tmpData[i, 0] + tmpData[i, 1] + tmpData[i, 2] + tmpData[i, 3]) / 4 * idiff - Cmin * idiff if val <= data2a[i] and val >= data2b[i]: sum_ += 1 return sum_

Hice algo más aquí para evitar múltiples operaciones:

(((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4 = ((A - Cmin + B - Cmin + C - Cmin + D - Cmin) / dif) / 4 = (A + B + C + D - 4 * Cmin) / (4 * dif) = (A + B + C + D) / (4 * dif) - (Cmin / dif)

Esto realmente reduce el tiempo de ejecución en casi un factor de 10 en mi computadora:

%timeit doTheMath(tmp_df, data2a, data2b) # 1000 loops, best of 3: 446 µs per loop %timeit doTheMathNumba(tmp_df, data2a, data2b) # 10000 loops, best of 3: 59 µs per loop

Ciertamente, también hay otras mejoras, por ejemplo, el uso de un mínimo / máximo rodante para calcular Bmax y Cmin , que realizaría al menos parte del cálculo en O(sampleSize) lugar de O(samplesize * batchsize) . Esto también haría posible reutilizar algunos de los cálculos (A + B + C + D) / (4 * dif) - (Cmin / dif) porque si Cmin y Bmax no cambian para la siguiente muestra, estos valores no '' t difieren Es un poco complicado de hacer porque las comparaciones difieren. ¡Pero definitivamente posible! Mira aquí:

import time import numpy as np import numba as nb @nb.njit def doTheMathNumba(abcd, data2a, data2b, Bmax, Cmin): diff = (Bmax - Cmin) idiff = 1 / diff quarter_idiff = 0.25 * idiff sum_ = 0 for i in range(abcd.shape[0]): val = abcd[i] * quarter_idiff - Cmin * idiff if val <= data2a[i] and val >= data2b[i]: sum_ += 1 return sum_ @nb.njit def doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, resultArray): found = 0 for rowNr in range(data1.shape[0]): if(abcd[rowNr:rowNr + batchSize].shape[0] == batchSize): result = doTheMathNumba(abcd[rowNr:rowNr + batchSize], data2a, data2b, Bmaxs[rowNr], Cmins[rowNr]) if (result >= minimumLimit): resultArray[found, 0] = rowNr resultArray[found, 1] = result found += 1 return resultArray[:found] #Declare variables batchSize = 2000 sampleSize = 50000 resultArray = [] minimumLimit = 490 #use 400 on the real sample data data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4))) data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit from scipy import ndimage t0 = time.time() abcd = np.sum(data1, axis=1) Bmaxs = ndimage.maximum_filter1d(data1[:, 1], size=batchSize, origin=-((batchSize-1)//2-1)) # correction for even shapes Cmins = ndimage.minimum_filter1d(data1[:, 2], size=batchSize, origin=-((batchSize-1)//2-1)) result = np.zeros((sampleSize, 2), dtype=np.int64) doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, result) print(''Runtime:'', time.time() - t0)

Esto me da un Runtime: 0.759593152999878 (¡después de que Numba compiló las funciones!), Mientras que su versión original tenía Runtime: 24.68975639343262 . ¡Ahora somos 30 veces más rápido!

Con el tamaño de la muestra, todavía toma Runtime: 60.187848806381226 pero eso no es tan malo, ¿verdad?

E incluso si no lo he hecho yo mismo, numba dice que es posible escribir "Numba para las GPU de CUDA" y no parece complicado.


Aquí hay algo de código para demostrar lo que es posible simplemente ajustando el algoritmo. Es un número puro, pero en los datos de muestra que publicaste se obtiene una aceleración de aproximadamente 35 veces con respecto a la versión original (~ 1,000,000 de muestras en ~ 2.5 s en mi máquina bastante modesta):

>>> result_dict = master(''run'') [(''load'', 0.82578349113464355), (''precomp'', 0.028138399124145508), (''max/min'', 0.24333405494689941), (''ABCD'', 0.015314102172851562), (''main'', 1.3356468677520752)] TOTAL 2.44821691513

Ajustes utilizados:

  • A + B + C + D, mira mi otra respuesta
  • ejecutar min / max, incluso evitar calcular (A + B + C + D - 4Cmin) / (4dif) varias veces con el mismo Cmin / dif.

Estos son más o menos rutinarios. Eso deja la comparación con data2a / b, que es caro O (NK) donde N es el número de muestras y K es el tamaño de la ventana. Aquí uno puede aprovechar los datos relativamente bien comportados. El uso de min / max en ejecución puede crear variantes de data2a / b que se pueden usar para probar un rango de compensaciones de ventana a la vez, si la prueba falla, todas estas compensaciones se pueden descartar de inmediato, de lo contrario, el rango se divide.

import numpy as np import time # global variables; they will hold the precomputed pre-screening filters preA, preB = {}, {} CHUNK_SIZES = None def sliding_argmax(data, K=2000): """compute the argmax of data over a sliding window of width K returns: indices -- indices into data switches -- window offsets at which the maximum changes (strictly speaking: where the index of the maximum changes) excludes 0 but includes maximum offset (len(data)-K+1) see last line of compute_pre_screening_filter for a recipe to convert this representation to the vector of maxima """ N = len(data) last = np.argmax(data[:K]) indices = [last] while indices[-1] <= N - 1: ge = np.where(data[last + 1 : last + K + 1] > data[last])[0] if len(ge) == 0: if last + K >= N: break last += 1 + np.argmax(data[last + 1 : last + K + 1]) indices.append(last) else: last += 1 + ge[0] indices.append(last) indices = np.array(indices) switches = np.where(data[indices[1:]] > data[indices[:-1]], indices[1:] + (1-K), indices[:-1] + 1) return indices, np.r_[switches, [len(data)-K+1]] def compute_pre_screening_filter(bound, n_offs): """compute pre-screening filter for point-wise upper bound given a K-vector of upper bounds B and K+n_offs-1-vector data compute K+n_offs-1-vector filter such that for each index j if for any offset 0 <= o < n_offs and index 0 <= i < K such that o + i = j, the inequality B_i >= data_j holds then filter_j >= data_j therefore the number of data points below filter is an upper bound for the maximum number of points below bound in any K-window in data """ pad_l, pad_r = np.min(bound[:n_offs-1]), np.min(bound[1-n_offs:]) padded = np.r_[pad_l+np.zeros(n_offs-1,), bound, pad_r+np.zeros(n_offs-1,)] indices, switches = sliding_argmax(padded, n_offs) return padded[indices].repeat(np.diff(np.r_[[0], switches])) def compute_all_pre_screening_filters(upper, lower, min_chnk=5, dyads=6): """compute upper and lower pre-screening filters for data blocks of sizes K+n_offs-1 where n_offs = min_chnk, 2min_chnk, ..., 2^(dyads-1)min_chnk the result is stored in global variables preA and preB """ global CHUNK_SIZES CHUNK_SIZES = min_chnk * 2**np.arange(dyads) preA[1] = upper preB[1] = lower for n in CHUNK_SIZES: preA[n] = compute_pre_screening_filter(upper, n) preB[n] = -compute_pre_screening_filter(-lower, n) def test_bounds(block, counts, threshold=400): """test whether the windows fitting in the data block ''block'' fall within the bounds using pre-screening for efficient bulk rejection array ''counts'' will be overwritten with the counts of compliant samples note that accurate counts will only be returned for above threshold windows, because the analysis of bulk rejected windows is short-circuited also note that bulk rejection only works for ''well behaved'' data and for example not on random numbers """ N = len(counts) K = len(preA[1]) r = N % CHUNK_SIZES[0] # chop up N into as large as possible chunks with matching pre computed # filters # start with small and work upwards counts[:r] = [np.count_nonzero((block[l:l+K] <= preA[1]) & (block[l:l+K] >= preB[1])) for l in range(r)] def bisect(block, counts): M = len(counts) cnts = np.count_nonzero((block <= preA[M]) & (block >= preB[M])) if cnts < threshold: counts[:] = cnts return elif M == CHUNK_SIZES[0]: counts[:] = [np.count_nonzero((block[l:l+K] <= preA[1]) & (block[l:l+K] >= preB[1])) for l in range(M)] else: M //= 2 bisect(block[:-M], counts[:M]) bisect(block[M:], counts[M:]) N = N // CHUNK_SIZES[0] for M in CHUNK_SIZES: if N % 2: bisect(block[r:r+M+K-1], counts[r:r+M]) r += M elif N == 0: return N //= 2 else: for j in range(2*N): bisect(block[r:r+M+K-1], counts[r:r+M]) r += M def analyse(data, use_pre_screening=True, min_chnk=5, dyads=6, threshold=400): samples, upper, lower = data N, K = samples.shape[0], upper.shape[0] times = [time.time()] if use_pre_screening: compute_all_pre_screening_filters(upper, lower, min_chnk, dyads) times.append(time.time()) # compute switching points of max and min for running normalisation upper_inds, upper_swp = sliding_argmax(samples[:, 1], K) lower_inds, lower_swp = sliding_argmax(-samples[:, 2], K) times.append(time.time()) # sum columns ABCD = samples.sum(axis=-1) times.append(time.time()) counts = np.empty((N-K+1,), dtype=int) # main loop # loop variables: offs = 0 u_ind, u_scale, u_swp = 0, samples[upper_inds[0], 1], upper_swp[0] l_ind, l_scale, l_swp = 0, samples[lower_inds[0], 2], lower_swp[0] while True: # check which is switching next, min(C) or max(B) if u_swp > l_swp: # greedily take the largest block possible such that dif and Cmin # do not change block = (ABCD[offs:l_swp+K-1] - 4*l_scale) / * (0.25 / (u_scale-l_scale)) if use_pre_screening: test_bounds(block, counts[offs:l_swp], threshold=threshold) else: counts[offs:l_swp] = [ np.count_nonzero((block[l:l+K] <= upper) & (block[l:l+K] >= lower)) for l in range(l_swp - offs)] # book keeping l_ind += 1 offs = l_swp l_swp = lower_swp[l_ind] l_scale = samples[lower_inds[l_ind], 2] else: block = (ABCD[offs:u_swp+K-1] - 4*l_scale) / * (0.25 / (u_scale-l_scale)) if use_pre_screening: test_bounds(block, counts[offs:u_swp], threshold=threshold) else: counts[offs:u_swp] = [ np.count_nonzero((block[l:l+K] <= upper) & (block[l:l+K] >= lower)) for l in range(u_swp - offs)] u_ind += 1 if u_ind == len(upper_inds): assert u_swp == N-K+1 break offs = u_swp u_swp = upper_swp[u_ind] u_scale = samples[upper_inds[u_ind], 1] times.append(time.time()) return {''counts'': counts, ''valid'': np.where(counts >= 400)[0], ''timings'': np.diff(times)} def master(mode=''calibrate'', data=''fake'', use_pre_screening=True, nrep=3, min_chnk=None, dyads=None): t = time.time() if data in (''fake'', ''load''): data1 = np.loadtxt(''data1.csv'', delimiter='';'', skiprows=1, usecols=[1,2,3,4]) data2a = np.loadtxt(''data2a.csv'', delimiter='';'', skiprows=1, usecols=[1]) data2b = np.loadtxt(''data2b.csv'', delimiter='';'', skiprows=1, usecols=[1]) if data == ''fake'': data1 = np.tile(data1, (10, 1)) threshold = 400 elif data == ''random'': data1 = np.random.random((102000, 4)) data2b = np.random.random(2000) data2a = np.random.random(2000) threshold = 490 if use_pre_screening or mode == ''calibrate'': print(''WARNING: pre-screening not efficient on artificial data'') else: raise ValueError("data mode {} not recognised".format(data)) data = data1, data2a, data2b t_load = time.time() - t if mode == ''calibrate'': min_chnk = (2, 3, 4, 5, 6) if min_chnk is None else min_chnk dyads = (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) if dyads is None else dyads timings = np.zeros((len(min_chnk), len(dyads))) print(''max bisect '' + '' ''.join([ '' n.a.'' if dy == 0 else ''{:7d}''.format(dy) for dy in dyads]), end='''') for i, mc in enumerate(min_chnk): print(''/nmin chunk {}''.format(mc), end='' '') for j, dy in enumerate(dyads): for k in range(nrep): if dy == 0: # no pre-screening timings[i, j] += analyse( data, False, mc, dy, threshold)[''timings''][3] else: timings[i, j] += analyse( data, True, mc, dy, threshold)[''timings''][3] timings[i, j] /= nrep print(''{:7.3f}''.format(timings[i, j]), end='' '', flush=True) best_mc, best_dy = np.unravel_index(np.argmin(timings.ravel()), timings.shape) print(''/nbest'', min_chnk[best_mc], dyads[best_dy]) return timings, min_chnk[best_mc], dyads[best_dy] if mode == ''run'': min_chnk = 2 if min_chnk is None else min_chnk dyads = 5 if dyads is None else dyads res = analyse(data, use_pre_screening, min_chnk, dyads, threshold) times = np.r_[[t_load], res[''timings'']] print(list(zip((''load'', ''precomp'', ''max/min'', ''ABCD'', ''main''), times))) print(''TOTAL'', times.sum()) return res


Introducción y código de solución.

Bueno, lo pediste! Por lo tanto, en esta publicación se PyCUDA una implementación con PyCUDA que utiliza envolturas ligeras que extienden la mayoría de las capacidades de CUDA dentro del entorno de Python. Tendremos la funcionalidad SourceModule que nos permite escribir y compilar los núcleos CUDA que permanecen en el entorno Python.

Para resolver el problema que nos ocupa, entre los cálculos involucrados, tenemos deslizamiento máximo y mínimo, pocas diferencias y divisiones y comparaciones. Para las partes máximas y mínimas, que involucran la búsqueda de bloque máximo (para cada ventana deslizante), usaremos la técnica de reducción como se explica con más detalle here . Esto se haría a nivel de bloque. Para las iteraciones de nivel superior a través de ventanas deslizantes, usaríamos la indexación de nivel de cuadrícula en los recursos de CUDA. Para obtener más información sobre este bloque y formato de cuadrícula, consulte la page-18 . PyCUDA también admite incorporaciones para reducciones de cómputo como max y min, pero perdemos el control, específicamente pretendemos usar memoria especializada como compartida y memoria constante para aprovechar GPU en su nivel casi óptimo.

Listado del código de la solución PyCUDA-NumPy -

1] parte de PyCUDA -

import pycuda.autoinit import pycuda.driver as drv import numpy as np from pycuda.compiler import SourceModule mod = SourceModule(""" #define TBP 1024 // THREADS_PER_BLOCK __device__ void get_Bmax_Cmin(float* out, float *d1, float *d2, int L, int offset) { int tid = threadIdx.x; int inv = TBP; __shared__ float dS[TBP][2]; dS[tid][0] = d1[tid+offset]; dS[tid][1] = d2[tid+offset]; __syncthreads(); if(tid<L-TBP) { dS[tid][0] = fmaxf(dS[tid][0] , d1[tid+inv+offset]); dS[tid][1] = fminf(dS[tid][1] , d2[tid+inv+offset]); } __syncthreads(); inv = inv/2; while(inv!=0) { if(tid<inv) { dS[tid][0] = fmaxf(dS[tid][0] , dS[tid+inv][0]); dS[tid][1] = fminf(dS[tid][1] , dS[tid+inv][1]); } __syncthreads(); inv = inv/2; } __syncthreads(); if(tid==0) { out[0] = dS[0][0]; out[1] = dS[0][1]; } __syncthreads(); } __global__ void main1(float* out, float *d0, float *d1, float *d2, float *d3, float *lowL, float *highL, int *BLOCKLEN) { int L = BLOCKLEN[0]; int tid = threadIdx.x; int iterID = blockIdx.x; float Bmax_Cmin[2]; int inv; float Cmin, dif; __shared__ float dS[TBP*2]; get_Bmax_Cmin(Bmax_Cmin, d1, d2, L, iterID); Cmin = Bmax_Cmin[1]; dif = (Bmax_Cmin[0] - Cmin); inv = TBP; dS[tid] = (d0[tid+iterID] + d1[tid+iterID] + d2[tid+iterID] + d3[tid+iterID] - 4.0*Cmin) / (4.0*dif); __syncthreads(); if(tid<L-TBP) dS[tid+inv] = (d0[tid+inv+iterID] + d1[tid+inv+iterID] + d2[tid+inv+iterID] + d3[tid+inv+iterID] - 4.0*Cmin) / (4.0*dif); dS[tid] = ((dS[tid] >= lowL[tid]) & (dS[tid] <= highL[tid])) ? 1 : 0; __syncthreads(); if(tid<L-TBP) dS[tid] += ((dS[tid+inv] >= lowL[tid+inv]) & (dS[tid+inv] <= highL[tid+inv])) ? 1 : 0; __syncthreads(); inv = inv/2; while(inv!=0) { if(tid<inv) dS[tid] += dS[tid+inv]; __syncthreads(); inv = inv/2; } if(tid==0) out[iterID] = dS[0]; __syncthreads(); } """)

Tenga en cuenta que THREADS_PER_BLOCK, TBP se establecerá en función del batchSize . La regla de oro aquí es asignar una potencia de 2 valores a TBP que sea menor que batchSize . Por lo tanto, para batchSize = 2000 , necesitábamos TBP como 1024 .

2] NumPy part -

def gpu_app_v1(A, B, C, D, batchSize, minimumLimit): func1 = mod.get_function("main1") outlen = len(A)-batchSize+1 # Set block and grid sizes BSZ = (1024,1,1) GSZ = (outlen,1) dest = np.zeros(outlen).astype(np.float32) N = np.int32(batchSize) func1(drv.Out(dest), drv.In(A), drv.In(B), drv.In(C), drv.In(D), / drv.In(data2b), drv.In(data2a),/ drv.In(N), block=BSZ, grid=GSZ) idx = np.flatnonzero(dest >= minimumLimit) return idx, dest[idx]

Benchmarking

He probado en GTX 960M. Tenga en cuenta que PyCUDA espera que las matrices sean de orden contiguo. Por lo tanto, tenemos que cortar las columnas y hacer copias. Estoy esperando / asumiendo que los datos podrían leerse de los archivos de tal manera que los datos se distribuyan a lo largo de filas en lugar de ser como columnas. Por lo tanto, mantener a aquellos fuera de la función de evaluación comparativa por ahora.

Enfoque original -

def org_app(data1, batchSize, minimumLimit): resultArray = [] for rowNr in range(data1.shape[0]-batchSize+1): tmp_df = data1[rowNr:rowNr + batchSize] #rolling window result = doTheMath(tmp_df, data2a, data2b) if (result >= minimumLimit): resultArray.append([rowNr , result]) return resultArray

Tiempos y verificación -

In [2]: #Declare variables ...: batchSize = 2000 ...: sampleSize = 50000 ...: resultArray = [] ...: minimumLimit = 490 #use 400 on the real sample data ...: ...: #Create Random Sample Data ...: data1 = np.random.uniform(1, 100000, (sampleSize + batchSize, 4)).astype(np.float32) ...: data2b = np.random.uniform(0, 1, (batchSize)).astype(np.float32) ...: data2a = data2b + np.random.uniform(0, 1, (batchSize)).astype(np.float32) ...: ...: # Make column copies ...: A = data1[:,0].copy() ...: B = data1[:,1].copy() ...: C = data1[:,2].copy() ...: D = data1[:,3].copy() ...: ...: gpu_out1,gpu_out2 = gpu_app_v1(A, B, C, D, batchSize, minimumLimit) ...: cpu_out1,cpu_out2 = np.array(org_app(data1, batchSize, minimumLimit)).T ...: print(np.allclose(gpu_out1, cpu_out1)) ...: print(np.allclose(gpu_out2, cpu_out2)) ...: True False

Entonces, hay algunas diferencias entre los recuentos de CPU y GPU. Investiguémoslos

In [7]: idx = np.flatnonzero(~np.isclose(gpu_out2, cpu_out2)) In [8]: idx Out[8]: array([12776, 15208, 17620, 18326]) In [9]: gpu_out2[idx] - cpu_out2[idx] Out[9]: array([-1., -1., 1., 1.])

Hay cuatro instancias de cuentas no coincidentes. Estos están desactivados al máximo por 1 . En la investigación, me encontré con alguna información sobre esto. Básicamente, ya que estamos usando los cálculos de matemática para los cálculos de max y min, y creo que esos están causando que el último bit binario en la representación de pt flotante sea diferente al de la contraparte de la CPU. Esto se denomina error ULP y se ha discutido en detalle here y here .

Finalmente, dejando de lado el problema, vamos a lo más importante, el rendimiento:

In [10]: %timeit org_app(data1, batchSize, minimumLimit) 1 loops, best of 3: 2.18 s per loop In [11]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit) 10 loops, best of 3: 82.5 ms per loop In [12]: 2180.0/82.5 Out[12]: 26.424242424242426

Probemos con conjuntos de datos más grandes. Con sampleSize = 500000 , obtenemos -

In [14]: %timeit org_app(data1, batchSize, minimumLimit) 1 loops, best of 3: 23.2 s per loop In [15]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit) 1 loops, best of 3: 821 ms per loop In [16]: 23200.0/821 Out[16]: 28.25822168087698

Por lo tanto, la aceleración se mantiene constante en alrededor de 27 .

Limitaciones:

1) Estamos utilizando números float32 , ya que las GPU funcionan mejor con esos. La precisión doble, especialmente en las GPU que no son de servidor, no es popular cuando se trata de rendimiento y, como está trabajando con una GPU de este tipo, probé con float32.

Mejoramiento adicional :

1) Podríamos usar una constant memory más rápida para alimentar data2a y data2b , en lugar de usar global memory .


Tweak # 1

Por lo general, se recomienda vectorizar las cosas cuando se trabaja con matrices NumPy. Pero con arreglos muy grandes, creo que estás fuera de opciones allí. Por lo tanto, para mejorar el rendimiento, es posible optimizar un pequeño ajuste en el último paso de la suma.

Podríamos reemplazar el paso que hace una matriz de 1s y 0s y que suma:

np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()

con np.count_nonzero que funciona de manera eficiente para contar valores True en una matriz booleana, en lugar de convertir a 1s y 0s -

np.count_nonzero((abcd <= data2a) & (abcd >= data2b))

Prueba de tiempo de ejecución

In [45]: abcd = np.random.randint(11,99,(10000)) In [46]: data2a = np.random.randint(11,99,(10000)) In [47]: data2b = np.random.randint(11,99,(10000)) In [48]: %timeit np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum() 10000 loops, best of 3: 81.8 µs per loop In [49]: %timeit np.count_nonzero((abcd <= data2a) & (abcd >= data2b)) 10000 loops, best of 3: 28.8 µs per loop

Tweak # 2

Utilice un recíproco precalculado cuando se trate de casos que se someten a radiodifusión implícita. Un poco más de información here . Por lo tanto, almacene recíproco de dif y use eso en lugar del paso:

((((A - Cmin) / dif) + ((B - Cmin) / dif) + ...

Prueba de muestra

In [52]: A = np.random.rand(10000) In [53]: dif = 0.5 In [54]: %timeit A/dif 10000 loops, best of 3: 25.8 µs per loop In [55]: %timeit A*(1.0/dif) 100000 loops, best of 3: 7.94 µs per loop

Tienes cuatro lugares usando división por dif . Así que, ¡espero que esto también produzca un impulso notable allí!


Esto es técnicamente fuera de tema (no GPU) pero estoy seguro de que estarás interesado.

Hay un ahorro obvio y bastante grande:

Precomputa A + B + C + D(no en el bucle, en todos los datos :) data1.sum(axis=-1), porque abcd = ((A+B+C+D) - 4Cmin) / (4dif). Esto ahorrará bastantes operaciones.

Sorprendió que nadie lo viera antes ;-)

Editar:

Hay otra cosa, aunque sospecho que solo está en su ejemplo, no en sus datos reales:

Como se encuentra aproximadamente la mitad de data2aserá menor que data2b. En estos lugares, sus condiciones abcdno pueden ser verdaderas, por lo que ni siquiera debe calcularlas abcdallí.

Editar:

Una modificación más que utilicé a continuación, pero olvidé mencionar: si calcula el máximo (o mínimo) sobre una ventana móvil. Cuando mueva un punto a la derecha, diga, ¿qué tan probable es que cambie el máximo? Solo hay dos cosas que pueden cambiarlo: el nuevo punto a la derecha es más grande (ocurre aproximadamente una vez en el tiempo de duración de la ventana, e incluso si sucede, inmediatamente se sabe el nuevo máximo) o el antiguo máximo se cae de la ventana de la izquierda (También ocurre aproximadamente una vez en tiempos de windowlength). Solo en este último caso debe buscar el nuevo máximo en toda la ventana.

Editar:

No pude resistirme a probarlo en tensorflow. No tengo una GPU, así que tú mismo tienes que probar la velocidad. Ponga "gpu" para "cpu" en la línea marcada.

En la CPU, es aproximadamente la mitad de rápido que su implementación original (es decir, sin los ajustes de Divakar). Tenga en cuenta que me he tomado la libertad de cambiar las entradas de matriz a matriz simple. Actualmente, tensorflow es un poco un objetivo móvil, así que asegúrese de tener la versión correcta. Usé Python3.6 y tf 0.12.1 Si haces un pip3 instala tensorflow-gpu hoy debería Podría funcionar.

import numpy as np import time import tensorflow as tf # currently the max/min code is sequential # thus parallel_iterations = 1 # but you can put this in a separate loop, precompute and then try and run # the remainder of doTheMathTF with a larger parallel_iterations # tensorflow is quite capricious about its data types ddf = tf.float64 ddi = tf.int32 def worker(data1, data2a, data2b): ################################### # CHANGE cpu to gpu in next line! # ################################### with tf.device(''/cpu:0''): g = tf.Graph () with g.as_default(): ABCD = tf.constant(data1.sum(axis=-1), dtype=ddf) B = tf.constant(data1[:, 1], dtype=ddf) C = tf.constant(data1[:, 2], dtype=ddf) window = tf.constant(len(data2a)) N = tf.constant(data1.shape[0] - len(data2a) + 1, dtype=ddi) data2a = tf.constant(data2a, dtype=ddf) data2b = tf.constant(data2b, dtype=ddf) def doTheMathTF(i, Bmax, Bmaxind, Cmin, Cminind, out): # most of the time we can keep the old max/min Bmaxind = tf.cond(Bmaxind<i, lambda: i + tf.to_int32( tf.argmax(B[i:i+window], axis=0)), lambda: tf.cond(Bmax>B[i+window-1], lambda: Bmaxind, lambda: i+window-1)) Cminind = tf.cond(Cminind<i, lambda: i + tf.to_int32( tf.argmin(C[i:i+window], axis=0)), lambda: tf.cond(Cmin<C[i+window-1], lambda: Cminind, lambda: i+window-1)) Bmax = B[Bmaxind] Cmin = C[Cminind] abcd = (ABCD[i:i+window] - 4 * Cmin) * (1 / (4 * (Bmax-Cmin))) out = out.write(i, tf.to_int32( tf.count_nonzero(tf.logical_and(abcd <= data2a, abcd >= data2b)))) return i + 1, Bmax, Bmaxind, Cmin, Cminind, out with tf.Session(graph=g) as sess: i, Bmaxind, Bmax, Cminind, Cmin, out = tf.while_loop( lambda i, _1, _2, _3, _4, _5: i<N, doTheMathTF, (tf.Variable(0, dtype=ddi), tf.Variable(0.0, dtype=ddf), tf.Variable(-1, dtype=ddi), tf.Variable(0.0, dtype=ddf), tf.Variable(-1, dtype=ddi), tf.TensorArray(ddi, size=N)), shape_invariants=None, parallel_iterations=parallel_iterations, back_prop=False) out = out.pack() sess.run(tf.initialize_all_variables()) out, = sess.run((out,)) return out #Declare variables batchSize = 2000 sampleSize = 50000#00 resultArray = [] #Create Sample Data data1 = np.random.uniform(1, 100, (sampleSize + batchSize, 4)) data2a = np.random.uniform(0, 1, (batchSize,)) data2b = np.random.uniform(0, 1, (batchSize,)) t0 = time.time() out = worker(data1, data2a, data2b) print(''Runtime (tensorflow):'', time.time() - t0) good_indices, = np.where(out >= 490) res_tf = np.c_[good_indices, out[good_indices]] def doTheMath(tmpData1, data2a, data2b): A = tmpData1[:, 0] B = tmpData1[:,1] C = tmpData1[:,2] D = tmpData1[:,3] Bmax = B.max() Cmin = C.min() dif = (Bmax - Cmin) abcd = ((((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4) return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum() #Loop through the data t0 = time.time() for rowNr in range(sampleSize+1): tmp_df = data1[rowNr:rowNr + batchSize] #rolling window result = doTheMath(tmp_df, data2a, data2b) if (result >= 490): resultArray.append([rowNr , result]) print(''Runtime (original):'', time.time() - t0) print(np.alltrue(np.array(resultArray)==res_tf))