python performance pandas numpy vectorization

python - Versión NumPy de "Promedio móvil ponderado exponencial", equivalente a pandas.ewm(). Mean()



performance vectorization (11)

¿Cómo obtengo el promedio móvil ponderado exponencial en NumPy como el siguiente en pandas ?

import pandas as pd import pandas_datareader as pdr from datetime import datetime # Declare variables ibm = pdr.get_data_yahoo(symbols=''IBM'', start=datetime(2000, 1, 1), end=datetime(2012, 1, 1)).reset_index(drop=True)[''Adj Close''] windowSize = 20 # Get PANDAS exponential weighted moving average ewm_pd = pd.DataFrame(ibm).ewm(span=windowSize, min_periods=windowSize).mean().as_matrix() print(ewm_pd)

Intenté lo siguiente con NumPy

import numpy as np import pandas_datareader as pdr from datetime import datetime # From this post: http://stackoverflow.com/a/40085052/3293881 by @Divakar def strided_app(a, L, S): # Window len = L, Stride len/stepsize = S nrows = ((a.size - L) // S) + 1 n = a.strides[0] return np.lib.stride_tricks.as_strided(a, shape=(nrows, L), strides=(S * n, n)) def numpyEWMA(price, windowSize): weights = np.exp(np.linspace(-1., 0., windowSize)) weights /= weights.sum() a2D = strided_app(price, windowSize, 1) returnArray = np.empty((price.shape[0])) returnArray.fill(np.nan) for index in (range(a2D.shape[0])): returnArray[index + windowSize-1] = np.convolve(weights, a2D[index])[windowSize - 1:-windowSize + 1] return np.reshape(returnArray, (-1, 1)) # Declare variables ibm = pdr.get_data_yahoo(symbols=''IBM'', start=datetime(2000, 1, 1), end=datetime(2012, 1, 1)).reset_index(drop=True)[''Adj Close''] windowSize = 20 # Get NumPy exponential weighted moving average ewma_np = numpyEWMA(ibm, windowSize) print(ewma_np)

Pero los resultados no son similares a los de los pandas.

¿Existe quizás un mejor enfoque para calcular el promedio móvil ponderado exponencial directamente en NumPy y obtener exactamente el mismo resultado que pandas.ewm().mean() ?

Con 60,000 solicitudes de solución de pandas, obtengo unos 230 segundos. Estoy seguro de que con un NumPy puro, esto se puede disminuir significativamente.


Los pandas EWMA 23x más rápidos

La pregunta es estrictamente pedir una solución numpy , sin embargo, parece que el OP fue en realidad justo después de una solución numpy pura para acelerar el tiempo de ejecución.

numba.jit un problema similar, pero en cambio miré hacia numba.jit que acelera masivamente el tiempo de cálculo

In [24]: a = np.random.random(10**7) ...: df = pd.Series(a) In [25]: %timeit numpy_ewma(a, 10) # /a/42915307/4013571 ...: %timeit df.ewm(span=10).mean() # pandas ...: %timeit numpy_ewma_vectorized_v2(a, 10) # best w/o numba: /a/42926270/4013571 ...: %timeit _ewma(a, 10) # fastest accurate (below) ...: %timeit _ewma_infinite_hist(a, 10) # fastest overall (below) 4.14 s ± 116 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) 991 ms ± 52.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) 396 ms ± 8.39 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) 181 ms ± 1.01 ms per loop (mean ± std. dev. of 7 runs, 10 loops each) 39.6 ms ± 979 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

Reducción a matrices más pequeñas de a = np.random.random(100) (resultados en el mismo orden)

41.6 µs ± 491 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each) 945 ms ± 12 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) 16 µs ± 93.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each) 1.66 µs ± 13.7 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each) 1.14 µs ± 5.57 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

También vale la pena señalar que mis funciones a continuación están alineadas de manera idéntica a los pandas (ver los ejemplos en docstr), mientras que algunas de las respuestas aquí toman varias aproximaciones diferentes. Por ejemplo,

