python - ¿buffer circular eficiente?
circular-buffer (12)
Quiero crear un buffer circular eficiente en python (con el objetivo de tomar promedios de los valores enteros en el buffer).
¿Es esta una manera eficiente de usar una lista para recolectar valores?
def add_to_buffer( self, num ):
self.mylist.pop( 0 )
self.mylist.append( num )
¿Qué sería más eficiente (y por qué)?
Aunque ya hay una gran cantidad de excelentes respuestas aquí, no pude encontrar ninguna comparación directa de los tiempos para las opciones mencionadas. Por lo tanto, encuentre mi humilde intento de hacer una comparación a continuación.
Solo para fines de prueba, la clase puede alternar entre un búfer basado en list
, un búfer basado en collections.deque
y un Numpy.roll
basado en Numpy.roll
.
Tenga en cuenta que el método de update
agrega solo un valor a la vez, para mantenerlo simple.
import numpy
import timeit
import collections
class CircularBuffer(object):
buffer_methods = (''list'', ''deque'', ''roll'')
def __init__(self, buffer_size, buffer_method):
self.content = None
self.size = buffer_size
self.method = buffer_method
def update(self, scalar):
if self.method == self.buffer_methods[0]:
# Use list
try:
self.content.append(scalar)
self.content.pop(0)
except AttributeError:
self.content = [0.] * self.size
elif self.method == self.buffer_methods[1]:
# Use collections.deque
try:
self.content.append(scalar)
except AttributeError:
self.content = collections.deque([0.] * self.size,
maxlen=self.size)
elif self.method == self.buffer_methods[2]:
# Use Numpy.roll
try:
self.content = numpy.roll(self.content, -1)
self.content[-1] = scalar
except IndexError:
self.content = numpy.zeros(self.size, dtype=float)
# Testing and Timing
circular_buffer_size = 100
circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
buffer_method=method)
for method in CircularBuffer.buffer_methods]
timeit_iterations = 1e4
timeit_setup = ''from __main__ import circular_buffers''
timeit_results = []
for i, cb in enumerate(circular_buffers):
# We add a convenient number of convenient values (see equality test below)
code = ''[circular_buffers[{}].update(float(j)) for j in range({})]''.format(
i, circular_buffer_size)
# Testing
eval(code)
buffer_content = [item for item in cb.content]
assert buffer_content == range(circular_buffer_size)
# Timing
timeit_results.append(
timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
print ''{}: total {:.2f}s ({:.2f}ms per iteration)''.format(
cb.method, timeit_results[-1],
timeit_results[-1] / timeit_iterations * 1e3)
En mi sistema esto produce:
list: total 1.06s (0.11ms per iteration)
deque: total 0.87s (0.09ms per iteration)
roll: total 6.27s (0.63ms per iteration)
Basado en la respuesta de MoonCactus , aquí hay una clase de lista circularlist
. La diferencia con su versión es que aquí c[0]
siempre dará el elemento con el apendice más antiguo, c[-1]
el último elemento anexado, c[-2]
el penúltimo ... Esto es más natural para las aplicaciones.
c = circularlist(4)
c.append(1); print c, c[0], c[-1] #[1] (1 items) first 1, last 1
c.append(2); print c, c[0], c[-1] #[1, 2] (2 items) first 1, last 2
c.append(3); print c, c[0], c[-1] #[1, 2, 3] (3 items) first 1, last 3
c.append(8); print c, c[0], c[-1] #[1, 2, 3, 8] (4 items) first 1, last 8
c.append(10); print c, c[0], c[-1] #[10, 2, 3, 8] (4 items) first 2, last 10
c.append(11); print c, c[0], c[-1] #[10, 11, 3, 8] (4 items) first 3, last 11
Clase:
class circularlist(object):
def __init__(self, size):
"""Initialization"""
self.index = 0
self.size = size
self._data = []
def append(self, value):
"""Append an element"""
if len(self._data) == self.size:
self._data[self.index] = value
else:
self._data.append(value)
self.index = (self.index + 1) % self.size
def __getitem__(self, key):
"""Get element by index, relative to the current index"""
if len(self._data) == self.size:
return(self._data[(key + self.index) % self.size])
else:
return(self._data[key])
def __repr__(self):
"""Return string representation"""
return self._data.__repr__() + '' ('' + str(len(self._data))+'' items)''
Este no requiere ninguna biblioteca. Crece una lista y luego circula dentro por índice.
La huella es muy pequeña (sin biblioteca), y se ejecuta dos veces más rápido que dequeue al menos. Esto es bueno para calcular los promedios móviles de hecho, pero tenga en cuenta que los artículos no se mantienen ordenados por edad como se indica arriba.
class CircularBuffer(object):
def __init__(self, size):
"""initialization"""
self.index= 0
self.size= size
self._data = []
def record(self, value):
"""append an element"""
if len(self._data) == self.size:
self._data[self.index]= value
else:
self._data.append(value)
self.index= (self.index + 1) % self.size
def __getitem__(self, key):
"""get element by index like a regular array"""
return(self._data[key])
def __repr__(self):
"""return string representation"""
return self._data.__repr__() + '' ('' + str(len(self._data))+'' items)''
def get_all(self):
"""return a list of all the elements"""
return(self._data)
Para obtener el valor promedio, por ejemplo:
q= CircularBuffer(1000000);
for i in range(40000):
q.record(i);
print "capacity=", q.size
print "stored=", len(q.get_all())
print "average=", sum(q.get_all()) / len(q.get_all())
Resultados en:
capacity= 1000000
stored= 40000
average= 19999
real 0m0.024s
user 0m0.020s
sys 0m0.000s
Esto es aproximadamente 1/3 del tiempo del equivalente con dequeue.
Esto aplica el mismo principio a algunos almacenamientos intermedios destinados a contener los mensajes de texto más recientes.
import time
import datetime
import sys, getopt
class textbffr(object):
def __init__(self, size_max):
#initialization
self.posn_max = size_max-1
self._data = [""]*(size_max)
self.posn = self.posn_max
def append(self, value):
#append an element
if self.posn == self.posn_max:
self.posn = 0
self._data[self.posn] = value
else:
self.posn += 1
self._data[self.posn] = value
def __getitem__(self, key):
#return stored element
if (key + self.posn+1) > self.posn_max:
return(self._data[key - (self.posn_max-self.posn)])
else:
return(self._data[key + self.posn+1])
def print_bffr(bffr,bffer_max):
for ind in range(0,bffer_max):
stored = bffr[ind]
if stored != "":
print(stored)
print ( ''/n'' )
def make_time_text(time_value):
return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2)
+ str(time_value.hour).zfill(2) + str(time_value.minute).zfill(2)
+ str(time_value.second).zfill(2))
def main(argv):
#Set things up
starttime = datetime.datetime.now()
log_max = 5
status_max = 7
log_bffr = textbffr(log_max)
status_bffr = textbffr(status_max)
scan_count = 1
#Main Loop
# every 10 secounds write a line with the time and the scan count.
while True:
time_text = make_time_text(datetime.datetime.now())
#create next messages and store in buffers
status_bffr.append(str(scan_count).zfill(6) + " : Status is just fine at : " + time_text)
log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ")
#print whole buffers so far
print_bffr(log_bffr,log_max)
print_bffr(status_bffr,status_max)
time.sleep(2)
scan_count += 1
if __name__ == ''__main__'':
main(sys.argv[1:])
He tenido este problema antes de hacer una programación en serie. En ese momento, hace poco más de un año, tampoco pude encontrar implementaciones eficientes, así que terminé escribiendo una como extensión C y también está disponible en pypi bajo una licencia de MIT. Es súper básico, solo maneja buffers de caracteres de 8 bits, pero es de longitud flexible, por lo que puedes usar Struct o algo encima si necesitas algo más que caracteres. Ahora veo con una búsqueda en Google que hay varias opciones en estos días, por lo que es posible que también desee verlas.
La deque de Python es lenta. También puede usar numpy.roll en su lugar ¿Cómo se rotan los números en una matriz numpy de forma (n,) o (n, 1)?
En este punto de referencia, deque es 448 ms. Numpy.roll es 29ms http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/
La pregunta original fue: buffer circular " eficiente ". Según esta eficiencia solicitada, la respuesta de aaronasterling parece ser definitivamente correcta. Usando una clase dedicada programada en Python y comparando el procesamiento de tiempo con collections.deque muestra una aceleración de x5.2 veces con deque! Aquí hay un código muy simple para probar esto:
class cb:
def __init__(self, size):
self.b = [0]*size
self.i = 0
self.sz = size
def append(self, v):
self.b[self.i] = v
self.i = (self.i + 1) % self.sz
b = cb(1000)
for i in range(10000):
b.append(i)
# called 200 times, this lasts 1.097 second on my laptop
from collections import deque
b = deque( [], 1000 )
for i in range(10000):
b.append(i)
# called 200 times, this lasts 0.211 second on my laptop
Para transformar una deque en una lista, simplemente use:
my_list = [v for v in my_deque]
A continuación, obtendrá O (1) acceso aleatorio a los elementos deque. Por supuesto, esto solo es valioso si necesita hacer muchos accesos aleatorios al deque después de haberlo configurado una vez.
Salir del encabezado de una lista hace que toda la lista se copie, por lo que es ineficiente
En su lugar, debe utilizar una lista / matriz de tamaño fijo y un índice que se mueve a través del búfer a medida que agrega / elimina elementos.
También puedes ver esta vieja receta de Python .
Aquí está mi propia versión con matriz NumPy:
#!/usr/bin/env python
import numpy as np
class RingBuffer(object):
def __init__(self, size_max, default_value=0.0, dtype=float):
"""initialization"""
self.size_max = size_max
self._data = np.empty(size_max, dtype=dtype)
self._data.fill(default_value)
self.size = 0
def append(self, value):
"""append an element"""
self._data = np.roll(self._data, 1)
self._data[0] = value
self.size += 1
if self.size == self.size_max:
self.__class__ = RingBufferFull
def get_all(self):
"""return a list of elements from the oldest to the newest"""
return(self._data)
def get_partial(self):
return(self.get_all()[0:self.size])
def __getitem__(self, key):
"""get element"""
return(self._data[key])
def __repr__(self):
"""return string representation"""
s = self._data.__repr__()
s = s + ''/t'' + str(self.size)
s = s + ''/t'' + self.get_all()[::-1].__repr__()
s = s + ''/t'' + self.get_partial()[::-1].__repr__()
return(s)
class RingBufferFull(RingBuffer):
def append(self, value):
"""append an element when buffer is full"""
self._data = np.roll(self._data, 1)
self._data[0] = value
Tu respuesta no es correcta. Circular buffer principal tiene dos principios ( https://en.wikipedia.org/wiki/Circular_buffer )
- La lenth del buffer está configurada;
- Primero en entrar primero en salir;
- Cuando agrega o elimina un artículo, los otros artículos no deberían mover su posición
tu código a continuación:
def add_to_buffer( self, num ):
self.mylist.pop( 0 )
self.mylist.append( num )
Consideremos una situación en la que la lista esté completa, mediante el uso de su código:
self.mylist = [1, 2, 3, 4, 5]
ahora agregamos 6, la lista se cambia a
self.mylist = [2, 3, 4, 5, 6]
los artículos esperan 1 en la lista ha cambiado su posición
tu código es una cola, no un buffer circular.
La respuesta de Basj, creo que es la más eficiente.
Por cierto, un búfer circular puede mejorar la ejecución de la operación para agregar un elemento.
Yo usaría collections.deque
con un maxlen
arg
>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
... d.append(i)
...
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)
Hay una recipe en los documentos para deque
que es similar a lo que desea. Mi afirmación de que es la más eficiente depende totalmente del hecho de que está implementada en C por un equipo increíblemente hábil que tiene el hábito de generar código de primera clase.
está bien con el uso de la clase de deque, pero para los requisitos de la pregunta (promedio) esta es mi solución:
>>> from collections import deque
>>> class CircularBuffer(deque):
... def __init__(self, size=0):
... super(CircularBuffer, self).__init__(maxlen=size)
... @property
... def average(self): # TODO: Make type check for integer or floats
... return sum(self)/len(self)
...
>>>
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
... cb.append(i)
... print "@%s, Average: %s" % (cb, cb.average)
...
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14