with tutorial script scraping scrap con code python python-2.7 numpy matplotlib scipy

tutorial - using beautifulsoup python



Media móvil o media móvil (17)

¿Hay una función scipy o función numpy o módulo para python que calcula la media de ejecución de una matriz 1D dada una ventana específica?


o módulo para python que calcula

en mis pruebas en Tradewave.net TA-lib siempre gana:

import talib as ta import numpy as np import pandas as pd import scipy from scipy import signal import time as t PAIR = info.primary_pair PERIOD = 30 def initialize(): storage.reset() storage.elapsed = storage.get(''elapsed'', [0,0,0,0,0,0]) def cumsum_sma(array, period): ret = np.cumsum(array, dtype=float) ret[period:] = ret[period:] - ret[:-period] return ret[period - 1:] / period def pandas_sma(array, period): return pd.rolling_mean(array, period) def api_sma(array, period): # this method is native to Tradewave and does NOT return an array return (data[PAIR].ma(PERIOD)) def talib_sma(array, period): return ta.MA(array, period) def convolve_sma(array, period): return np.convolve(array, np.ones((period,))/period, mode=''valid'') def fftconvolve_sma(array, period): return scipy.signal.fftconvolve( array, np.ones((period,))/period, mode=''valid'') def tick(): close = data[PAIR].warmup_period(''close'') t1 = t.time() sma_api = api_sma(close, PERIOD) t2 = t.time() sma_cumsum = cumsum_sma(close, PERIOD) t3 = t.time() sma_pandas = pandas_sma(close, PERIOD) t4 = t.time() sma_talib = talib_sma(close, PERIOD) t5 = t.time() sma_convolve = convolve_sma(close, PERIOD) t6 = t.time() sma_fftconvolve = fftconvolve_sma(close, PERIOD) t7 = t.time() storage.elapsed[-1] = storage.elapsed[-1] + t2-t1 storage.elapsed[-2] = storage.elapsed[-2] + t3-t2 storage.elapsed[-3] = storage.elapsed[-3] + t4-t3 storage.elapsed[-4] = storage.elapsed[-4] + t5-t4 storage.elapsed[-5] = storage.elapsed[-5] + t6-t5 storage.elapsed[-6] = storage.elapsed[-6] + t7-t6 plot(''sma_api'', sma_api) plot(''sma_cumsum'', sma_cumsum[-5]) plot(''sma_pandas'', sma_pandas[-10]) plot(''sma_talib'', sma_talib[-15]) plot(''sma_convolve'', sma_convolve[-20]) plot(''sma_fftconvolve'', sma_fftconvolve[-25]) def stop(): log(''ticks....: %s'' % info.max_ticks) log(''api......: %.5f'' % storage.elapsed[-1]) log(''cumsum...: %.5f'' % storage.elapsed[-2]) log(''pandas...: %.5f'' % storage.elapsed[-3]) log(''talib....: %.5f'' % storage.elapsed[-4]) log(''convolve.: %.5f'' % storage.elapsed[-5]) log(''fft......: %.5f'' % storage.elapsed[-6])

resultados:

[2015-01-31 23:00:00] ticks....: 744 [2015-01-31 23:00:00] api......: 0.16445 [2015-01-31 23:00:00] cumsum...: 0.03189 [2015-01-31 23:00:00] pandas...: 0.03677 [2015-01-31 23:00:00] talib....: 0.00700 # <<< Winner! [2015-01-31 23:00:00] convolve.: 0.04871 [2015-01-31 23:00:00] fft......: 0.22306


Solución eficiente

La convolución es mucho mejor que el enfoque directo, pero (supongo) usa FFT y, por lo tanto, bastante lento. Sin embargo, especialmente para calcular la media móvil, el siguiente enfoque funciona bien

def running_mean(x, N): cumsum = numpy.cumsum(numpy.insert(x, 0, 0)) return (cumsum[N:] - cumsum[:-N]) / float(N)

El código para verificar

In[3]: x = numpy.random.random(100000) In[4]: N = 1000 In[5]: %timeit result1 = numpy.convolve(x, numpy.ones((N,))/N, mode=''valid'') 10 loops, best of 3: 41.4 ms per loop In[6]: %timeit result2 = running_mean(x, N) 1000 loops, best of 3: 1.04 ms per loop