In [57]: print(pd.DataFrame([1,2,3]).ewm(span=2).mean().values.ravel()) ...: print(numpy_ewma_vectorized_v2(np.array([1,2,3]), 2)) ...: print(numpy_ewma(np.array([1,2,3]), 2)) [1. 1.75 2.61538462] [1. 1.66666667 2.55555556] [1. 1.18181818 1.51239669]

El código fuente que he documentado para mi propia biblioteca.

import numpy as np from numba import jit from numba import float64 from numba import int64 @jit((float64[:], int64), nopython=True, nogil=True) def _ewma(arr_in, window): r"""Exponentialy weighted moving average specified by a decay ``window`` to provide better adjustments for small windows via: y[t] = (x[t] + (1-a)*x[t-1] + (1-a)^2*x[t-2] + ... + (1-a)^n*x[t-n]) / (1 + (1-a) + (1-a)^2 + ... + (1-a)^n). Parameters ---------- arr_in : np.ndarray, float64 A single dimenisional numpy array window : int64 The decay window, or ''span'' Returns ------- np.ndarray The EWMA vector, same length / shape as ``arr_in`` Examples -------- >>> import pandas as pd >>> a = np.arange(5, dtype=float) >>> exp = pd.DataFrame(a).ewm(span=10, adjust=True).mean() >>> np.array_equal(_ewma_infinite_hist(a, 10), exp.values.ravel()) True """ n = arr_in.shape[0] ewma = np.empty(n, dtype=float64) alpha = 2 / float(window + 1) w = 1 ewma_old = arr_in[0] ewma[0] = ewma_old for i in range(1, n): w += (1-alpha)**i ewma_old = ewma_old*(1-alpha) + arr_in[i] ewma[i] = ewma_old / w return ewma @jit((float64[:], int64), nopython=True, nogil=True) def _ewma_infinite_hist(arr_in, window): r"""Exponentialy weighted moving average specified by a decay ``window`` assuming infinite history via the recursive form: (2) (i) y[0] = x[0]; and (ii) y[t] = a*x[t] + (1-a)*y[t-1] for t>0. This method is less accurate that ``_ewma`` but much faster: In [1]: import numpy as np, bars ...: arr = np.random.random(100000) ...: %timeit bars._ewma(arr, 10) ...: %timeit bars._ewma_infinite_hist(arr, 10) 3.74 ms ± 60.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each) 262 µs ± 1.54 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each) Parameters ---------- arr_in : np.ndarray, float64 A single dimenisional numpy array window : int64 The decay window, or ''span'' Returns ------- np.ndarray The EWMA vector, same length / shape as ``arr_in`` Examples -------- >>> import pandas as pd >>> a = np.arange(5, dtype=float) >>> exp = pd.DataFrame(a).ewm(span=10, adjust=False).mean() >>> np.array_equal(_ewma_infinite_hist(a, 10), exp.values.ravel()) True """ n = arr_in.shape[0] ewma = np.empty(n, dtype=float64) alpha = 2 / float(window + 1) ewma[0] = arr_in[0] for i in range(1, n): ewma[i] = arr_in[i] * alpha + ewma[i-1] * (1 - alpha) return ewma


Aquí está mi implementación para matrices de entrada 1D con un tamaño de ventana infinito. Como usa números grandes, solo funciona con matrices de entrada con elementos de valor absoluto <1e16, cuando se usa float32, pero ese debería ser el caso.

La idea es cambiar la forma de la matriz de entrada en segmentos de una longitud limitada, para que no se produzca un desbordamiento, y luego hacer el cálculo de ewm con cada segmento por separado.

def ewm(x, alpha): """ Returns the exponentially weighted mean y of a numpy array x with scaling factor alpha y[0] = x[0] y[j] = (1. - alpha) * y[j-1] + alpha * x[j], for j > 0 x -- 1D numpy array alpha -- float """ n = int(-100. / np.log(1.-alpha)) # Makes sure that the first and last elements in f are very big and very small (about 1e22 and 1e-22) f = np.exp(np.arange(1-n, n, 2) * (0.5 * np.log(1. - alpha))) # Scaling factor for each slice tmp = (np.resize(x, ((len(x) + n - 1) // n, n)) / f * alpha).cumsum(axis=1) * f # Get ewm for each slice of length n # Add the last value of each previous slice to the next slice with corresponding scaling factor f and return result return np.resize(tmp + np.tensordot(np.append(x[0], np.roll(tmp.T[n-1], 1)[1:]), f * ((1. - alpha) / f[0]), axes=0), len(x))


