para - funciones de numpy python 3
Python: reescribe una función matemática numpy en bucle para ejecutar en GPU (5)
¿Puede alguien ayudarme a reescribir esta función (la función doTheMath
) para hacer los cálculos en la GPU? Utilicé algunos buenos días ahora tratando de entenderlo, pero sin resultados. Me pregunto que tal vez alguien pueda ayudarme a reescribir esta función de la manera que parezca más adecuada para el registro, ya que da el mismo resultado al final. Intenté usar @jit
desde numba
pero por alguna razón, en realidad es mucho más lento que ejecutar el código como de costumbre. Con un tamaño de muestra enorme, el objetivo es disminuir considerablemente el tiempo de ejecución, por lo que, naturalmente, creo que la GPU es la forma más rápida de hacerlo.
Te explico un poco lo que está pasando Los datos reales, que parecen casi idénticos a los datos de muestra creados en el código a continuación, se dividen en tamaños de muestra de aproximadamente 5.000.000 filas cada muestra o alrededor de 150 MB por archivo. En total hay alrededor de 600.000.000 filas o 20GB de datos. Debo recorrer estos datos, muestra por muestra y luego fila por fila en cada muestra, tomar las últimas 2000 (u otra) filas a partir de cada línea y ejecutar la función doTheMath
que devuelve un resultado. Ese resultado luego se guarda en el disco duro donde puedo hacer otras cosas con otro programa. Como puede ver a continuación, no necesito todos los resultados de todas las filas, solo aquellos mayores a una cantidad específica. Si ejecuto mi función tal como está ahora en Python, obtengo unos 62 segundos por cada 1.000.000 de filas. Este es un tiempo muy largo considerando todos los datos y la rapidez con la que se debe hacer.
Debo mencionar que subo el archivo de datos reales a la RAM con la ayuda de data = joblib.load(file)
por lo que la carga de los datos no es el problema, ya que solo toma alrededor de 0.29 segundos por archivo. Una vez cargado corro el código completo a continuación. Lo que lleva más tiempo es la función doTheMath
. Estoy dispuesto a dar todos mis 500 puntos de reputación que tengo en stackoverflow como recompensa por alguien dispuesto a ayudarme a volver a escribir este código simple para ejecutarlo en la GPU. Mi interés está específicamente en la GPU, realmente quiero ver cómo se hace en este problema en cuestión.
EDITAR / ACTUALIZAR 1: Aquí hay un enlace a una pequeña muestra de los datos reales: data_csv.zip Unas 102000 filas de datos reales1 y 2000 filas para datos reales2a y datos2b. Use minimumLimit = 400
en los datos de muestra reales
EDITAR / ACTUALIZAR 2: Para quienes siguen esta publicación, aquí hay un breve resumen de las respuestas a continuación. Hasta ahora tenemos 4 respuestas a la solución original. El ofrecido por @Divakar son solo ajustes al código original. De los dos ajustes, solo el primero es realmente aplicable a este problema, el segundo es un buen ajuste, pero no se aplica aquí. De las otras tres respuestas, dos de ellas son soluciones basadas en CPU y una prueba de tensorflow-GPU. El Tensorflow-GPU de Paul Panzer parece ser prometedor, pero cuando lo ejecuto en la GPU es más lento que el original, por lo que el código aún necesita mejoras.
Las otras dos soluciones basadas en CPU son enviadas por @PaulPanzer (una solución de números puros) y @MSeifert (una solución numba). Ambas soluciones dan muy buenos resultados y ambos procesan datos extremadamente rápido en comparación con el código original. De los dos, el presentado por Paul Panzer es más rápido. Procesa unas 1.000.000 filas en unos 3 segundos. El único problema es con tamaños de lote más pequeños, esto se puede superar cambiando a la solución numba ofrecida por MSeifert, o incluso al código original después de todos los ajustes que se han analizado a continuación.
Estoy muy feliz y agradecida a @PaulPanzer y @MSeifert por el trabajo que hicieron en sus respuestas. Sin embargo, dado que esta es una pregunta sobre una solución basada en GPU, estoy esperando a ver si alguien está dispuesto a probarlo en una versión de GPU y ver cuánto más rápido pueden procesarse los datos en la GPU en comparación con la CPU actual. soluciones Si no hay otras respuestas que superen a la solución numérica pura de @PellPanzer, aceptaré su respuesta como la correcta y obtendré la recompensa :)
EDITAR / ACTUALIZAR 3: @Divakar ha publicado una nueva respuesta con una solución para la GPU. Después de mis pruebas en datos reales, la velocidad ni siquiera es comparable con las soluciones de contraparte de la CPU. La GPU procesa unos 5.000.000 en unos 1,5 segundos. Esto es increíble :) Estoy muy entusiasmado con la solución de GPU y agradezco a @Divakar por publicarla. Además, agradezco a @PaulPanzer y @MSeifert por sus soluciones de CPU :) Ahora mi investigación continúa con una velocidad increíble debido a la GPU :)
import pandas as pd
import numpy as np
import time
def doTheMath(tmpData1, data2a, data2b):
A = tmpData1[:, 0]
B = tmpData1[:,1]
C = tmpData1[:,2]
D = tmpData1[:,3]
Bmax = B.max()
Cmin = C.min()
dif = (Bmax - Cmin)
abcd = ((((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()
#Declare variables
batchSize = 2000
sampleSize = 5000000
resultArray = []
minimumLimit = 490 #use 400 on the real sample data
#Create Random Sample Data
data1 = np.matrix(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.matrix(np.random.uniform(0, 1, (batchSize, 1))) #upper limit
data2b = np.matrix(np.random.uniform(0, 1, (batchSize, 1))) #lower limit
#approx. half of data2a will be smaller than data2b, but that is only in the sample data because it is randomly generated, NOT the real data. The real data2a is always higher than data2b.
#Loop through the data
t0 = time.time()
for rowNr in range(data1.shape[0]):
tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
if(tmp_df.shape[0] == batchSize):
result = doTheMath(tmp_df, data2a, data2b)
if (result >= minimumLimit):
resultArray.append([rowNr , result])
print(''Runtime:'', time.time() - t0)
#Save data results
resultArray = np.array(resultArray)
print(resultArray[:,1].sum())
resultArray = pd.DataFrame({''index'':resultArray[:,0], ''result'':resultArray[:,1]})
resultArray.to_csv("Result Array.csv", sep='';'')
Las especificaciones de PC en las que estoy trabajando:
GTX970(4gb) video card;
i7-4790K CPU 4.00Ghz;
16GB RAM;
a SSD drive
running Windows 7;
Como una pregunta adicional, ¿ayudaría una segunda tarjeta de video en SLI en este problema?
Antes de comenzar a ajustar el objetivo (GPU) o utilizar cualquier otra cosa (es decir, ejecuciones paralelas), es posible que desee considerar cómo mejorar el código ya existente. numba la numba numba, así que la numba para mejorar el código: primero operamos en matrices, no en matrices:
data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit
data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit
Cada vez que llamas a doTheMath
esperas que te doTheMath
un entero, sin embargo, utilizas muchos arreglos y creas muchos arreglos intermedios:
abcd = ((((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()
Esto crea una matriz intermedia en cada paso:
-
tmp1 = A-Cmin
, -
tmp2 = tmp1 / dif
, -
tmp3 = B - Cmin
, -
tmp4 = tmp3 / dif
- ... obtienes la esencia
Sin embargo, esta es una función de reducción (array -> integer), por lo que tener una gran cantidad de arreglos intermedios es un peso innecesario, simplemente calcule el valor de la "mosca".
import numba as nb
@nb.njit
def doTheMathNumba(tmpData, data2a, data2b):
Bmax = np.max(tmpData[:, 1])
Cmin = np.min(tmpData[:, 2])
diff = (Bmax - Cmin)
idiff = 1 / diff
sum_ = 0
for i in range(tmpData.shape[0]):
val = (tmpData[i, 0] + tmpData[i, 1] + tmpData[i, 2] + tmpData[i, 3]) / 4 * idiff - Cmin * idiff
if val <= data2a[i] and val >= data2b[i]:
sum_ += 1
return sum_
Hice algo más aquí para evitar múltiples operaciones:
(((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4
= ((A - Cmin + B - Cmin + C - Cmin + D - Cmin) / dif) / 4
= (A + B + C + D - 4 * Cmin) / (4 * dif)
= (A + B + C + D) / (4 * dif) - (Cmin / dif)
Esto realmente reduce el tiempo de ejecución en casi un factor de 10 en mi computadora:
%timeit doTheMath(tmp_df, data2a, data2b) # 1000 loops, best of 3: 446 µs per loop
%timeit doTheMathNumba(tmp_df, data2a, data2b) # 10000 loops, best of 3: 59 µs per loop
Ciertamente, también hay otras mejoras, por ejemplo, el uso de un mínimo / máximo rodante para calcular Bmax
y Cmin
, que realizaría al menos parte del cálculo en O(sampleSize)
lugar de O(samplesize * batchsize)
. Esto también haría posible reutilizar algunos de los cálculos (A + B + C + D) / (4 * dif) - (Cmin / dif)
porque si Cmin
y Bmax
no cambian para la siguiente muestra, estos valores no '' t difieren Es un poco complicado de hacer porque las comparaciones difieren. ¡Pero definitivamente posible! Mira aquí:
import time
import numpy as np
import numba as nb
@nb.njit
def doTheMathNumba(abcd, data2a, data2b, Bmax, Cmin):
diff = (Bmax - Cmin)
idiff = 1 / diff
quarter_idiff = 0.25 * idiff
sum_ = 0
for i in range(abcd.shape[0]):
val = abcd[i] * quarter_idiff - Cmin * idiff
if val <= data2a[i] and val >= data2b[i]:
sum_ += 1
return sum_
@nb.njit
def doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, resultArray):
found = 0
for rowNr in range(data1.shape[0]):
if(abcd[rowNr:rowNr + batchSize].shape[0] == batchSize):
result = doTheMathNumba(abcd[rowNr:rowNr + batchSize],
data2a, data2b, Bmaxs[rowNr], Cmins[rowNr])
if (result >= minimumLimit):
resultArray[found, 0] = rowNr
resultArray[found, 1] = result
found += 1
return resultArray[:found]
#Declare variables
batchSize = 2000
sampleSize = 50000
resultArray = []
minimumLimit = 490 #use 400 on the real sample data
data1 = np.array(np.random.uniform(1, 100, (sampleSize + batchSize, 4)))
data2a = np.array(np.random.uniform(0, 1, batchSize)) #upper limit
data2b = np.array(np.random.uniform(0, 1, batchSize)) #lower limit
from scipy import ndimage
t0 = time.time()
abcd = np.sum(data1, axis=1)
Bmaxs = ndimage.maximum_filter1d(data1[:, 1],
size=batchSize,
origin=-((batchSize-1)//2-1)) # correction for even shapes
Cmins = ndimage.minimum_filter1d(data1[:, 2],
size=batchSize,
origin=-((batchSize-1)//2-1))
result = np.zeros((sampleSize, 2), dtype=np.int64)
doloop(data1, data2a, data2b, abcd, Bmaxs, Cmins, batchSize, sampleSize, minimumLimit, result)
print(''Runtime:'', time.time() - t0)
Esto me da un Runtime: 0.759593152999878
(¡después de que Numba compiló las funciones!), Mientras que su versión original tenía Runtime: 24.68975639343262
. ¡Ahora somos 30 veces más rápido!
Con el tamaño de la muestra, todavía toma Runtime: 60.187848806381226
pero eso no es tan malo, ¿verdad?
E incluso si no lo he hecho yo mismo, numba dice que es posible escribir "Numba para las GPU de CUDA" y no parece complicado.
Aquí hay algo de código para demostrar lo que es posible simplemente ajustando el algoritmo. Es un número puro, pero en los datos de muestra que publicaste se obtiene una aceleración de aproximadamente 35 veces con respecto a la versión original (~ 1,000,000 de muestras en ~ 2.5 s en mi máquina bastante modesta):
>>> result_dict = master(''run'')
[(''load'', 0.82578349113464355), (''precomp'', 0.028138399124145508), (''max/min'', 0.24333405494689941), (''ABCD'', 0.015314102172851562), (''main'', 1.3356468677520752)]
TOTAL 2.44821691513
Ajustes utilizados:
- A + B + C + D, mira mi otra respuesta
- ejecutar min / max, incluso evitar calcular (A + B + C + D - 4Cmin) / (4dif) varias veces con el mismo Cmin / dif.
Estos son más o menos rutinarios. Eso deja la comparación con data2a / b, que es caro O (NK) donde N es el número de muestras y K es el tamaño de la ventana. Aquí uno puede aprovechar los datos relativamente bien comportados. El uso de min / max en ejecución puede crear variantes de data2a / b que se pueden usar para probar un rango de compensaciones de ventana a la vez, si la prueba falla, todas estas compensaciones se pueden descartar de inmediato, de lo contrario, el rango se divide.
import numpy as np
import time
# global variables; they will hold the precomputed pre-screening filters
preA, preB = {}, {}
CHUNK_SIZES = None
def sliding_argmax(data, K=2000):
"""compute the argmax of data over a sliding window of width K
returns:
indices -- indices into data
switches -- window offsets at which the maximum changes
(strictly speaking: where the index of the maximum changes)
excludes 0 but includes maximum offset (len(data)-K+1)
see last line of compute_pre_screening_filter for a recipe to convert
this representation to the vector of maxima
"""
N = len(data)
last = np.argmax(data[:K])
indices = [last]
while indices[-1] <= N - 1:
ge = np.where(data[last + 1 : last + K + 1] > data[last])[0]
if len(ge) == 0:
if last + K >= N:
break
last += 1 + np.argmax(data[last + 1 : last + K + 1])
indices.append(last)
else:
last += 1 + ge[0]
indices.append(last)
indices = np.array(indices)
switches = np.where(data[indices[1:]] > data[indices[:-1]],
indices[1:] + (1-K), indices[:-1] + 1)
return indices, np.r_[switches, [len(data)-K+1]]
def compute_pre_screening_filter(bound, n_offs):
"""compute pre-screening filter for point-wise upper bound
given a K-vector of upper bounds B and K+n_offs-1-vector data
compute K+n_offs-1-vector filter such that for each index j
if for any offset 0 <= o < n_offs and index 0 <= i < K such that
o + i = j, the inequality B_i >= data_j holds then filter_j >= data_j
therefore the number of data points below filter is an upper bound for
the maximum number of points below bound in any K-window in data
"""
pad_l, pad_r = np.min(bound[:n_offs-1]), np.min(bound[1-n_offs:])
padded = np.r_[pad_l+np.zeros(n_offs-1,), bound, pad_r+np.zeros(n_offs-1,)]
indices, switches = sliding_argmax(padded, n_offs)
return padded[indices].repeat(np.diff(np.r_[[0], switches]))
def compute_all_pre_screening_filters(upper, lower, min_chnk=5, dyads=6):
"""compute upper and lower pre-screening filters for data blocks of
sizes K+n_offs-1 where
n_offs = min_chnk, 2min_chnk, ..., 2^(dyads-1)min_chnk
the result is stored in global variables preA and preB
"""
global CHUNK_SIZES
CHUNK_SIZES = min_chnk * 2**np.arange(dyads)
preA[1] = upper
preB[1] = lower
for n in CHUNK_SIZES:
preA[n] = compute_pre_screening_filter(upper, n)
preB[n] = -compute_pre_screening_filter(-lower, n)
def test_bounds(block, counts, threshold=400):
"""test whether the windows fitting in the data block ''block'' fall
within the bounds using pre-screening for efficient bulk rejection
array ''counts'' will be overwritten with the counts of compliant samples
note that accurate counts will only be returned for above threshold
windows, because the analysis of bulk rejected windows is short-circuited
also note that bulk rejection only works for ''well behaved'' data and
for example not on random numbers
"""
N = len(counts)
K = len(preA[1])
r = N % CHUNK_SIZES[0]
# chop up N into as large as possible chunks with matching pre computed
# filters
# start with small and work upwards
counts[:r] = [np.count_nonzero((block[l:l+K] <= preA[1]) &
(block[l:l+K] >= preB[1]))
for l in range(r)]
def bisect(block, counts):
M = len(counts)
cnts = np.count_nonzero((block <= preA[M]) & (block >= preB[M]))
if cnts < threshold:
counts[:] = cnts
return
elif M == CHUNK_SIZES[0]:
counts[:] = [np.count_nonzero((block[l:l+K] <= preA[1]) &
(block[l:l+K] >= preB[1]))
for l in range(M)]
else:
M //= 2
bisect(block[:-M], counts[:M])
bisect(block[M:], counts[M:])
N = N // CHUNK_SIZES[0]
for M in CHUNK_SIZES:
if N % 2:
bisect(block[r:r+M+K-1], counts[r:r+M])
r += M
elif N == 0:
return
N //= 2
else:
for j in range(2*N):
bisect(block[r:r+M+K-1], counts[r:r+M])
r += M
def analyse(data, use_pre_screening=True, min_chnk=5, dyads=6,
threshold=400):
samples, upper, lower = data
N, K = samples.shape[0], upper.shape[0]
times = [time.time()]
if use_pre_screening:
compute_all_pre_screening_filters(upper, lower, min_chnk, dyads)
times.append(time.time())
# compute switching points of max and min for running normalisation
upper_inds, upper_swp = sliding_argmax(samples[:, 1], K)
lower_inds, lower_swp = sliding_argmax(-samples[:, 2], K)
times.append(time.time())
# sum columns
ABCD = samples.sum(axis=-1)
times.append(time.time())
counts = np.empty((N-K+1,), dtype=int)
# main loop
# loop variables:
offs = 0
u_ind, u_scale, u_swp = 0, samples[upper_inds[0], 1], upper_swp[0]
l_ind, l_scale, l_swp = 0, samples[lower_inds[0], 2], lower_swp[0]
while True:
# check which is switching next, min(C) or max(B)
if u_swp > l_swp:
# greedily take the largest block possible such that dif and Cmin
# do not change
block = (ABCD[offs:l_swp+K-1] - 4*l_scale) /
* (0.25 / (u_scale-l_scale))
if use_pre_screening:
test_bounds(block, counts[offs:l_swp], threshold=threshold)
else:
counts[offs:l_swp] = [
np.count_nonzero((block[l:l+K] <= upper) &
(block[l:l+K] >= lower))
for l in range(l_swp - offs)]
# book keeping
l_ind += 1
offs = l_swp
l_swp = lower_swp[l_ind]
l_scale = samples[lower_inds[l_ind], 2]
else:
block = (ABCD[offs:u_swp+K-1] - 4*l_scale) /
* (0.25 / (u_scale-l_scale))
if use_pre_screening:
test_bounds(block, counts[offs:u_swp], threshold=threshold)
else:
counts[offs:u_swp] = [
np.count_nonzero((block[l:l+K] <= upper) &
(block[l:l+K] >= lower))
for l in range(u_swp - offs)]
u_ind += 1
if u_ind == len(upper_inds):
assert u_swp == N-K+1
break
offs = u_swp
u_swp = upper_swp[u_ind]
u_scale = samples[upper_inds[u_ind], 1]
times.append(time.time())
return {''counts'': counts, ''valid'': np.where(counts >= 400)[0],
''timings'': np.diff(times)}
def master(mode=''calibrate'', data=''fake'', use_pre_screening=True, nrep=3,
min_chnk=None, dyads=None):
t = time.time()
if data in (''fake'', ''load''):
data1 = np.loadtxt(''data1.csv'', delimiter='';'', skiprows=1,
usecols=[1,2,3,4])
data2a = np.loadtxt(''data2a.csv'', delimiter='';'', skiprows=1,
usecols=[1])
data2b = np.loadtxt(''data2b.csv'', delimiter='';'', skiprows=1,
usecols=[1])
if data == ''fake'':
data1 = np.tile(data1, (10, 1))
threshold = 400
elif data == ''random'':
data1 = np.random.random((102000, 4))
data2b = np.random.random(2000)
data2a = np.random.random(2000)
threshold = 490
if use_pre_screening or mode == ''calibrate'':
print(''WARNING: pre-screening not efficient on artificial data'')
else:
raise ValueError("data mode {} not recognised".format(data))
data = data1, data2a, data2b
t_load = time.time() - t
if mode == ''calibrate'':
min_chnk = (2, 3, 4, 5, 6) if min_chnk is None else min_chnk
dyads = (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) if dyads is None else dyads
timings = np.zeros((len(min_chnk), len(dyads)))
print(''max bisect '' + '' ''.join([
'' n.a.'' if dy == 0 else ''{:7d}''.format(dy) for dy in dyads]),
end='''')
for i, mc in enumerate(min_chnk):
print(''/nmin chunk {}''.format(mc), end='' '')
for j, dy in enumerate(dyads):
for k in range(nrep):
if dy == 0: # no pre-screening
timings[i, j] += analyse(
data, False, mc, dy, threshold)[''timings''][3]
else:
timings[i, j] += analyse(
data, True, mc, dy, threshold)[''timings''][3]
timings[i, j] /= nrep
print(''{:7.3f}''.format(timings[i, j]), end='' '', flush=True)
best_mc, best_dy = np.unravel_index(np.argmin(timings.ravel()),
timings.shape)
print(''/nbest'', min_chnk[best_mc], dyads[best_dy])
return timings, min_chnk[best_mc], dyads[best_dy]
if mode == ''run'':
min_chnk = 2 if min_chnk is None else min_chnk
dyads = 5 if dyads is None else dyads
res = analyse(data, use_pre_screening, min_chnk, dyads, threshold)
times = np.r_[[t_load], res[''timings'']]
print(list(zip((''load'', ''precomp'', ''max/min'', ''ABCD'', ''main''), times)))
print(''TOTAL'', times.sum())
return res
Introducción y código de solución.
Bueno, lo pediste! Por lo tanto, en esta publicación se PyCUDA
una implementación con PyCUDA
que utiliza envolturas ligeras que extienden la mayoría de las capacidades de CUDA dentro del entorno de Python. Tendremos la funcionalidad SourceModule
que nos permite escribir y compilar los núcleos CUDA que permanecen en el entorno Python.
Para resolver el problema que nos ocupa, entre los cálculos involucrados, tenemos deslizamiento máximo y mínimo, pocas diferencias y divisiones y comparaciones. Para las partes máximas y mínimas, que involucran la búsqueda de bloque máximo (para cada ventana deslizante), usaremos la técnica de reducción como se explica con más detalle here
. Esto se haría a nivel de bloque. Para las iteraciones de nivel superior a través de ventanas deslizantes, usaríamos la indexación de nivel de cuadrícula en los recursos de CUDA. Para obtener más información sobre este bloque y formato de cuadrícula, consulte la page-18
. PyCUDA también admite incorporaciones para reducciones de cómputo como max y min, pero perdemos el control, específicamente pretendemos usar memoria especializada como compartida y memoria constante para aprovechar GPU en su nivel casi óptimo.
Listado del código de la solución PyCUDA-NumPy -
1] parte de PyCUDA -
import pycuda.autoinit
import pycuda.driver as drv
import numpy as np
from pycuda.compiler import SourceModule
mod = SourceModule("""
#define TBP 1024 // THREADS_PER_BLOCK
__device__ void get_Bmax_Cmin(float* out, float *d1, float *d2, int L, int offset)
{
int tid = threadIdx.x;
int inv = TBP;
__shared__ float dS[TBP][2];
dS[tid][0] = d1[tid+offset];
dS[tid][1] = d2[tid+offset];
__syncthreads();
if(tid<L-TBP)
{
dS[tid][0] = fmaxf(dS[tid][0] , d1[tid+inv+offset]);
dS[tid][1] = fminf(dS[tid][1] , d2[tid+inv+offset]);
}
__syncthreads();
inv = inv/2;
while(inv!=0)
{
if(tid<inv)
{
dS[tid][0] = fmaxf(dS[tid][0] , dS[tid+inv][0]);
dS[tid][1] = fminf(dS[tid][1] , dS[tid+inv][1]);
}
__syncthreads();
inv = inv/2;
}
__syncthreads();
if(tid==0)
{
out[0] = dS[0][0];
out[1] = dS[0][1];
}
__syncthreads();
}
__global__ void main1(float* out, float *d0, float *d1, float *d2, float *d3, float *lowL, float *highL, int *BLOCKLEN)
{
int L = BLOCKLEN[0];
int tid = threadIdx.x;
int iterID = blockIdx.x;
float Bmax_Cmin[2];
int inv;
float Cmin, dif;
__shared__ float dS[TBP*2];
get_Bmax_Cmin(Bmax_Cmin, d1, d2, L, iterID);
Cmin = Bmax_Cmin[1];
dif = (Bmax_Cmin[0] - Cmin);
inv = TBP;
dS[tid] = (d0[tid+iterID] + d1[tid+iterID] + d2[tid+iterID] + d3[tid+iterID] - 4.0*Cmin) / (4.0*dif);
__syncthreads();
if(tid<L-TBP)
dS[tid+inv] = (d0[tid+inv+iterID] + d1[tid+inv+iterID] + d2[tid+inv+iterID] + d3[tid+inv+iterID] - 4.0*Cmin) / (4.0*dif);
dS[tid] = ((dS[tid] >= lowL[tid]) & (dS[tid] <= highL[tid])) ? 1 : 0;
__syncthreads();
if(tid<L-TBP)
dS[tid] += ((dS[tid+inv] >= lowL[tid+inv]) & (dS[tid+inv] <= highL[tid+inv])) ? 1 : 0;
__syncthreads();
inv = inv/2;
while(inv!=0)
{
if(tid<inv)
dS[tid] += dS[tid+inv];
__syncthreads();
inv = inv/2;
}
if(tid==0)
out[iterID] = dS[0];
__syncthreads();
}
""")
Tenga en cuenta que THREADS_PER_BLOCK, TBP
se establecerá en función del batchSize
. La regla de oro aquí es asignar una potencia de 2 valores a TBP
que sea menor que batchSize
. Por lo tanto, para batchSize = 2000
, necesitábamos TBP
como 1024
.
2] NumPy part -
def gpu_app_v1(A, B, C, D, batchSize, minimumLimit):
func1 = mod.get_function("main1")
outlen = len(A)-batchSize+1
# Set block and grid sizes
BSZ = (1024,1,1)
GSZ = (outlen,1)
dest = np.zeros(outlen).astype(np.float32)
N = np.int32(batchSize)
func1(drv.Out(dest), drv.In(A), drv.In(B), drv.In(C), drv.In(D), /
drv.In(data2b), drv.In(data2a),/
drv.In(N), block=BSZ, grid=GSZ)
idx = np.flatnonzero(dest >= minimumLimit)
return idx, dest[idx]
Benchmarking
He probado en GTX 960M. Tenga en cuenta que PyCUDA espera que las matrices sean de orden contiguo. Por lo tanto, tenemos que cortar las columnas y hacer copias. Estoy esperando / asumiendo que los datos podrían leerse de los archivos de tal manera que los datos se distribuyan a lo largo de filas en lugar de ser como columnas. Por lo tanto, mantener a aquellos fuera de la función de evaluación comparativa por ahora.
Enfoque original -
def org_app(data1, batchSize, minimumLimit):
resultArray = []
for rowNr in range(data1.shape[0]-batchSize+1):
tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
result = doTheMath(tmp_df, data2a, data2b)
if (result >= minimumLimit):
resultArray.append([rowNr , result])
return resultArray
Tiempos y verificación -
In [2]: #Declare variables
...: batchSize = 2000
...: sampleSize = 50000
...: resultArray = []
...: minimumLimit = 490 #use 400 on the real sample data
...:
...: #Create Random Sample Data
...: data1 = np.random.uniform(1, 100000, (sampleSize + batchSize, 4)).astype(np.float32)
...: data2b = np.random.uniform(0, 1, (batchSize)).astype(np.float32)
...: data2a = data2b + np.random.uniform(0, 1, (batchSize)).astype(np.float32)
...:
...: # Make column copies
...: A = data1[:,0].copy()
...: B = data1[:,1].copy()
...: C = data1[:,2].copy()
...: D = data1[:,3].copy()
...:
...: gpu_out1,gpu_out2 = gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
...: cpu_out1,cpu_out2 = np.array(org_app(data1, batchSize, minimumLimit)).T
...: print(np.allclose(gpu_out1, cpu_out1))
...: print(np.allclose(gpu_out2, cpu_out2))
...:
True
False
Entonces, hay algunas diferencias entre los recuentos de CPU y GPU. Investiguémoslos
In [7]: idx = np.flatnonzero(~np.isclose(gpu_out2, cpu_out2))
In [8]: idx
Out[8]: array([12776, 15208, 17620, 18326])
In [9]: gpu_out2[idx] - cpu_out2[idx]
Out[9]: array([-1., -1., 1., 1.])
Hay cuatro instancias de cuentas no coincidentes. Estos están desactivados al máximo por 1
. En la investigación, me encontré con alguna información sobre esto. Básicamente, ya que estamos usando los cálculos de matemática para los cálculos de max y min, y creo que esos están causando que el último bit binario en la representación de pt flotante sea diferente al de la contraparte de la CPU. Esto se denomina error ULP y se ha discutido en detalle here
y here
.
Finalmente, dejando de lado el problema, vamos a lo más importante, el rendimiento:
In [10]: %timeit org_app(data1, batchSize, minimumLimit)
1 loops, best of 3: 2.18 s per loop
In [11]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
10 loops, best of 3: 82.5 ms per loop
In [12]: 2180.0/82.5
Out[12]: 26.424242424242426
Probemos con conjuntos de datos más grandes. Con sampleSize = 500000
, obtenemos -
In [14]: %timeit org_app(data1, batchSize, minimumLimit)
1 loops, best of 3: 23.2 s per loop
In [15]: %timeit gpu_app_v1(A, B, C, D, batchSize, minimumLimit)
1 loops, best of 3: 821 ms per loop
In [16]: 23200.0/821
Out[16]: 28.25822168087698
Por lo tanto, la aceleración se mantiene constante en alrededor de 27
.
Limitaciones:
1) Estamos utilizando números float32
, ya que las GPU funcionan mejor con esos. La precisión doble, especialmente en las GPU que no son de servidor, no es popular cuando se trata de rendimiento y, como está trabajando con una GPU de este tipo, probé con float32.
Mejoramiento adicional :
1) Podríamos usar una constant memory
más rápida para alimentar data2a
y data2b
, en lugar de usar global memory
.
Tweak # 1
Por lo general, se recomienda vectorizar las cosas cuando se trabaja con matrices NumPy. Pero con arreglos muy grandes, creo que estás fuera de opciones allí. Por lo tanto, para mejorar el rendimiento, es posible optimizar un pequeño ajuste en el último paso de la suma.
Podríamos reemplazar el paso que hace una matriz de 1s
y 0s
y que suma:
np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()
con np.count_nonzero
que funciona de manera eficiente para contar valores True
en una matriz booleana, en lugar de convertir a 1s
y 0s
-
np.count_nonzero((abcd <= data2a) & (abcd >= data2b))
Prueba de tiempo de ejecución
In [45]: abcd = np.random.randint(11,99,(10000))
In [46]: data2a = np.random.randint(11,99,(10000))
In [47]: data2b = np.random.randint(11,99,(10000))
In [48]: %timeit np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()
10000 loops, best of 3: 81.8 µs per loop
In [49]: %timeit np.count_nonzero((abcd <= data2a) & (abcd >= data2b))
10000 loops, best of 3: 28.8 µs per loop
Tweak # 2
Utilice un recíproco precalculado cuando se trate de casos que se someten a radiodifusión implícita. Un poco más de información here
. Por lo tanto, almacene recíproco de dif
y use eso en lugar del paso:
((((A - Cmin) / dif) + ((B - Cmin) / dif) + ...
Prueba de muestra
In [52]: A = np.random.rand(10000)
In [53]: dif = 0.5
In [54]: %timeit A/dif
10000 loops, best of 3: 25.8 µs per loop
In [55]: %timeit A*(1.0/dif)
100000 loops, best of 3: 7.94 µs per loop
Tienes cuatro lugares usando división por dif
. Así que, ¡espero que esto también produzca un impulso notable allí!
Esto es técnicamente fuera de tema (no GPU) pero estoy seguro de que estarás interesado.
Hay un ahorro obvio y bastante grande:
Precomputa A + B + C + D
(no en el bucle, en todos los datos :) data1.sum(axis=-1)
, porque abcd = ((A+B+C+D) - 4Cmin) / (4dif)
. Esto ahorrará bastantes operaciones.
Sorprendió que nadie lo viera antes ;-)
Editar:
Hay otra cosa, aunque sospecho que solo está en su ejemplo, no en sus datos reales:
Como se encuentra aproximadamente la mitad de data2a
será menor que data2b
. En estos lugares, sus condiciones abcd
no pueden ser verdaderas, por lo que ni siquiera debe calcularlas abcd
allí.
Editar:
Una modificación más que utilicé a continuación, pero olvidé mencionar: si calcula el máximo (o mínimo) sobre una ventana móvil. Cuando mueva un punto a la derecha, diga, ¿qué tan probable es que cambie el máximo? Solo hay dos cosas que pueden cambiarlo: el nuevo punto a la derecha es más grande (ocurre aproximadamente una vez en el tiempo de duración de la ventana, e incluso si sucede, inmediatamente se sabe el nuevo máximo) o el antiguo máximo se cae de la ventana de la izquierda (También ocurre aproximadamente una vez en tiempos de windowlength). Solo en este último caso debe buscar el nuevo máximo en toda la ventana.
Editar:
No pude resistirme a probarlo en tensorflow. No tengo una GPU, así que tú mismo tienes que probar la velocidad. Ponga "gpu" para "cpu" en la línea marcada.
En la CPU, es aproximadamente la mitad de rápido que su implementación original (es decir, sin los ajustes de Divakar). Tenga en cuenta que me he tomado la libertad de cambiar las entradas de matriz a matriz simple. Actualmente, tensorflow es un poco un objetivo móvil, así que asegúrese de tener la versión correcta. Usé Python3.6 y tf 0.12.1 Si haces un pip3 instala tensorflow-gpu hoy debería Podría funcionar.
import numpy as np
import time
import tensorflow as tf
# currently the max/min code is sequential
# thus
parallel_iterations = 1
# but you can put this in a separate loop, precompute and then try and run
# the remainder of doTheMathTF with a larger parallel_iterations
# tensorflow is quite capricious about its data types
ddf = tf.float64
ddi = tf.int32
def worker(data1, data2a, data2b):
###################################
# CHANGE cpu to gpu in next line! #
###################################
with tf.device(''/cpu:0''):
g = tf.Graph ()
with g.as_default():
ABCD = tf.constant(data1.sum(axis=-1), dtype=ddf)
B = tf.constant(data1[:, 1], dtype=ddf)
C = tf.constant(data1[:, 2], dtype=ddf)
window = tf.constant(len(data2a))
N = tf.constant(data1.shape[0] - len(data2a) + 1, dtype=ddi)
data2a = tf.constant(data2a, dtype=ddf)
data2b = tf.constant(data2b, dtype=ddf)
def doTheMathTF(i, Bmax, Bmaxind, Cmin, Cminind, out):
# most of the time we can keep the old max/min
Bmaxind = tf.cond(Bmaxind<i,
lambda: i + tf.to_int32(
tf.argmax(B[i:i+window], axis=0)),
lambda: tf.cond(Bmax>B[i+window-1],
lambda: Bmaxind,
lambda: i+window-1))
Cminind = tf.cond(Cminind<i,
lambda: i + tf.to_int32(
tf.argmin(C[i:i+window], axis=0)),
lambda: tf.cond(Cmin<C[i+window-1],
lambda: Cminind,
lambda: i+window-1))
Bmax = B[Bmaxind]
Cmin = C[Cminind]
abcd = (ABCD[i:i+window] - 4 * Cmin) * (1 / (4 * (Bmax-Cmin)))
out = out.write(i, tf.to_int32(
tf.count_nonzero(tf.logical_and(abcd <= data2a,
abcd >= data2b))))
return i + 1, Bmax, Bmaxind, Cmin, Cminind, out
with tf.Session(graph=g) as sess:
i, Bmaxind, Bmax, Cminind, Cmin, out = tf.while_loop(
lambda i, _1, _2, _3, _4, _5: i<N, doTheMathTF,
(tf.Variable(0, dtype=ddi), tf.Variable(0.0, dtype=ddf),
tf.Variable(-1, dtype=ddi),
tf.Variable(0.0, dtype=ddf), tf.Variable(-1, dtype=ddi),
tf.TensorArray(ddi, size=N)),
shape_invariants=None,
parallel_iterations=parallel_iterations,
back_prop=False)
out = out.pack()
sess.run(tf.initialize_all_variables())
out, = sess.run((out,))
return out
#Declare variables
batchSize = 2000
sampleSize = 50000#00
resultArray = []
#Create Sample Data
data1 = np.random.uniform(1, 100, (sampleSize + batchSize, 4))
data2a = np.random.uniform(0, 1, (batchSize,))
data2b = np.random.uniform(0, 1, (batchSize,))
t0 = time.time()
out = worker(data1, data2a, data2b)
print(''Runtime (tensorflow):'', time.time() - t0)
good_indices, = np.where(out >= 490)
res_tf = np.c_[good_indices, out[good_indices]]
def doTheMath(tmpData1, data2a, data2b):
A = tmpData1[:, 0]
B = tmpData1[:,1]
C = tmpData1[:,2]
D = tmpData1[:,3]
Bmax = B.max()
Cmin = C.min()
dif = (Bmax - Cmin)
abcd = ((((A - Cmin) / dif) + ((B - Cmin) / dif) + ((C - Cmin) / dif) + ((D - Cmin) / dif)) / 4)
return np.where(((abcd <= data2a) & (abcd >= data2b)), 1, 0).sum()
#Loop through the data
t0 = time.time()
for rowNr in range(sampleSize+1):
tmp_df = data1[rowNr:rowNr + batchSize] #rolling window
result = doTheMath(tmp_df, data2a, data2b)
if (result >= 490):
resultArray.append([rowNr , result])
print(''Runtime (original):'', time.time() - t0)
print(np.alltrue(np.array(resultArray)==res_tf))