Tenga en cuenta que numpy.allclose(result1, result2) es True , dos métodos son equivalentes. Cuanto mayor es N, mayor es la diferencia en el tiempo.


Al leer las otras respuestas, no creo que esto sea lo que me pidió la pregunta, pero llegué aquí con la necesidad de mantener un promedio continuo de una lista de valores que estaba creciendo en tamaño.

Entonces, si desea mantener una lista de valores que está adquiriendo en algún lugar (un sitio, un dispositivo de medición, etc.) y el promedio de los últimos n valores actualizados, puede usar el siguiente código, que minimiza el esfuerzo de agregar nuevos elementos:

class Running_Average(object): def __init__(self, buffer_size=10): """ Create a new Running_Average object. This object allows the efficient calculation of the average of the last `buffer_size` numbers added to it. Examples -------- >>> a = Running_Average(2) >>> a.add(1) >>> a.get() 1.0 >>> a.add(1) # there are two 1 in buffer >>> a.get() 1.0 >>> a.add(2) # there''s a 1 and a 2 in the buffer >>> a.get() 1.5 >>> a.add(2) >>> a.get() # now there''s only two 2 in the buffer 2.0 """ self._buffer_size = int(buffer_size) # make sure it''s an int self.reset() def add(self, new): """ Add a new number to the buffer, or replaces the oldest one there. """ new = float(new) # make sure it''s a float n = len(self._buffer) if n < self.buffer_size: # still have to had numbers to the buffer. self._buffer.append(new) if self._average != self._average: # ~ if isNaN(). self._average = new # no previous numbers, so it''s new. else: self._average *= n # so it''s only the sum of numbers. self._average += new # add new number. self._average /= (n+1) # divide by new number of numbers. else: # buffer full, replace oldest value. old = self._buffer[self._index] # the previous oldest number. self._buffer[self._index] = new # replace with new one. self._index += 1 # update the index and make sure it''s... self._index %= self.buffer_size # ... smaller than buffer_size. self._average -= old/self.buffer_size # remove old one... self._average += new/self.buffer_size # ...and add new one... # ... weighted by the number of elements. def __call__(self): """ Return the moving average value, for the lazy ones who don''t want to write .get . """ return self._average def get(self): """ Return the moving average value. """ return self() def reset(self): """ Reset the moving average. If for some reason you don''t want to just create a new one. """ self._buffer = [] # could use np.empty(self.buffer_size)... self._index = 0 # and use this to keep track of how many numbers. self._average = float(''nan'') # could use np.NaN . def get_buffer_size(self): """ Return current buffer_size. """ return self._buffer_size def set_buffer_size(self, buffer_size): """ >>> a = Running_Average(10) >>> for i in range(15): ... a.add(i) ... >>> a() 9.5 >>> a._buffer # should not access this!! [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0] Decreasing buffer size: >>> a.buffer_size = 6 >>> a._buffer # should not access this!! [9.0, 10.0, 11.0, 12.0, 13.0, 14.0] >>> a.buffer_size = 2 >>> a._buffer [13.0, 14.0] Increasing buffer size: >>> a.buffer_size = 5 Warning: no older data available! >>> a._buffer [13.0, 14.0] Keeping buffer size: >>> a = Running_Average(10) >>> for i in range(15): ... a.add(i) ... >>> a() 9.5 >>> a._buffer # should not access this!! [10.0, 11.0, 12.0, 13.0, 14.0, 5.0, 6.0, 7.0, 8.0, 9.0] >>> a.buffer_size = 10 # reorders buffer! >>> a._buffer [5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0] """ buffer_size = int(buffer_size) # order the buffer so index is zero again: new_buffer = self._buffer[self._index:] new_buffer.extend(self._buffer[:self._index]) self._index = 0 if self._buffer_size < buffer_size: print(''Warning: no older data available!'') # should use Warnings! else: diff = self._buffer_size - buffer_size print(diff) new_buffer = new_buffer[diff:] self._buffer_size = buffer_size self._buffer = new_buffer buffer_size = property(get_buffer_size, set_buffer_size)

Y puedes probarlo con, por ejemplo:

def graph_test(N=200): import matplotlib.pyplot as plt values = list(range(N)) values_average_calculator = Running_Average(N/2) values_averages = [] for value in values: values_average_calculator.add(value) values_averages.append(values_average_calculator()) fig, ax = plt.subplots(1, 1) ax.plot(values, label=''values'') ax.plot(values_averages, label=''averages'') ax.grid() ax.set_xlim(0, N) ax.set_ylim(0, N) fig.show()

