con - graficar series de tiempo en python
calcular el promedio móvil exponencial en python (10)
Tengo un rango de fechas y una medición en cada una de esas fechas. Me gustaría calcular un promedio móvil exponencial para cada una de las fechas. ¿Alguien sabe cómo hacer esto?
Soy nuevo en Python. No parece que los promedios estén integrados en la biblioteca estándar de Python, lo que me parece un poco extraño. Tal vez no estoy buscando en el lugar correcto.
Entonces, dado el siguiente código, ¿cómo podría calcular el promedio móvil ponderado de los puntos de cociente intelectual para las fechas del calendario?
from datetime import date
days = [date(2008,1,1), date(2008,1,2), date(2008,1,7)]
IQ = [110, 105, 90]
(Probablemente haya una mejor manera de estructurar los datos, cualquier consejo sería apreciado)
EDITAR: Parece que la función mov_average_expw()
del submódulo scikits.timeseries.lib.moving_funcs de SciKits (juegos de herramientas complementarios que complementan SciPy ) se adapta mejor a la redacción de su pregunta.
Para calcular un alisamiento exponencial de sus datos con un factor de suavizado alpha
(es (1 - alpha)
en términos de Wikipedia):
>>> alpha = 0.5
>>> assert 0 < alpha <= 1.0
>>> av = sum(alpha**n.days * iq
... for n, iq in map(lambda (day, iq), today=max(days): (today-day, iq),
... sorted(zip(days, IQ), key=lambda p: p[0], reverse=True)))
95.0
Lo anterior no es bonito, así que vamos a refactorizarlo un poco:
from collections import namedtuple
from operator import itemgetter
def smooth(iq_data, alpha=1, today=None):
"""Perform exponential smoothing with factor `alpha`.
Time period is a day.
Each time period the value of `iq` drops `alpha` times.
The most recent data is the most valuable one.
"""
assert 0 < alpha <= 1
if alpha == 1: # no smoothing
return sum(map(itemgetter(1), iq_data))
if today is None:
today = max(map(itemgetter(0), iq_data))
return sum(alpha**((today - date).days) * iq for date, iq in iq_data)
IQData = namedtuple("IQData", "date iq")
if __name__ == "__main__":
from datetime import date
days = [date(2008,1,1), date(2008,1,2), date(2008,1,7)]
IQ = [110, 105, 90]
iqdata = list(map(IQData, days, IQ))
print("/n".join(map(str, iqdata)))
print(smooth(iqdata, alpha=0.5))
Ejemplo:
$ python26 smooth.py
IQData(date=datetime.date(2008, 1, 1), iq=110)
IQData(date=datetime.date(2008, 1, 2), iq=105)
IQData(date=datetime.date(2008, 1, 7), iq=90)
95.0
No conozco Python, pero para la parte de promedio, ¿te refieres a un filtro de paso bajo exponencialmente en descomposición de la forma
y_new = y_old + (input - y_old)*alpha
donde alpha = dt / tau, dt = el paso del filtro, tau = la constante de tiempo del filtro? (la forma de paso variable variable de esto es la siguiente, solo clip dt / tau para no ser más de 1.0)
y_new = y_old + (input - y_old)*dt/tau
Si desea filtrar algo así como una fecha, asegúrese de convertir a una cantidad en coma flotante como # de segundos desde el 1 de enero de 1970.
Mi pitón está un poco oxidado (cualquiera puede escribir este código para hacer correcciones, si he estropeado la sintaxis de alguna manera), pero aquí va ...
def movingAverageExponential(values, alpha, epsilon = 0):
if not 0 < alpha < 1:
raise ValueError("out of range, alpha=''%s''" % alpha)
if not 0 <= epsilon < alpha:
raise ValueError("out of range, epsilon=''%s''" % epsilon)
result = [None] * len(values)
for i in range(len(result)):
currentWeight = 1.0
numerator = 0
denominator = 0
for value in values[i::-1]:
numerator += value * currentWeight
denominator += currentWeight
currentWeight *= alpha
if currentWeight < epsilon:
break
result[i] = numerator / denominator
return result
Esta función se mueve hacia atrás, desde el final de la lista hasta el comienzo, calculando el promedio móvil exponencial para cada valor trabajando hacia atrás hasta que el coeficiente de peso para un elemento sea menor que el épsilon dado.
Al final de la función, invierte los valores antes de devolver la lista (para que estén en el orden correcto para la persona que llama).
(NOTA LATERAL: si estuviera usando un lenguaje que no sea Python, primero crearía una matriz vacía de tamaño completo y luego la llenaría hacia atrás, para que no tuviera que invertirla al final. Pero no No creo que puedas declarar una gran matriz vacía en Python. Y en las listas de Python, agregar es mucho menos costoso que anteponer, por lo que construí la lista en orden inverso. Corrígeme si estoy equivocado.
El argumento ''alfa'' es el factor de disminución en cada iteración. Por ejemplo, si usó un alfa de 0.5, entonces el valor promedio móvil de hoy estaría compuesto por los siguientes valores ponderados:
today: 1.0
yesterday: 0.5
2 days ago: 0.25
3 days ago: 0.125
...etc...
Por supuesto, si tiene una gran variedad de valores, los valores de hace diez o quince días no contribuirán mucho al promedio ponderado de hoy. El argumento ''épsilon'' le permite establecer un punto de corte, debajo del cual dejará de preocuparse por los valores antiguos (ya que su contribución al valor actual será insignificante).
Invocarías la función algo como esto:
result = movingAverageExponential(values, 0.75, 0.0001)
Hice un poco de búsqueda en Google y encontré el siguiente código de muestra ( http://osdir.com/ml/python.matplotlib.general/2005-04/msg00044.html ):
def ema(s, n):
"""
returns an n period exponential moving average for
the time series s
s is a list ordered from oldest (index 0) to most
recent (index -1)
n is an integer
returns a numeric array of the exponential
moving average
"""
s = array(s)
ema = []
j = 1
#get n sma first and calculate the next n period ema
sma = sum(s[:n]) / n
multiplier = 2 / float(1 + n)
ema.append(sma)
#EMA(current) = ( (Price(current) - EMA(prev) ) x Multiplier) + EMA(prev)
ema.append(( (s[n] - sma) * multiplier) + sma)
#now calculate the rest of the values
for i in s[n+1:]:
tmp = ( (i - ema[j]) * multiplier) + ema[j]
j = j + 1
ema.append(tmp)
return ema
Encontré el fragmento de código de @earino bastante útil, pero necesitaba algo que pudiera suavizar continuamente una secuencia de valores, así que lo refactoré a esto:
def exponential_moving_average(period=1000):
""" Exponential moving average. Smooths the values in v over ther period. Send in values - at first it''ll return a simple average, but as soon as it''s gahtered ''period'' values, it''ll start to use the Exponential Moving Averge to smooth the values.
period: int - how many values to smooth over (default=100). """
multiplier = 2 / float(1 + period)
cum_temp = yield None # We are being primed
# Start by just returning the simple average until we have enough data.
for i in xrange(1, period + 1):
cum_temp += yield cum_temp / float(i)
# Grab the timple avergae
ema = cum_temp / period
# and start calculating the exponentially smoothed average
while True:
ema = (((yield ema) - ema) * multiplier) + ema
y lo uso así:
def temp_monitor(pin):
""" Read from the temperature monitor - and smooth the value out. The sensor is noisy, so we use exponential smoothing. """
ema = exponential_moving_average()
next(ema) # Prime the generator
while True:
yield ema.send(val_to_temp(pin.read()))
(donde pin.read () produce el siguiente valor que me gustaría consumir).
En los ejemplos de matplotlib.org ( http://matplotlib.org/examples/pylab_examples/finance_work2.html ) se proporciona un buen ejemplo de la función Promedio Móvil Exponencial (EMA) usando numpy:
def moving_average(x, n, type):
x = np.asarray(x)
if type==''simple'':
weights = np.ones(n)
else:
weights = np.exp(np.linspace(-1., 0., n))
weights /= weights.sum()
a = np.convolve(x, weights, mode=''full'')[:len(x)]
a[:n] = a[n]
return a
Aquí hay una muestra simple que trabajé basado en http://stockcharts.com/school/doku.php?id=chart_school:technical_indicators:moving_averages
Tenga en cuenta que a diferencia de su hoja de cálculo, no calculo el SMA, y no espero para generar el EMA después de 10 muestras. Esto significa que mis valores difieren ligeramente, pero si lo grafica, sigue exactamente después de 10 muestras. Durante las primeras 10 muestras, el EMA que calculo se suaviza adecuadamente.
def emaWeight(numSamples):
return 2 / float(numSamples + 1)
def ema(close, prevEma, numSamples):
return ((close-prevEma) * emaWeight(numSamples) ) + prevEma
samples = [
22.27, 22.19, 22.08, 22.17, 22.18, 22.13, 22.23, 22.43, 22.24, 22.29,
22.15, 22.39, 22.38, 22.61, 23.36, 24.05, 23.75, 23.83, 23.95, 23.63,
23.82, 23.87, 23.65, 23.19, 23.10, 23.33, 22.68, 23.10, 22.40, 22.17,
]
emaCap = 10
e=samples[0]
for s in range(len(samples)):
numSamples = emaCap if s > emaCap else s
e = ema(samples[s], e, numSamples)
print e
Siempre estoy calculando EMA con Pandas:
Aquí hay un ejemplo de cómo hacerlo:
import pandas as pd
import numpy as np
def ema(values, period):
values = np.array(values)
return pd.ewma(values, span=period)[-1]
values = [9, 5, 10, 16, 5]
period = 5
print ema(values, period)
Más información sobre Pandas EWMA:
http://pandas.pydata.org/pandas-docs/stable/generated/pandas.ewma.html
También puede usar el método de filtro SciPy porque el EMA es un filtro IIR. Esto tendrá el beneficio de ser aproximadamente 64 veces más rápido según lo medido en mi sistema usando timeit en grandes conjuntos de datos en comparación con el enfoque enumerate () .
import numpy as np
from scipy.signal import lfilter
x = np.random.normal(size=1234)
alpha = .1 # smoothing coefficient
zi = [x[0]] # seed the filter state with first value
# filter can process blocks of continuous data if <zi> is maintained
y, zi = lfilter([1.-alpha], [1., -alpha], x, zi=zi)
Una forma rápida (copiar y pegar desde aquí ) es la siguiente:
def ExpMovingAverage(values, window):
""" Numpy implementation of EMA
"""
weights = np.exp(np.linspace(-1., 0., window))
weights /= weights.sum()
a = np.convolve(values, weights, mode=''full'')[:len(values)]
a[:window] = a[window]
return a