¡Creo que finalmente lo he descifrado!

Aquí hay una versión vectorizada de la función numpy_ewma que se afirma que produce los resultados correctos de la @RaduS''s post :

def numpy_ewma_vectorized(data, window): alpha = 2 /(window + 1.0) alpha_rev = 1-alpha scale = 1/alpha_rev n = data.shape[0] r = np.arange(n) scale_arr = scale**r offset = data[0]*alpha_rev**(r+1) pw0 = alpha*alpha_rev**(n-1) mult = data*pw0*scale_arr cumsums = mult.cumsum() out = offset + cumsums*scale_arr[::-1] return out

Mayor impulso

Podemos impulsarlo aún más con la reutilización de algunos códigos, así:

def numpy_ewma_vectorized_v2(data, window): alpha = 2 /(window + 1.0) alpha_rev = 1-alpha n = data.shape[0] pows = alpha_rev**(np.arange(n+1)) scale_arr = 1/pows[:-1] offset = data[0]*pows[1:] pw0 = alpha*alpha_rev**(n-1) mult = data*pw0*scale_arr cumsums = mult.cumsum() out = offset + cumsums*scale_arr[::-1] return out

Prueba de tiempo de ejecución

Midamos estos dos contra la misma función de bucle para un gran conjunto de datos.

In [97]: data = np.random.randint(2,9,(5000)) ...: window = 20 ...: In [98]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized(data, window)) Out[98]: True In [99]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized_v2(data, window)) Out[99]: True In [100]: %timeit numpy_ewma(data, window) 100 loops, best of 3: 6.03 ms per loop In [101]: %timeit numpy_ewma_vectorized(data, window) 1000 loops, best of 3: 665 µs per loop In [102]: %timeit numpy_ewma_vectorized_v2(data, window) 1000 loops, best of 3: 357 µs per loop In [103]: 6030/357.0 Out[103]: 16.89075630252101

¡Hay una aceleración de alrededor de 17 veces!


Aquí hay otra solución que O se le ocurrió mientras tanto. Es aproximadamente cuatro veces más rápido que la solución de pandas.

def numpy_ewma(data, window): returnArray = np.empty((data.shape[0])) returnArray.fill(np.nan) e = data[0] alpha = 2 / float(window + 1) for s in range(data.shape[0]): e = ((data[s]-e) *alpha ) + e returnArray[s] = e return returnArray

Usé esta fórmula como punto de partida. Estoy seguro de que esto se puede mejorar aún más, pero es al menos un punto de partida.


Aquí hay una implementación usando NumPy que es equivalente a usar df.ewm(alpha=alpha).mean() . Después de leer la documentación, son solo algunas operaciones matriciales. El truco es construir las matrices correctas.

Vale la pena señalar que debido a que estamos creando matrices flotantes, puede comer rápidamente a través de su memoria si la matriz de entrada es demasiado grande.

import pandas as pd import numpy as np def ewma(x, alpha): '''''' Returns the exponentially weighted moving average of x. Parameters: ----------- x : array-like alpha : float {0 <= alpha <= 1} Returns: -------- ewma: numpy array the exponentially weighted moving average '''''' # Coerce x to an array x = np.array(x) n = x.size # Create an initial weight matrix of (1-alpha), and a matrix of powers # to raise the weights by w0 = np.ones(shape=(n,n)) * (1-alpha) p = np.vstack([np.arange(i,i-n,-1) for i in range(n)]) # Create the weight matrix w = np.tril(w0**p,0) # Calculate the ewma return np.dot(w, x[::np.newaxis]) / w.sum(axis=1)

Probemos su:

alpha = 0.55 x = np.random.randint(0,30,15) df = pd.DataFrame(x, columns=[''A'']) df.ewm(alpha=alpha).mean() # returns: # A # 0 13.000000 # 1 22.655172 # 2 20.443268 # 3 12.159796 # 4 14.871955 # 5 15.497575 # 6 20.743511 # 7 20.884818 # 8 24.250715 # 9 18.610901 # 10 17.174686 # 11 16.528564 # 12 17.337879 # 13 7.801912 # 14 12.310889 ewma(x=x, alpha=alpha) # returns: # array([ 13. , 22.65517241, 20.44326778, 12.1597964 , # 14.87195534, 15.4975749 , 20.74351117, 20.88481763, # 24.25071484, 18.61090129, 17.17468551, 16.52856393, # 17.33787888, 7.80191235, 12.31088889])


Dado alpha y windowSize , aquí hay un enfoque para simular el comportamiento correspondiente en NumPy:

def numpy_ewm_alpha(a, alpha, windowSize): wghts = (1-alpha)**np.arange(windowSize) wghts /= wghts.sum() out = np.full(df.shape[0],np.nan) out[windowSize-1:] = np.convolve(a,wghts,''valid'') return out

Ejecuciones de muestra para verificación:

In [54]: alpha = 0.55 ...: windowSize = 20 ...: In [55]: df = pd.DataFrame(np.random.randint(2,9,(100))) In [56]: out0 = df.ewm(alpha = alpha, min_periods=windowSize).mean().as_matrix().ravel() ...: out1 = numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize) ...: print "Max. error : " + str(np.nanmax(np.abs(out0 - out1))) ...: Max. error : 5.10531254605e-07 In [57]: alpha = 0.75 ...: windowSize = 30 ...: In [58]: out0 = df.ewm(alpha = alpha, min_periods=windowSize).mean().as_matrix().ravel() ...: out1 = numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize) ...: print "Max. error : " + str(np.nanmax(np.abs(out0 - out1))) Max. error : 8.881784197e-16

Prueba de tiempo de ejecución en un conjunto de datos más grande:

In [61]: alpha = 0.55 ...: windowSize = 20 ...: In [62]: df = pd.DataFrame(np.random.randint(2,9,(10000))) In [63]: %timeit df.ewm(alpha = alpha, min_periods=windowSize).mean() 1000 loops, best of 3: 851 µs per loop In [64]: %timeit numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize) 1000 loops, best of 3: 204 µs per loop

Mayor impulso

Para aumentar aún más el rendimiento, podríamos evitar la inicialización con NaN y, en su lugar, usar el conjunto np.convolve por np.convolve , de esta manera:

def numpy_ewm_alpha_v2(a, alpha, windowSize): wghts = (1-alpha)**np.arange(windowSize) wghts /= wghts.sum() out = np.convolve(a,wghts) out[:windowSize-1] = np.nan return out[:a.size]

Tiempos -

In [117]: alpha = 0.55 ...: windowSize = 20 ...: In [118]: df = pd.DataFrame(np.random.randint(2,9,(10000))) In [119]: %timeit numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize) 1000 loops, best of 3: 204 µs per loop In [120]: %timeit numpy_ewm_alpha_v2(df.values.ravel(), alpha = alpha, windowSize = windowSize) 10000 loops, best of 3: 195 µs per loop


Esta respuesta puede parecer irrelevante. Pero, para aquellos que también necesitan calcular la varianza ponderada exponencialmente (y también la desviación estándar) con NumPy, la siguiente solución será útil:

import numpy as np def ew(a, alpha, winSize): _alpha = 1 - alpha ws = _alpha ** np.arange(winSize) w_sum = ws.sum() ew_mean = np.convolve(a, ws)[winSize - 1] / w_sum bias = (w_sum ** 2) / ((w_sum ** 2) - (ws ** 2).sum()) ew_var = (np.convolve((a - ew_mean) ** 2, ws)[winSize - 1] / w_sum) * bias ew_std = np.sqrt(ew_var) return (ew_mean, ew_var, ew_std)