Lo que da:


Aunque hay soluciones para esta pregunta aquí, eche un vistazo a mi solución. Es muy simple y funciona bien.

import numpy as np dataset = np.asarray([1, 2, 3, 4, 5, 6, 7]) ma = list() window = 3 for t in range(0, len(dataset)): if t+window <= len(dataset): indices = range(t, t+window) ma.append(np.average(np.take(dataset, indices))) else: ma = np.asarray(ma)


Esta pregunta ahora es incluso más antigua que cuando NeXuS escribió sobre ella el mes pasado, PERO me gusta cómo su código trata con casos extremos. Sin embargo, debido a que es un "promedio móvil simple", sus resultados van a la zaga de los datos a los que se aplican. Pensé que lidiar con casos límite de una manera más satisfactoria que los modos valid , same y full de NumPy podría lograrse aplicando un enfoque similar a un método basado en convolution() .

Mi contribución utiliza un promedio de ejecución central para alinear sus resultados con sus datos. Cuando hay muy pocos puntos disponibles para usar la ventana de tamaño completo, los promedios de ejecución se calculan a partir de ventanas sucesivamente más pequeñas en los bordes de la matriz. [En realidad, desde ventanas sucesivamente más grandes, pero eso es un detalle de implementación.]

import numpy as np def running_mean(l, N): # Also works for the(strictly invalid) cases when N is even. if (N//2)*2 == N: N = N - 1 front = np.zeros(N//2) back = np.zeros(N//2) for i in range(1, (N//2)*2, 2): front[i//2] = np.convolve(l[:i], np.ones((i,))/i, mode = ''valid'') for i in range(1, (N//2)*2, 2): back[i//2] = np.convolve(l[-i:], np.ones((i,))/i, mode = ''valid'') return np.concatenate([front, np.convolve(l, np.ones((N,))/N, mode = ''valid''), back[::-1]])

Es relativamente lento porque usa convolve() , y podría ser arreglado bastante por un verdadero Pythonista, sin embargo, creo que la idea es válida.


Para una solución corta y rápida que hace todo en un ciclo, sin dependencias, el siguiente código funciona muy bien.

mylist = [1, 2, 3, 4, 5, 6, 7] N = 3 cumsum, moving_aves = [0], [] for i, x in enumerate(mylist, 1): cumsum.append(cumsum[i-1] + x) if i>=N: moving_ave = (cumsum[i] - cumsum[i-N])/N #can do stuff with moving_ave here moving_aves.append(moving_ave)


Para una solución lista para usar, consulte http://www.scipy.org/Cookbook/SignalSmooth . Proporciona un promedio continuo con el tipo de ventana flat . Tenga en cuenta que esto es un poco más sofisticado que el simple método convolvedor hágalo usted mismo, ya que trata de manejar los problemas al principio y al final de los datos al reflejarlo (que puede o no funcionar en su caso). ..).

Para empezar, podrías intentar:

a = np.random.random(100) plt.plot(a) b = smooth(a, window=''flat'') plt.plot(b)


Puedes calcular una media móvil con:

import numpy as np def runningMean(x, N): y = np.zeros((len(x),)) for ctr in range(len(x)): y[ctr] = np.sum(x[ctr:(ctr+N)]) return y/N

Pero es lento.

Afortunadamente, Numpy incluye una función np.convolve que podemos usar para acelerar las cosas. La media móvil es equivalente a convolucionar x con un vector que es N largo, con todos los miembros igual a 1/N La implementación numpy de convolve incluye el transitorio inicial, por lo que debe eliminar los primeros puntos N-1:

def runningMeanFast(x, N): return np.convolve(x, np.ones((N,))/N)[(N-1):]

En mi máquina, la versión rápida es 20-30 veces más rápida, dependiendo de la longitud del vector de entrada y el tamaño de la ventana de promedio.

Tenga en cuenta que convolve incluye un ''same'' modo que parece que debería abordar el problema transitorio inicial, pero lo divide entre el principio y el final.


