python - Versión NumPy de "Promedio móvil ponderado exponencial", equivalente a pandas.ewm(). Mean()
performance vectorization (11)
¿Cómo obtengo el promedio móvil ponderado exponencial en NumPy como el siguiente en pandas ?
import pandas as pd
import pandas_datareader as pdr
from datetime import datetime
# Declare variables
ibm = pdr.get_data_yahoo(symbols=''IBM'', start=datetime(2000, 1, 1), end=datetime(2012, 1, 1)).reset_index(drop=True)[''Adj Close'']
windowSize = 20
# Get PANDAS exponential weighted moving average
ewm_pd = pd.DataFrame(ibm).ewm(span=windowSize, min_periods=windowSize).mean().as_matrix()
print(ewm_pd)
Intenté lo siguiente con NumPy
import numpy as np
import pandas_datareader as pdr
from datetime import datetime
# From this post: http://stackoverflow.com/a/40085052/3293881 by @Divakar
def strided_app(a, L, S): # Window len = L, Stride len/stepsize = S
nrows = ((a.size - L) // S) + 1
n = a.strides[0]
return np.lib.stride_tricks.as_strided(a, shape=(nrows, L), strides=(S * n, n))
def numpyEWMA(price, windowSize):
weights = np.exp(np.linspace(-1., 0., windowSize))
weights /= weights.sum()
a2D = strided_app(price, windowSize, 1)
returnArray = np.empty((price.shape[0]))
returnArray.fill(np.nan)
for index in (range(a2D.shape[0])):
returnArray[index + windowSize-1] = np.convolve(weights, a2D[index])[windowSize - 1:-windowSize + 1]
return np.reshape(returnArray, (-1, 1))
# Declare variables
ibm = pdr.get_data_yahoo(symbols=''IBM'', start=datetime(2000, 1, 1), end=datetime(2012, 1, 1)).reset_index(drop=True)[''Adj Close'']
windowSize = 20
# Get NumPy exponential weighted moving average
ewma_np = numpyEWMA(ibm, windowSize)
print(ewma_np)
Pero los resultados no son similares a los de los pandas.
¿Existe quizás un mejor enfoque para calcular el promedio móvil ponderado exponencial directamente en NumPy y obtener exactamente el mismo resultado que
pandas.ewm().mean()
?
Con 60,000 solicitudes de solución de pandas, obtengo unos 230 segundos. Estoy seguro de que con un NumPy puro, esto se puede disminuir significativamente.
Los
pandas
EWMA 23x más rápidos
La pregunta es estrictamente pedir una solución
numpy
, sin embargo, parece que el OP fue en realidad justo después de una solución
numpy
pura para acelerar el tiempo de ejecución.
numba.jit
un problema similar, pero en cambio miré hacia
numba.jit
que acelera masivamente el tiempo de cálculo
In [24]: a = np.random.random(10**7)
...: df = pd.Series(a)
In [25]: %timeit numpy_ewma(a, 10) # /a/42915307/4013571
...: %timeit df.ewm(span=10).mean() # pandas
...: %timeit numpy_ewma_vectorized_v2(a, 10) # best w/o numba: /a/42926270/4013571
...: %timeit _ewma(a, 10) # fastest accurate (below)
...: %timeit _ewma_infinite_hist(a, 10) # fastest overall (below)
4.14 s ± 116 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
991 ms ± 52.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
396 ms ± 8.39 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
181 ms ± 1.01 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
39.6 ms ± 979 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Reducción a matrices más pequeñas de
a = np.random.random(100)
(resultados en el mismo orden)
41.6 µs ± 491 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
945 ms ± 12 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
16 µs ± 93.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
1.66 µs ± 13.7 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
1.14 µs ± 5.57 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
También vale la pena señalar que mis funciones a continuación están alineadas de manera idéntica a los
pandas
(ver los ejemplos en docstr), mientras que algunas de las respuestas aquí toman varias aproximaciones diferentes.
Por ejemplo,
In [57]: print(pd.DataFrame([1,2,3]).ewm(span=2).mean().values.ravel())
...: print(numpy_ewma_vectorized_v2(np.array([1,2,3]), 2))
...: print(numpy_ewma(np.array([1,2,3]), 2))
[1. 1.75 2.61538462]
[1. 1.66666667 2.55555556]
[1. 1.18181818 1.51239669]
El código fuente que he documentado para mi propia biblioteca.
import numpy as np
from numba import jit
from numba import float64
from numba import int64
@jit((float64[:], int64), nopython=True, nogil=True)
def _ewma(arr_in, window):
r"""Exponentialy weighted moving average specified by a decay ``window``
to provide better adjustments for small windows via:
y[t] = (x[t] + (1-a)*x[t-1] + (1-a)^2*x[t-2] + ... + (1-a)^n*x[t-n]) /
(1 + (1-a) + (1-a)^2 + ... + (1-a)^n).
Parameters
----------
arr_in : np.ndarray, float64
A single dimenisional numpy array
window : int64
The decay window, or ''span''
Returns
-------
np.ndarray
The EWMA vector, same length / shape as ``arr_in``
Examples
--------
>>> import pandas as pd
>>> a = np.arange(5, dtype=float)
>>> exp = pd.DataFrame(a).ewm(span=10, adjust=True).mean()
>>> np.array_equal(_ewma_infinite_hist(a, 10), exp.values.ravel())
True
"""
n = arr_in.shape[0]
ewma = np.empty(n, dtype=float64)
alpha = 2 / float(window + 1)
w = 1
ewma_old = arr_in[0]
ewma[0] = ewma_old
for i in range(1, n):
w += (1-alpha)**i
ewma_old = ewma_old*(1-alpha) + arr_in[i]
ewma[i] = ewma_old / w
return ewma
@jit((float64[:], int64), nopython=True, nogil=True)
def _ewma_infinite_hist(arr_in, window):
r"""Exponentialy weighted moving average specified by a decay ``window``
assuming infinite history via the recursive form:
(2) (i) y[0] = x[0]; and
(ii) y[t] = a*x[t] + (1-a)*y[t-1] for t>0.
This method is less accurate that ``_ewma`` but
much faster:
In [1]: import numpy as np, bars
...: arr = np.random.random(100000)
...: %timeit bars._ewma(arr, 10)
...: %timeit bars._ewma_infinite_hist(arr, 10)
3.74 ms ± 60.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
262 µs ± 1.54 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
Parameters
----------
arr_in : np.ndarray, float64
A single dimenisional numpy array
window : int64
The decay window, or ''span''
Returns
-------
np.ndarray
The EWMA vector, same length / shape as ``arr_in``
Examples
--------
>>> import pandas as pd
>>> a = np.arange(5, dtype=float)
>>> exp = pd.DataFrame(a).ewm(span=10, adjust=False).mean()
>>> np.array_equal(_ewma_infinite_hist(a, 10), exp.values.ravel())
True
"""
n = arr_in.shape[0]
ewma = np.empty(n, dtype=float64)
alpha = 2 / float(window + 1)
ewma[0] = arr_in[0]
for i in range(1, n):
ewma[i] = arr_in[i] * alpha + ewma[i-1] * (1 - alpha)
return ewma
Aquí está mi implementación para matrices de entrada 1D con un tamaño de ventana infinito. Como usa números grandes, solo funciona con matrices de entrada con elementos de valor absoluto <1e16, cuando se usa float32, pero ese debería ser el caso.
La idea es cambiar la forma de la matriz de entrada en segmentos de una longitud limitada, para que no se produzca un desbordamiento, y luego hacer el cálculo de ewm con cada segmento por separado.
def ewm(x, alpha):
"""
Returns the exponentially weighted mean y of a numpy array x with scaling factor alpha
y[0] = x[0]
y[j] = (1. - alpha) * y[j-1] + alpha * x[j], for j > 0
x -- 1D numpy array
alpha -- float
"""
n = int(-100. / np.log(1.-alpha)) # Makes sure that the first and last elements in f are very big and very small (about 1e22 and 1e-22)
f = np.exp(np.arange(1-n, n, 2) * (0.5 * np.log(1. - alpha))) # Scaling factor for each slice
tmp = (np.resize(x, ((len(x) + n - 1) // n, n)) / f * alpha).cumsum(axis=1) * f # Get ewm for each slice of length n
# Add the last value of each previous slice to the next slice with corresponding scaling factor f and return result
return np.resize(tmp + np.tensordot(np.append(x[0], np.roll(tmp.T[n-1], 1)[1:]), f * ((1. - alpha) / f[0]), axes=0), len(x))
¡Creo que finalmente lo he descifrado!
Aquí hay una versión vectorizada de la función
numpy_ewma
que se afirma que produce los resultados correctos de la
@RaduS''s post
:
def numpy_ewma_vectorized(data, window):
alpha = 2 /(window + 1.0)
alpha_rev = 1-alpha
scale = 1/alpha_rev
n = data.shape[0]
r = np.arange(n)
scale_arr = scale**r
offset = data[0]*alpha_rev**(r+1)
pw0 = alpha*alpha_rev**(n-1)
mult = data*pw0*scale_arr
cumsums = mult.cumsum()
out = offset + cumsums*scale_arr[::-1]
return out
Mayor impulso
Podemos impulsarlo aún más con la reutilización de algunos códigos, así:
def numpy_ewma_vectorized_v2(data, window):
alpha = 2 /(window + 1.0)
alpha_rev = 1-alpha
n = data.shape[0]
pows = alpha_rev**(np.arange(n+1))
scale_arr = 1/pows[:-1]
offset = data[0]*pows[1:]
pw0 = alpha*alpha_rev**(n-1)
mult = data*pw0*scale_arr
cumsums = mult.cumsum()
out = offset + cumsums*scale_arr[::-1]
return out
Prueba de tiempo de ejecución
Midamos estos dos contra la misma función de bucle para un gran conjunto de datos.
In [97]: data = np.random.randint(2,9,(5000))
...: window = 20
...:
In [98]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized(data, window))
Out[98]: True
In [99]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized_v2(data, window))
Out[99]: True
In [100]: %timeit numpy_ewma(data, window)
100 loops, best of 3: 6.03 ms per loop
In [101]: %timeit numpy_ewma_vectorized(data, window)
1000 loops, best of 3: 665 µs per loop
In [102]: %timeit numpy_ewma_vectorized_v2(data, window)
1000 loops, best of 3: 357 µs per loop
In [103]: 6030/357.0
Out[103]: 16.89075630252101
¡Hay una aceleración de alrededor de 17 veces!
Aquí hay otra solución que O se le ocurrió mientras tanto. Es aproximadamente cuatro veces más rápido que la solución de pandas.
def numpy_ewma(data, window):
returnArray = np.empty((data.shape[0]))
returnArray.fill(np.nan)
e = data[0]
alpha = 2 / float(window + 1)
for s in range(data.shape[0]):
e = ((data[s]-e) *alpha ) + e
returnArray[s] = e
return returnArray
Usé esta fórmula como punto de partida. Estoy seguro de que esto se puede mejorar aún más, pero es al menos un punto de partida.
Aquí hay una implementación usando NumPy que es equivalente a usar
df.ewm(alpha=alpha).mean()
.
Después de leer la documentación, son solo algunas operaciones matriciales.
El truco es construir las matrices correctas.
Vale la pena señalar que debido a que estamos creando matrices flotantes, puede comer rápidamente a través de su memoria si la matriz de entrada es demasiado grande.
import pandas as pd
import numpy as np
def ewma(x, alpha):
''''''
Returns the exponentially weighted moving average of x.
Parameters:
-----------
x : array-like
alpha : float {0 <= alpha <= 1}
Returns:
--------
ewma: numpy array
the exponentially weighted moving average
''''''
# Coerce x to an array
x = np.array(x)
n = x.size
# Create an initial weight matrix of (1-alpha), and a matrix of powers
# to raise the weights by
w0 = np.ones(shape=(n,n)) * (1-alpha)
p = np.vstack([np.arange(i,i-n,-1) for i in range(n)])
# Create the weight matrix
w = np.tril(w0**p,0)
# Calculate the ewma
return np.dot(w, x[::np.newaxis]) / w.sum(axis=1)
Probemos su:
alpha = 0.55
x = np.random.randint(0,30,15)
df = pd.DataFrame(x, columns=[''A''])
df.ewm(alpha=alpha).mean()
# returns:
# A
# 0 13.000000
# 1 22.655172
# 2 20.443268
# 3 12.159796
# 4 14.871955
# 5 15.497575
# 6 20.743511
# 7 20.884818
# 8 24.250715
# 9 18.610901
# 10 17.174686
# 11 16.528564
# 12 17.337879
# 13 7.801912
# 14 12.310889
ewma(x=x, alpha=alpha)
# returns:
# array([ 13. , 22.65517241, 20.44326778, 12.1597964 ,
# 14.87195534, 15.4975749 , 20.74351117, 20.88481763,
# 24.25071484, 18.61090129, 17.17468551, 16.52856393,
# 17.33787888, 7.80191235, 12.31088889])
Dado
alpha
y
windowSize
, aquí hay un enfoque para simular el comportamiento correspondiente en NumPy:
def numpy_ewm_alpha(a, alpha, windowSize):
wghts = (1-alpha)**np.arange(windowSize)
wghts /= wghts.sum()
out = np.full(df.shape[0],np.nan)
out[windowSize-1:] = np.convolve(a,wghts,''valid'')
return out
Ejecuciones de muestra para verificación:
In [54]: alpha = 0.55
...: windowSize = 20
...:
In [55]: df = pd.DataFrame(np.random.randint(2,9,(100)))
In [56]: out0 = df.ewm(alpha = alpha, min_periods=windowSize).mean().as_matrix().ravel()
...: out1 = numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize)
...: print "Max. error : " + str(np.nanmax(np.abs(out0 - out1)))
...:
Max. error : 5.10531254605e-07
In [57]: alpha = 0.75
...: windowSize = 30
...:
In [58]: out0 = df.ewm(alpha = alpha, min_periods=windowSize).mean().as_matrix().ravel()
...: out1 = numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize)
...: print "Max. error : " + str(np.nanmax(np.abs(out0 - out1)))
Max. error : 8.881784197e-16
Prueba de tiempo de ejecución en un conjunto de datos más grande:
In [61]: alpha = 0.55
...: windowSize = 20
...:
In [62]: df = pd.DataFrame(np.random.randint(2,9,(10000)))
In [63]: %timeit df.ewm(alpha = alpha, min_periods=windowSize).mean()
1000 loops, best of 3: 851 µs per loop
In [64]: %timeit numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize)
1000 loops, best of 3: 204 µs per loop
Mayor impulso
Para aumentar aún más el rendimiento, podríamos evitar la inicialización con NaN y, en su lugar, usar el conjunto
np.convolve
por
np.convolve
, de esta manera:
def numpy_ewm_alpha_v2(a, alpha, windowSize):
wghts = (1-alpha)**np.arange(windowSize)
wghts /= wghts.sum()
out = np.convolve(a,wghts)
out[:windowSize-1] = np.nan
return out[:a.size]
Tiempos -
In [117]: alpha = 0.55
...: windowSize = 20
...:
In [118]: df = pd.DataFrame(np.random.randint(2,9,(10000)))
In [119]: %timeit numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize)
1000 loops, best of 3: 204 µs per loop
In [120]: %timeit numpy_ewm_alpha_v2(df.values.ravel(), alpha = alpha, windowSize = windowSize)
10000 loops, best of 3: 195 µs per loop
Esta respuesta puede parecer irrelevante. Pero, para aquellos que también necesitan calcular la varianza ponderada exponencialmente (y también la desviación estándar) con NumPy, la siguiente solución será útil:
import numpy as np
def ew(a, alpha, winSize):
_alpha = 1 - alpha
ws = _alpha ** np.arange(winSize)
w_sum = ws.sum()
ew_mean = np.convolve(a, ws)[winSize - 1] / w_sum
bias = (w_sum ** 2) / ((w_sum ** 2) - (ws ** 2).sum())
ew_var = (np.convolve((a - ew_mean) ** 2, ws)[winSize - 1] / w_sum) * bias
ew_std = np.sqrt(ew_var)
return (ew_mean, ew_var, ew_std)
Gracias a la solución de @ Divakar y eso es realmente rápido. Sin embargo, causa un problema de desbordamiento que fue señalado por @Danny. La función no devuelve respuestas correctas cuando la longitud es mayor que 13835 más o menos a mi final.
La siguiente es mi solución basada en la solución de Divakar y pandas.ewm (). Mean ()
def numpy_ema(data, com=None, span=None, halflife=None, alpha=None):
"""Summary
Calculate ema with automatically-generated alpha. Weight of past effect
decreases as the length of window increasing.
# these functions reproduce the pandas result when the flag adjust=False is set.
References:
https://.com/questions/42869495/numpy-version-of-exponential-weighted-moving-average-equivalent-to-pandas-ewm
Args:
data (TYPE): Description
com (float, optional): Specify decay in terms of center of mass, alpha=1/(1+com), for com>=0
span (float, optional): Specify decay in terms of span, alpha=2/(span+1), for span>=1
halflife (float, optional): Specify decay in terms of half-life, alpha=1-exp(log(0.5)/halflife), for halflife>0
alpha (float, optional): Specify smoothing factor alpha directly, 0<alpha<=1
Returns:
TYPE: Description
Raises:
ValueError: Description
"""
n_input = sum(map(bool, [com, span, halflife, alpha]))
if n_input != 1:
raise ValueError(
''com, span, halflife, and alpha are mutually exclusive'')
nrow = data.shape[0]
if np.isnan(data).any() or (nrow > 13835) or (data.ndim == 2):
df = pd.DataFrame(data)
df_ewm = df.ewm(com=com, span=span, halflife=halflife,
alpha=alpha, adjust=False)
out = df_ewm.mean().values.squeeze()
else:
if com:
alpha = 1 / (1 + com)
elif span:
alpha = 2 / (span + 1.0)
elif halflife:
alpha = 1 - np.exp(np.log(0.5) / halflife)
alpha_rev = 1 - alpha
pows = alpha_rev**(np.arange(nrow + 1))
scale_arr = 1 / pows[:-1]
offset = data[0] * pows[1:]
pw0 = alpha * alpha_rev**(nrow - 1)
mult = data * pw0 * scale_arr
cumsums = np.cumsum(mult)
out = offset + cumsums * scale_arr[::-1]
return out
La respuesta de @ Divakar parece causar desbordamiento al tratar con
numpy_ewma_vectorized(np.random.random(500000), 10)
Lo que he estado usando es:
def EMA(input, time_period=10): # For time period = 10
t_ = time_period - 1
ema = np.zeros_like(input,dtype=float)
multiplier = 2.0 / (time_period + 1)
#multiplier = 1 - multiplier
for i in range(len(input)):
# Special Case
if i > t_:
ema[i] = (input[i] - ema[i-1]) * multiplier + ema[i-1]
else:
ema[i] = np.mean(input[:i+1])
return ema
Sin embargo, esto es mucho más lento que la solución panda:
from pandas import ewma as pd_ema
def EMA_fast(X, time_period = 10):
out = pd_ema(X, span=time_period, min_periods=time_period)
out[:time_period-1] = np.cumsum(X[:time_period-1]) / np.asarray(range(1,time_period))
return out
Sobre la base de la gran respuesta de
, aquí hay una implementación que corresponde a la bandera de
adjust=True
de la función pandas, es decir, usando pesos en lugar de recursividad.
def numpy_ewma(data, window):
alpha = 2 /(window + 1.0)
scale = 1/(1-alpha)
n = data.shape[0]
scale_arr = (1-alpha)**(-1*np.arange(n))
weights = (1-alpha)**np.arange(n)
pw0 = (1-alpha)**(n-1)
mult = data*pw0*scale_arr
cumsums = mult.cumsum()
out = cumsums*scale_arr[::-1] / weights.cumsum()
return out
Actualizado 08/06/2019
SOLUCIÓN PURITA, RÁPIDA Y VECTORIZADA PARA ENTRADAS GRANDES
out
parámetro para el cálculo en el lugar, parámetro
dtype
, parámetro de
order
índice
Esta función es equivalente a pandas ''
ewm(adjust=False).mean()
, pero mucho más rápido.
ewm(adjust=True).mean()
(el valor predeterminado para los pandas) puede producir diferentes valores al comienzo del resultado.
Estoy trabajando para agregar la funcionalidad de
adjust
a esta solución.
La respuesta de @ Divakar
conduce a problemas de precisión de coma flotante cuando la entrada es demasiado grande.
Esto se debe a que
(1-alpha)**(n+1) -> 0
cuando
n -> inf
y
alpha -> 1
, lo que lleva a que aparezcan valores de división por cero y
NaN
en el cálculo.
Aquí está mi solución más rápida sin problemas de precisión, casi completamente vectorizada.
Se ha vuelto un poco complicado, pero el rendimiento es excelente, especialmente para entradas realmente grandes.
Sin usar cálculos en el lugar (que es posible usando el parámetro
out
, ahorrando tiempo de asignación de memoria): 3.62 segundos para el vector de entrada de elementos de 100M, 3.2ms para un vector de entrada de elementos de 100K y 293 µs para un vector de entrada de 5000 elementos en un bastante viejo PC (los resultados variarán con diferentes valores de
alpha
/
row_size
).
# tested with python3 & numpy 1.15.2
import numpy as np
def ewma_vectorized_safe(data, alpha, row_size=None, dtype=None, order=''C'', out=None):
"""
Reshapes data before calculating EWMA, then iterates once over the rows
to calculate the offset without precision issues
:param data: Input data, will be flattened.
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param row_size: int, optional
The row size to use in the computation. High row sizes need higher precision,
low values will impact performance. The optimal value depends on the
platform and the alpha being used. Higher alpha values require lower
row size. Default depends on dtype.
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {''C'', ''F'', ''A''}, optional
Order to use when flattening the data. Defaults to ''C''.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the desired output. If not provided or `None`,
a freshly-allocated array is returned.
:return: The flattened result.
"""
data = np.array(data, copy=False)
if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float
else:
dtype = np.dtype(dtype)
row_size = int(row_size) if row_size is not None
else get_max_row_size(alpha, dtype)
if data.size <= row_size:
# The normal function can handle this input, use that
return ewma_vectorized(data, alpha, dtype=dtype, order=order, out=out)
if data.ndim > 1:
# flatten input
data = np.reshape(data, -1, order=order)
if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype
row_n = int(data.size // row_size) # the number of rows to use
trailing_n = int(data.size % row_size) # the amount of data leftover
first_offset = data[0]
if trailing_n > 0:
# set temporary results to slice view of out parameter
out_main_view = np.reshape(out[:-trailing_n], (row_n, row_size))
data_main_view = np.reshape(data[:-trailing_n], (row_n, row_size))
else:
out_main_view = out
data_main_view = data
# get all the scaled cumulative sums with 0 offset
ewma_vectorized_2d(data_main_view, alpha, axis=1, offset=0, dtype=dtype,
order=''C'', out=out_main_view)
scaling_factors = (1 - alpha) ** np.arange(1, row_size + 1)
last_scaling_factor = scaling_factors[-1]
# create offset array
offsets = np.empty(out_main_view.shape[0], dtype=dtype)
offsets[0] = first_offset
# iteratively calculate offset for each row
for i in range(1, out_main_view.shape[0]):
offsets[i] = offsets[i - 1] * last_scaling_factor + out_main_view[i - 1, -1]
# add the offsets to the result
out_main_view += offsets[:, np.newaxis] * scaling_factors[np.newaxis, :]
if trailing_n > 0:
# process trailing data in the 2nd slice of the out parameter
ewma_vectorized(data[-trailing_n:], alpha, offset=out_main_view[-1, -1],
dtype=dtype, order=''C'', out=out[-trailing_n:])
return out
def get_max_row_size(alpha, dtype=float):
assert 0. <= alpha < 1.
# This will return the maximum row size possible on
# your platform for the given dtype. I can find no impact on accuracy
# at this value on my machine.
# Might not be the optimal value for speed, which is hard to predict
# due to numpy''s optimizations
# Use np.finfo(dtype).eps if you are worried about accuracy
# and want to be extra safe.
epsilon = np.finfo(dtype).tiny
# If this produces an OverflowError, make epsilon larger
return int(np.log(epsilon)/np.log(1-alpha)) + 1
La función 1D ewma:
def ewma_vectorized(data, alpha, offset=None, dtype=None, order=''C'', out=None):
"""
Calculates the exponential moving average over a vector.
Will fail for large inputs.
:param data: Input data
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param offset: optional
The offset for the moving average, scalar. Defaults to data[0].
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {''C'', ''F'', ''A''}, optional
Order to use when flattening the data. Defaults to ''C''.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the input. If not provided or `None`,
a freshly-allocated array is returned.
"""
data = np.array(data, copy=False)
if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float64
else:
dtype = np.dtype(dtype)
if data.ndim > 1:
# flatten input
data = data.reshape(-1, order)
if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype
if data.size < 1:
# empty input, return empty array
return out
if offset is None:
offset = data[0]
alpha = np.array(alpha, copy=False).astype(dtype, copy=False)
# scaling_factors -> 0 as len(data) gets large
# this leads to divide-by-zeros below
scaling_factors = np.power(1. - alpha, np.arange(data.size + 1, dtype=dtype),
dtype=dtype)
# create cumulative sum array
np.multiply(data, (alpha * scaling_factors[-2]) / scaling_factors[:-1],
dtype=dtype, out=out)
np.cumsum(out, dtype=dtype, out=out)
# cumsums / scaling
out /= scaling_factors[-2::-1]
if offset != 0:
offset = np.array(offset, copy=False).astype(dtype, copy=False)
# add offsets
out += offset * scaling_factors[1:]
return out
La función 2D ewma:
def ewma_vectorized_2d(data, alpha, axis=None, offset=None, dtype=None, order=''C'', out=None):
"""
Calculates the exponential moving average over a given axis.
:param data: Input data, must be 1D or 2D array.
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param axis: The axis to apply the moving average on.
If axis==None, the data is flattened.
:param offset: optional
The offset for the moving average. Must be scalar or a
vector with one element for each row of data. If set to None,
defaults to the first value of each row.
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {''C'', ''F'', ''A''}, optional
Order to use when flattening the data. Ignored if axis is not None.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the desired output. If not provided or `None`,
a freshly-allocated array is returned.
"""
data = np.array(data, copy=False)
assert data.ndim <= 2
if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float64
else:
dtype = np.dtype(dtype)
if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype
if data.size < 1:
# empty input, return empty array
return out
if axis is None or data.ndim < 2:
# use 1D version
if isinstance(offset, np.ndarray):
offset = offset[0]
return ewma_vectorized(data, alpha, offset, dtype=dtype, order=order,
out=out)
assert -data.ndim <= axis < data.ndim
# create reshaped data views
out_view = out
if axis < 0:
axis = data.ndim - int(axis)
if axis == 0:
# transpose data views so columns are treated as rows
data = data.T
out_view = out_view.T
if offset is None:
# use the first element of each row as the offset
offset = np.copy(data[:, 0])
elif np.size(offset) == 1:
offset = np.reshape(offset, (1,))
alpha = np.array(alpha, copy=False).astype(dtype, copy=False)
# calculate the moving average
row_size = data.shape[1]
row_n = data.shape[0]
scaling_factors = np.power(1. - alpha, np.arange(row_size + 1, dtype=dtype),
dtype=dtype)
# create a scaled cumulative sum array
np.multiply(
data,
np.multiply(alpha * scaling_factors[-2], np.ones((row_n, 1), dtype=dtype),
dtype=dtype)
/ scaling_factors[np.newaxis, :-1],
dtype=dtype, out=out_view
)
np.cumsum(out_view, axis=1, dtype=dtype, out=out_view)
out_view /= scaling_factors[np.newaxis, -2::-1]
if not (np.size(offset) == 1 and offset == 0):
offset = offset.astype(dtype, copy=False)
# add the offsets to the scaled cumulative sums
out_view += offset[:, np.newaxis] * scaling_factors[np.newaxis, 1:]
return out
uso:
data_n = 100000000
data = ((0.5*np.random.randn(data_n)+0.5) % 1) * 100
span = 5000 # span >= 1
alpha = 2/(span+1) # for pandas` span parameter
# com = 1000 # com >= 0
# alpha = 1/(1+com) # for pandas` center-of-mass parameter
# halflife = 100 # halflife > 0
# alpha = 1 - np.exp(np.log(0.5)/halflife) # for pandas` half-life parameter
result = ewma_vectorized_safe(data, alpha)
Solo un consejo
Es fácil calcular un ''tamaño de ventana'' (los promedios técnicamente exponenciales tienen ''ventanas'' infinitas) para un
alpha
dado, dependiendo de la contribución de los datos en esa ventana al promedio.
Esto es útil, por ejemplo, para elegir qué parte del inicio del resultado tratar como poco confiable debido a los efectos de borde.
def window_size(alpha, sum_proportion):
# Increases with increased sum_proportion and decreased alpha
# solve (1-alpha)**window_size = (1-sum_proportion) for window_size
return int(np.log(1-sum_proportion) / np.log(1-alpha))
alpha = 0.02
sum_proportion = .99 # window covers 99% of contribution to the moving average
window = window_size(alpha, sum_proportion) # = 227
sum_proportion = .75 # window covers 75% of contribution to the moving average
window = window_size(alpha, sum_proportion) # = 68
La relación
alpha = 2 / (window_size + 1.0)
utilizada en este hilo (la opción ''span'' de
pandas
) es una aproximación muy aproximada del inverso de la función anterior (con
sum_proportion~=0.87
).
alpha = 1 - np.exp(np.log(1-sum_proportion)/window_size)
es más preciso (la opción ''half-life'' de pandas iguala esta fórmula con
sum_proportion=0.5
).
En el siguiente ejemplo, los
data
representan una señal ruidosa continua.
cutoff_idx
es la primera posición del
result
donde al menos el 99% del valor depende de valores separados en los
data
(es decir, menos del 1% depende de los datos [0]).
Los datos hasta
cutoff_idx
se excluyen de los resultados finales porque dependen demasiado del primer valor en los
data
, por lo tanto, posiblemente sesgan el promedio.
result = ewma_vectorized_safe(data, alpha, chunk_size)
sum_proportion = .99
cutoff_idx = window_size(alpha, sum_proportion)
result = result[cutoff_idx:]
Para ilustrar el problema que resolvió anteriormente, puede ejecutar esto varias veces, observe el inicio falso de la línea roja, que a menudo aparece, que se omite después de
cutoff_idx
:
data_n = 100000
data = np.random.rand(data_n) * 100
window = 1000
sum_proportion = .99
alpha = 1 - np.exp(np.log(1-sum_proportion)/window)
result = ewma_vectorized_safe(data, alpha)
cutoff_idx = window_size(alpha, sum_proportion)
x = np.arange(start=0, stop=result.size)
import matplotlib.pyplot as plt
plt.plot(x[:cutoff_idx+1], result[:cutoff_idx+1], ''-r'',
x[cutoff_idx:], result[cutoff_idx:], ''-b'')
plt.show()
tenga en cuenta que
cutoff_idx==window
porque alpha se configuró con el inverso de la función
window_size()
, con la misma
sum_proportion
.
Esto es similar a cómo los pandas aplican
ewm(span=window, min_periods=window)
.