Gracias a la solución de @ Divakar y eso es realmente rápido. Sin embargo, causa un problema de desbordamiento que fue señalado por @Danny. La función no devuelve respuestas correctas cuando la longitud es mayor que 13835 más o menos a mi final.

La siguiente es mi solución basada en la solución de Divakar y pandas.ewm (). Mean ()

def numpy_ema(data, com=None, span=None, halflife=None, alpha=None): """Summary Calculate ema with automatically-generated alpha. Weight of past effect decreases as the length of window increasing. # these functions reproduce the pandas result when the flag adjust=False is set. References: https://.com/questions/42869495/numpy-version-of-exponential-weighted-moving-average-equivalent-to-pandas-ewm Args: data (TYPE): Description com (float, optional): Specify decay in terms of center of mass, alpha=1/(1+com), for com>=0 span (float, optional): Specify decay in terms of span, alpha=2/(span+1), for span>=1 halflife (float, optional): Specify decay in terms of half-life, alpha=1-exp(log(0.5)/halflife), for halflife>0 alpha (float, optional): Specify smoothing factor alpha directly, 0<alpha<=1 Returns: TYPE: Description Raises: ValueError: Description """ n_input = sum(map(bool, [com, span, halflife, alpha])) if n_input != 1: raise ValueError( ''com, span, halflife, and alpha are mutually exclusive'') nrow = data.shape[0] if np.isnan(data).any() or (nrow > 13835) or (data.ndim == 2): df = pd.DataFrame(data) df_ewm = df.ewm(com=com, span=span, halflife=halflife, alpha=alpha, adjust=False) out = df_ewm.mean().values.squeeze() else: if com: alpha = 1 / (1 + com) elif span: alpha = 2 / (span + 1.0) elif halflife: alpha = 1 - np.exp(np.log(0.5) / halflife) alpha_rev = 1 - alpha pows = alpha_rev**(np.arange(nrow + 1)) scale_arr = 1 / pows[:-1] offset = data[0] * pows[1:] pw0 = alpha * alpha_rev**(nrow - 1) mult = data * pw0 * scale_arr cumsums = np.cumsum(mult) out = offset + cumsums * scale_arr[::-1] return out


La respuesta de @ Divakar parece causar desbordamiento al tratar con

numpy_ewma_vectorized(np.random.random(500000), 10)

Lo que he estado usando es:

def EMA(input, time_period=10): # For time period = 10 t_ = time_period - 1 ema = np.zeros_like(input,dtype=float) multiplier = 2.0 / (time_period + 1) #multiplier = 1 - multiplier for i in range(len(input)): # Special Case if i > t_: ema[i] = (input[i] - ema[i-1]) * multiplier + ema[i-1] else: ema[i] = np.mean(input[:i+1]) return ema

Sin embargo, esto es mucho más lento que la solución panda:

from pandas import ewma as pd_ema def EMA_fast(X, time_period = 10): out = pd_ema(X, span=time_period, min_periods=time_period) out[:time_period-1] = np.cumsum(X[:time_period-1]) / np.asarray(range(1,time_period)) return out


Sobre la base de la gran respuesta de , aquí hay una implementación que corresponde a la bandera de adjust=True de la función pandas, es decir, usando pesos en lugar de recursividad.

def numpy_ewma(data, window): alpha = 2 /(window + 1.0) scale = 1/(1-alpha) n = data.shape[0] scale_arr = (1-alpha)**(-1*np.arange(n)) weights = (1-alpha)**np.arange(n) pw0 = (1-alpha)**(n-1) mult = data*pw0*scale_arr cumsums = mult.cumsum() out = cumsums*scale_arr[::-1] / weights.cumsum() return out


Actualizado 08/06/2019

SOLUCIÓN PURITA, RÁPIDA Y VECTORIZADA PARA ENTRADAS GRANDES

out parámetro para el cálculo en el lugar, parámetro dtype , parámetro de order índice

Esta función es equivalente a pandas '' ewm(adjust=False).mean() , pero mucho más rápido. ewm(adjust=True).mean() (el valor predeterminado para los pandas) puede producir diferentes valores al comienzo del resultado. Estoy trabajando para agregar la funcionalidad de adjust a esta solución.