Sé que esta es una pregunta antigua, pero aquí hay una solución que no usa ninguna estructura de datos o bibliotecas adicionales. Es lineal en la cantidad de elementos de la lista de entrada y no puedo pensar en ninguna otra forma de hacerlo más eficiente (en realidad, si alguien sabe de una mejor manera de asignar el resultado, hágamelo saber).

NOTA: esto sería mucho más rápido usando una matriz numpy en lugar de una lista, pero quería eliminar todas las dependencias. También sería posible mejorar el rendimiento mediante la ejecución de subprocesos múltiples

La función asume que la lista de entrada es unidimensional, así que ten cuidado.

### Running mean/Moving average def running_mean(l, N): sum = 0 result = list( 0 for x in l) for i in range( 0, N ): sum = sum + l[i] result[i] = sum / (i+1) for i in range( N, len(l) ): sum = sum - l[i-N] + l[i] result[i] = sum / N return result


Si eliges rodar el tuyo, en lugar de usar una biblioteca existente, ten en cuenta el error de punto flotante y trata de minimizar sus efectos:

class SumAccumulator: def __init__(self): self.values = [0] self.count = 0 def add( self, val ): self.values.append( val ) self.count = self.count + 1 i = self.count while i & 0x01: i = i >> 1 v0 = self.values.pop() v1 = self.values.pop() self.values.append( v0 + v1 ) def get_total(self): return sum( reversed(self.values) ) def get_size( self ): return self.count

Si todos sus valores son aproximadamente del mismo orden de magnitud, esto ayudará a preservar la precisión al agregar siempre valores de magnitudes aproximadamente similares.


Si es importante mantener las dimensiones de la entrada (en lugar de restringir la salida al área ''valid'' de una convolución), puede usar scipy.ndimage.filters.uniform_filter1d :

import numpy as np from scipy.ndimage.filters import uniform_filter1d N = 1000 x = np.random.random(100000) y = uniform_filter1d(x, size=N) y.shape == x.shape >>> True

uniform_filter1d permite múltiples formas de manejar el borde donde ''reflect'' es el predeterminado, pero en mi caso, prefiero ''nearest'' .

También es bastante rápido (casi 50 veces más rápido que np.convolve ):

%timeit y1 = np.convolve(x, np.ones((N,))/N, mode=''same'') 100 loops, best of 3: 9.28 ms per loop %timeit y2 = uniform_filter1d(x, size=N) 10000 loops, best of 3: 191 µs per loop


Todavía no he comprobado qué tan rápido es esto, pero podrías intentar:

from collections import deque cache = deque() # keep track of seen values n = 10 # window size A = xrange(100) # some dummy iterable cum_sum = 0 # initialize cumulative sum for t, val in enumerate(A, 1): cache.append(val) cum_sum += val if t < n: avg = cum_sum / float(t) else: # if window is saturated, cum_sum -= cache.popleft() # subtract oldest value avg = cum_sum / float(n)


Un poco tarde para la fiesta, pero he hecho mi propia función que NO se ajusta alrededor de los extremos o pads con ceros que luego se usan para encontrar el promedio también. Como un regalo adicional es que también vuelve a muestrear la señal en puntos linealmente espaciados. Personalice el código a voluntad para obtener otras funciones.

El método es una simple multiplicación de matriz con un kernel gaussiano normalizado.

def running_mean(y_in, x_in, N_out=101, sigma=1): '''''' Returns running mean as a Bell-curve weighted average at evenly spaced points. Does NOT wrap signal around, or pad with zeros. Arguments: y_in -- y values, the values to be smoothed and re-sampled x_in -- x values for array Keyword arguments: N_out -- NoOf elements in resampled array. sigma -- ''Width'' of Bell-curve in units of param x . '''''' N_in = size(y_in) # Gaussian kernel x_out = np.linspace(np.min(x_in), np.max(x_in), N_out) x_in_mesh, x_out_mesh = np.meshgrid(x_in, x_out) gauss_kernel = np.exp(-np.square(x_in_mesh - x_out_mesh) / (2 * sigma**2)) # Normalize kernel, such that the sum is one along axis 1 normalization = np.tile(np.reshape(sum(gauss_kernel, axis=1), (N_out, 1)), (1, N_in)) gauss_kernel_normalized = gauss_kernel / normalization # Perform running average as a linear operation y_out = gauss_kernel_normalized @ y_in return y_out, x_out