La respuesta de @ Divakar conduce a problemas de precisión de coma flotante cuando la entrada es demasiado grande. Esto se debe a que (1-alpha)**(n+1) -> 0 cuando n -> inf y alpha -> 1 , lo que lleva a que aparezcan valores de división por cero y NaN en el cálculo.

Aquí está mi solución más rápida sin problemas de precisión, casi completamente vectorizada. Se ha vuelto un poco complicado, pero el rendimiento es excelente, especialmente para entradas realmente grandes. Sin usar cálculos en el lugar (que es posible usando el parámetro out , ahorrando tiempo de asignación de memoria): 3.62 segundos para el vector de entrada de elementos de 100M, 3.2ms para un vector de entrada de elementos de 100K y 293 µs para un vector de entrada de 5000 elementos en un bastante viejo PC (los resultados variarán con diferentes valores de alpha / row_size ).

# tested with python3 & numpy 1.15.2 import numpy as np def ewma_vectorized_safe(data, alpha, row_size=None, dtype=None, order=''C'', out=None): """ Reshapes data before calculating EWMA, then iterates once over the rows to calculate the offset without precision issues :param data: Input data, will be flattened. :param alpha: scalar float in range (0,1) The alpha parameter for the moving average. :param row_size: int, optional The row size to use in the computation. High row sizes need higher precision, low values will impact performance. The optimal value depends on the platform and the alpha being used. Higher alpha values require lower row size. Default depends on dtype. :param dtype: optional Data type used for calculations. Defaults to float64 unless data.dtype is float32, then it will use float32. :param order: {''C'', ''F'', ''A''}, optional Order to use when flattening the data. Defaults to ''C''. :param out: ndarray, or None, optional A location into which the result is stored. If provided, it must have the same shape as the desired output. If not provided or `None`, a freshly-allocated array is returned. :return: The flattened result. """ data = np.array(data, copy=False) if dtype is None: if data.dtype == np.float32: dtype = np.float32 else: dtype = np.float else: dtype = np.dtype(dtype) row_size = int(row_size) if row_size is not None else get_max_row_size(alpha, dtype) if data.size <= row_size: # The normal function can handle this input, use that return ewma_vectorized(data, alpha, dtype=dtype, order=order, out=out) if data.ndim > 1: # flatten input data = np.reshape(data, -1, order=order) if out is None: out = np.empty_like(data, dtype=dtype) else: assert out.shape == data.shape assert out.dtype == dtype row_n = int(data.size // row_size) # the number of rows to use trailing_n = int(data.size % row_size) # the amount of data leftover first_offset = data[0] if trailing_n > 0: # set temporary results to slice view of out parameter out_main_view = np.reshape(out[:-trailing_n], (row_n, row_size)) data_main_view = np.reshape(data[:-trailing_n], (row_n, row_size)) else: out_main_view = out data_main_view = data # get all the scaled cumulative sums with 0 offset ewma_vectorized_2d(data_main_view, alpha, axis=1, offset=0, dtype=dtype, order=''C'', out=out_main_view) scaling_factors = (1 - alpha) ** np.arange(1, row_size + 1) last_scaling_factor = scaling_factors[-1] # create offset array offsets = np.empty(out_main_view.shape[0], dtype=dtype) offsets[0] = first_offset # iteratively calculate offset for each row for i in range(1, out_main_view.shape[0]): offsets[i] = offsets[i - 1] * last_scaling_factor + out_main_view[i - 1, -1] # add the offsets to the result out_main_view += offsets[:, np.newaxis] * scaling_factors[np.newaxis, :] if trailing_n > 0: # process trailing data in the 2nd slice of the out parameter ewma_vectorized(data[-trailing_n:], alpha, offset=out_main_view[-1, -1], dtype=dtype, order=''C'', out=out[-trailing_n:]) return out def get_max_row_size(alpha, dtype=float): assert 0. <= alpha < 1. # This will return the maximum row size possible on # your platform for the given dtype. I can find no impact on accuracy # at this value on my machine. # Might not be the optimal value for speed, which is hard to predict # due to numpy''s optimizations # Use np.finfo(dtype).eps if you are worried about accuracy # and want to be extra safe. epsilon = np.finfo(dtype).tiny # If this produces an OverflowError, make epsilon larger return int(np.log(epsilon)/np.log(1-alpha)) + 1