Un uso simple en una señal sinusoidal con ruido distribuido normal agregado:


pandas es más adecuado para esto que NumPy o SciPy. Su función rolling_mean hace el trabajo de forma conveniente. También devuelve una matriz NumPy cuando la entrada es una matriz.

Es difícil superar rolling_mean en rendimiento con cualquier implementación de Python pura personalizada. Aquí hay un ejemplo de actuación frente a dos de las soluciones propuestas:

In [1]: import numpy as np In [2]: import pandas as pd In [3]: def running_mean(x, N): ...: cumsum = np.cumsum(np.insert(x, 0, 0)) ...: return (cumsum[N:] - cumsum[:-N]) / N ...: In [4]: x = np.random.random(100000) In [5]: N = 1000 In [6]: %timeit np.convolve(x, np.ones((N,))/N, mode=''valid'') 10 loops, best of 3: 172 ms per loop In [7]: %timeit running_mean(x, N) 100 loops, best of 3: 6.72 ms per loop In [8]: %timeit pd.rolling_mean(x, N)[N-1:] 100 loops, best of 3: 4.74 ms per loop In [9]: np.allclose(pd.rolling_mean(x, N)[N-1:], running_mean(x, N)) Out[9]: True

También hay buenas opciones sobre cómo lidiar con los valores de borde.


Otro enfoque para encontrar el promedio móvil sin usar numpy, panda

import itertools sample = [2, 6, 10, 8, 11, 10] list(itertools.starmap(lambda a,b: b/a, enumerate(itertools.accumulate(sample), 1)))

se imprimirá [2.0, 4.0, 6.0, 6.5, 7.4, 7.833333333333333]


UPD: Alleo y jasaarim han propuesto soluciones más eficientes.

Puede usar np.convolve para eso:

np.convolve(x, np.ones((N,))/N, mode=''valid'')

Explicación

La media móvil es un caso de la operación matemática de convolution . Para el promedio de ejecución, desliza una ventana a lo largo de la entrada y calcula la media del contenido de la ventana. Para las señales 1D discretas, la convolución es la misma cosa, excepto que en lugar de la media se calcula una combinación lineal arbitraria, es decir, se multiplica cada elemento por un coeficiente correspondiente y se suman los resultados. Esos coeficientes, uno para cada posición en la ventana, a veces se llaman kernel de convolución. Ahora, la media aritmética de N valores es (x_1 + x_2 + ... + x_N) / N , por lo que el kernel correspondiente es (1/N, 1/N, ..., 1/N) , y eso es exactamente lo que obtenemos usando np.ones((N,))/N

Bordes

El argumento mode de np.convolve especifica cómo manejar los bordes. Elegí el modo valid aquí porque creo que así es como la mayoría de la gente espera que la ejecución funcione, pero es posible que tenga otras prioridades. Aquí hay un diagrama que ilustra la diferencia entre los modos:

import numpy as np import matplotlib.pyplot as plt modes = [''full'', ''same'', ''valid''] for m in modes: plt.plot(np.convolve(np.ones((200,)), np.ones((50,))/50, mode=m)); plt.axis([-10, 251, -.1, 1.1]); plt.legend(modes, loc=''lower center''); plt.show()


Usar solo biblioteca Python Stadnard (memoria eficiente)

Solo da otra versión del uso de la biblioteca estándar deque solamente. Es bastante sorprendente para mí que la mayoría de las respuestas estén usando pandas o numpy .

def moving_average(iterable, n=3): d = deque(maxlen=n) for i in iterable: d.append(i) if len(d) == n: yield sum(d)/n r = moving_average([40, 30, 50, 46, 39, 44]) assert list(r) == [40.0, 42.0, 45.0, 43.0]

Actaully encontré otra implementación en python docs

def moving_average(iterable, n=3): # moving_average([40, 30, 50, 46, 39, 44]) --> 40.0 42.0 45.0 43.0 # http://en.wikipedia.org/wiki/Moving_average it = iter(iterable) d = deque(itertools.islice(it, n-1)) d.appendleft(0) s = sum(d) for elem in it: s += elem - d.popleft() d.append(elem) yield s / n

Sin embargo, la implementación me parece un poco más compleja de lo que debería ser. Pero debe estar en los documentos estándar de Python por alguna razón, ¿alguien podría comentar sobre la implementación de la mía y el documento estándar?