La función 1D ewma:

def ewma_vectorized(data, alpha, offset=None, dtype=None, order=''C'', out=None): """ Calculates the exponential moving average over a vector. Will fail for large inputs. :param data: Input data :param alpha: scalar float in range (0,1) The alpha parameter for the moving average. :param offset: optional The offset for the moving average, scalar. Defaults to data[0]. :param dtype: optional Data type used for calculations. Defaults to float64 unless data.dtype is float32, then it will use float32. :param order: {''C'', ''F'', ''A''}, optional Order to use when flattening the data. Defaults to ''C''. :param out: ndarray, or None, optional A location into which the result is stored. If provided, it must have the same shape as the input. If not provided or `None`, a freshly-allocated array is returned. """ data = np.array(data, copy=False) if dtype is None: if data.dtype == np.float32: dtype = np.float32 else: dtype = np.float64 else: dtype = np.dtype(dtype) if data.ndim > 1: # flatten input data = data.reshape(-1, order) if out is None: out = np.empty_like(data, dtype=dtype) else: assert out.shape == data.shape assert out.dtype == dtype if data.size < 1: # empty input, return empty array return out if offset is None: offset = data[0] alpha = np.array(alpha, copy=False).astype(dtype, copy=False) # scaling_factors -> 0 as len(data) gets large # this leads to divide-by-zeros below scaling_factors = np.power(1. - alpha, np.arange(data.size + 1, dtype=dtype), dtype=dtype) # create cumulative sum array np.multiply(data, (alpha * scaling_factors[-2]) / scaling_factors[:-1], dtype=dtype, out=out) np.cumsum(out, dtype=dtype, out=out) # cumsums / scaling out /= scaling_factors[-2::-1] if offset != 0: offset = np.array(offset, copy=False).astype(dtype, copy=False) # add offsets out += offset * scaling_factors[1:] return out

La función 2D ewma:

def ewma_vectorized_2d(data, alpha, axis=None, offset=None, dtype=None, order=''C'', out=None): """ Calculates the exponential moving average over a given axis. :param data: Input data, must be 1D or 2D array. :param alpha: scalar float in range (0,1) The alpha parameter for the moving average. :param axis: The axis to apply the moving average on. If axis==None, the data is flattened. :param offset: optional The offset for the moving average. Must be scalar or a vector with one element for each row of data. If set to None, defaults to the first value of each row. :param dtype: optional Data type used for calculations. Defaults to float64 unless data.dtype is float32, then it will use float32. :param order: {''C'', ''F'', ''A''}, optional Order to use when flattening the data. Ignored if axis is not None. :param out: ndarray, or None, optional A location into which the result is stored. If provided, it must have the same shape as the desired output. If not provided or `None`, a freshly-allocated array is returned. """ data = np.array(data, copy=False) assert data.ndim <= 2 if dtype is None: if data.dtype == np.float32: dtype = np.float32 else: dtype = np.float64 else: dtype = np.dtype(dtype) if out is None: out = np.empty_like(data, dtype=dtype) else: assert out.shape == data.shape assert out.dtype == dtype if data.size < 1: # empty input, return empty array return out if axis is None or data.ndim < 2: # use 1D version if isinstance(offset, np.ndarray): offset = offset[0] return ewma_vectorized(data, alpha, offset, dtype=dtype, order=order, out=out) assert -data.ndim <= axis < data.ndim # create reshaped data views out_view = out if axis < 0: axis = data.ndim - int(axis) if axis == 0: # transpose data views so columns are treated as rows data = data.T out_view = out_view.T if offset is None: # use the first element of each row as the offset offset = np.copy(data[:, 0]) elif np.size(offset) == 1: offset = np.reshape(offset, (1,)) alpha = np.array(alpha, copy=False).astype(dtype, copy=False) # calculate the moving average row_size = data.shape[1] row_n = data.shape[0] scaling_factors = np.power(1. - alpha, np.arange(row_size + 1, dtype=dtype), dtype=dtype) # create a scaled cumulative sum array np.multiply( data, np.multiply(alpha * scaling_factors[-2], np.ones((row_n, 1), dtype=dtype), dtype=dtype) / scaling_factors[np.newaxis, :-1], dtype=dtype, out=out_view ) np.cumsum(out_view, axis=1, dtype=dtype, out=out_view) out_view /= scaling_factors[np.newaxis, -2::-1] if not (np.size(offset) == 1 and offset == 0): offset = offset.astype(dtype, copy=False) # add the offsets to the scaled cumulative sums out_view += offset[:, np.newaxis] * scaling_factors[np.newaxis, 1:] return out

uso:

data_n = 100000000 data = ((0.5*np.random.randn(data_n)+0.5) % 1) * 100 span = 5000 # span >= 1 alpha = 2/(span+1) # for pandas` span parameter # com = 1000 # com >= 0 # alpha = 1/(1+com) # for pandas` center-of-mass parameter # halflife = 100 # halflife > 0 # alpha = 1 - np.exp(np.log(0.5)/halflife) # for pandas` half-life parameter result = ewma_vectorized_safe(data, alpha)

Solo un consejo

Es fácil calcular un ''tamaño de ventana'' (los promedios técnicamente exponenciales tienen ''ventanas'' infinitas) para un alpha dado, dependiendo de la contribución de los datos en esa ventana al promedio. Esto es útil, por ejemplo, para elegir qué parte del inicio del resultado tratar como poco confiable debido a los efectos de borde.

def window_size(alpha, sum_proportion): # Increases with increased sum_proportion and decreased alpha # solve (1-alpha)**window_size = (1-sum_proportion) for window_size return int(np.log(1-sum_proportion) / np.log(1-alpha)) alpha = 0.02 sum_proportion = .99 # window covers 99% of contribution to the moving average window = window_size(alpha, sum_proportion) # = 227 sum_proportion = .75 # window covers 75% of contribution to the moving average window = window_size(alpha, sum_proportion) # = 68

La relación alpha = 2 / (window_size + 1.0) utilizada en este hilo (la opción ''span'' de pandas ) es una aproximación muy aproximada del inverso de la función anterior (con sum_proportion~=0.87 ). alpha = 1 - np.exp(np.log(1-sum_proportion)/window_size) es más preciso (la opción ''half-life'' de pandas iguala esta fórmula con sum_proportion=0.5 ).

En el siguiente ejemplo, los data representan una señal ruidosa continua. cutoff_idx es la primera posición del result donde al menos el 99% del valor depende de valores separados en los data (es decir, menos del 1% depende de los datos [0]). Los datos hasta cutoff_idx se excluyen de los resultados finales porque dependen demasiado del primer valor en los data , por lo tanto, posiblemente sesgan el promedio.

result = ewma_vectorized_safe(data, alpha, chunk_size) sum_proportion = .99 cutoff_idx = window_size(alpha, sum_proportion) result = result[cutoff_idx:]

Para ilustrar el problema que resolvió anteriormente, puede ejecutar esto varias veces, observe el inicio falso de la línea roja, que a menudo aparece, que se omite después de cutoff_idx :

data_n = 100000 data = np.random.rand(data_n) * 100 window = 1000 sum_proportion = .99 alpha = 1 - np.exp(np.log(1-sum_proportion)/window) result = ewma_vectorized_safe(data, alpha) cutoff_idx = window_size(alpha, sum_proportion) x = np.arange(start=0, stop=result.size) import matplotlib.pyplot as plt plt.plot(x[:cutoff_idx+1], result[:cutoff_idx+1], ''-r'', x[cutoff_idx:], result[cutoff_idx:], ''-b'') plt.show()

tenga en cuenta que cutoff_idx==window porque alpha se configuró con el inverso de la función window_size() , con la misma sum_proportion . Esto es similar a cómo los pandas aplican ewm(span=window, min_periods=window) .