python algorithm message-queue

queue python



¿Qué es un buen algoritmo de limitación de velocidad? (10)

Aquí el algoritmo más simple , si solo quiere soltar mensajes cuando llegan demasiado rápido (en lugar de ponerlos en cola, lo que tiene sentido porque la cola puede ser arbitrariamente grande):

rate = 5.0; // unit: messages per = 8.0; // unit: seconds allowance = rate; // unit: messages last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds when (message_received): current = now(); time_passed = current - last_check; last_check = current; allowance += time_passed * (rate / per); if (allowance > rate): allowance = rate; // throttle if (allowance < 1.0): discard_message(); else: forward_message(); allowance -= 1.0;

No hay estructuras de datos, temporizadores, etc. en esta solución y funciona limpiamente :) Para ver esto, la ''tolerancia'' crece a una velocidad de 5/8 unidades por segundo como máximo, es decir, a lo sumo cinco unidades por ocho segundos. Cada mensaje que se reenvía resta una unidad, por lo que no puede enviar más de cinco mensajes por cada ocho segundos.

Tenga en cuenta que la rate debe ser un número entero, es decir, sin una parte decimal distinta de cero, o el algoritmo no funcionará correctamente (la tasa real no será rate/per ). Por ejemplo, rate=0.5; per=1.0; rate=0.5; per=1.0; no funciona porque la allowance nunca crecerá a 1.0. Pero rate=1.0; per=2.0; rate=1.0; per=2.0; funciona bien.

Podría usar algún pseudocódigo, o mejor, Python. Estoy intentando implementar una cola de limitación de velocidad para un bot de Python IRC, y funciona parcialmente, pero si alguien dispara menos mensajes que el límite (por ejemplo, el límite de velocidad es de 5 mensajes por 8 segundos y la persona activa solo 4), y el siguiente desencadenante es más de 8 segundos (por ejemplo, 16 segundos más tarde), el bot envía el mensaje, pero la cola se llena y el bot espera 8 segundos, aunque no es necesario ya que el período de 8 segundos ha transcurrido.


Mantenga el tiempo en que se enviaron las últimas cinco líneas. Mantenga los mensajes puestos en cola hasta el momento en que el quinto mensaje más reciente (si existe) es de al menos 8 segundos en el pasado (con last_five como una serie de veces):

now = time.time() if len(last_five) == 0 or (now - last_five[-1]) >= 8.0: last_five.insert(0, now) send_message(msg) if len(last_five) > 5: last_five.pop()


Necesitaba una variación en Scala. Aquí está:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) { import Thread.sleep private def now = System.currentTimeMillis / 1000.0 private val (calls, sec) = callsPerSecond private var allowance = 1.0 private var last = now def apply(a: A): B = { synchronized { val t = now val delta_t = t - last last = t allowance += delta_t * (calls / sec) if (allowance > calls) allowance = calls if (allowance < 1d) { sleep(((1 - allowance) * (sec / calls) * 1000d).toLong) } allowance -= 1 } f(a) } }

Aquí es cómo se puede usar:

val f = Limiter((5d, 8d), { _: Unit ⇒ println(System.currentTimeMillis) }) while(true){f(())}


Qué tal esto:

long check_time = System.currentTimeMillis(); int msgs_sent_count = 0; private boolean isRateLimited(int msgs_per_sec) { if (System.currentTimeMillis() - check_time > 1000) { check_time = System.currentTimeMillis(); msgs_sent_count = 0; } if (msgs_sent_count > (msgs_per_sec - 1)) { return true; } else { msgs_sent_count++; } return false; }


Si alguien todavía está interesado, utilizo esta clase simple invocable junto con un almacenamiento de valor de clave LRU programado para limitar la tasa de solicitud por IP. Utiliza una deque, pero puede reescribirse para ser utilizada con una lista.

from collections import deque import time class RateLimiter: def __init__(self, maxRate=5, timeUnit=1): self.timeUnit = timeUnit self.deque = deque(maxlen=maxRate) def __call__(self): if self.deque.maxlen == len(self.deque): cTime = time.time() if cTime - self.deque[0] > self.timeUnit: self.deque.append(cTime) return False else: return True self.deque.append(time.time()) return False r = RateLimiter() for i in range(0,100): time.sleep(0.1) print(i, "block" if r() else "pass")


Solo una implementación de Python de un código de respuesta aceptada.

import time class Object(object): pass def get_throttler(rate, per): scope = Object() scope.allowance = rate scope.last_check = time.time() def throttler(fn): current = time.time() time_passed = current - scope.last_check; scope.last_check = current; scope.allowance = scope.allowance + time_passed * (rate / per) if (scope.allowance > rate): scope.allowance = rate if (scope.allowance < 1): pass else: fn() scope.allowance = scope.allowance - 1 return throttler


Un Token Bucket es bastante simple de implementar.

Comience con un cubo con 5 fichas.

Cada 5/8 segundos: si el cucharón tiene menos de 5 fichas, agrega una.

Cada vez que desee enviar un mensaje: si el depósito tiene ≥1 símbolo, saque un símbolo y envíe el mensaje. De lo contrario, espere / suelte el mensaje / lo que sea.

(obviamente, en el código real, usaría un contador de enteros en lugar de tokens reales y puede optimizar el paso de cada 5 / 8s almacenando las marcas de tiempo)

Leyendo de nuevo la pregunta, si el límite de velocidad se restablece por completo cada 8 segundos, entonces aquí hay una modificación:

Comience con una marca de tiempo, last_send , en un momento hace mucho tiempo (por ejemplo, en la época). Además, comience con el mismo cubo de 5 tokens.

Golpea la regla cada 5/8 segundos.

Cada vez que envía un mensaje: Primero, verifique si last_send ≥ 8 segundos atrás. Si es así, llena el cubo (configúralo en 5 fichas). En segundo lugar, si hay tokens en el cubo, envíe el mensaje (de lo contrario, suelte / espere / etc.). En tercer lugar, establece last_send hasta ahora.

Eso debería funcionar para ese escenario.

De hecho, he escrito un bot de IRC usando una estrategia como esta (el primer enfoque). Está en Perl, no en Python, pero aquí hay un código para ilustrar:

La primera parte maneja agregar tokens al cubo. Puede ver la optimización de agregar tokens en función del tiempo (segunda a la última línea) y luego la última línea sujeta el contenido del contenedor al máximo (MESSAGE_BURST)

my $start_time = time; ... # Bucket handling my $bucket = $conn->{fujiko_limit_bucket}; my $lasttx = $conn->{fujiko_limit_lasttx}; $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL; ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$ conn es una estructura de datos que se transmite. Esto se encuentra dentro de un método que se ejecuta de forma rutinaria (calcula cuándo será la próxima vez que tenga algo que hacer, y duerme ese tiempo o hasta que reciba tráfico de red). La siguiente parte del método maneja el envío. Es bastante complicado, porque los mensajes tienen prioridades asociadas a ellos.

# Queue handling. Start with the ultimate queue. my $queues = $conn->{fujiko_queues}; foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) { # Ultimate is special. We run ultimate no matter what. Even if # it sends the bucket negative. --$bucket; $entry->{code}(@{$entry->{args}}); } $queues->[PRIORITY_ULTIMATE] = [];

Esa es la primera cola, que se ejecuta sin importar qué. Incluso si hace que nuestra conexión muera por las inundaciones. Se usa para cosas extremadamente importantes, como responder al PING del servidor. Luego, el resto de las colas:

# Continue to the other queues, in order of priority. QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) { my $queue = $queues->[$pri]; while (scalar(@$queue)) { if ($bucket < 1) { # continue later. $need_more_time = 1; last QRUN; } else { --$bucket; my $entry = shift @$queue; $entry->{code}(@{$entry->{args}}); } } }

Finalmente, el estado del depósito se guarda de nuevo en la estructura de datos de $ conn (en realidad un poco más adelante en el método; primero calcula cuán pronto tendrá más trabajo)

# Save status. $conn->{fujiko_limit_bucket} = $bucket; $conn->{fujiko_limit_lasttx} = $start_time;

Como puede ver, el código real de manejo del cubo es muy pequeño, aproximadamente cuatro líneas. El resto del código es manejo de colas de prioridad. El robot tiene colas de prioridad, por lo que, por ejemplo, alguien que chatee con él no puede evitar que realice sus importantes tareas de patada / prohibición.


Una solución es adjuntar una marca de tiempo a cada elemento de la cola y descartar el elemento después de que hayan pasado 8 segundos. Puede realizar esta comprobación cada vez que se agrega la cola.

Esto solo funciona si limita el tamaño de la cola a 5 y descarta cualquier adición mientras la cola está llena.


Use este decorador @RateLimited (ratepersec) antes de que su función se ponga en cola.

Básicamente, esto verifica si 1 / tasa segundos han pasado desde la última vez y si no, espera el resto del tiempo, de lo contrario, no espera. Esto efectivamente te limita a calificar / seg. El decorador se puede aplicar a cualquier función que desee con límite de velocidad.

En su caso, si desea un máximo de 5 mensajes por 8 segundos, use @RateLimited (0.625) antes de su función sendToQueue.

import time def RateLimited(maxPerSecond): minInterval = 1.0 / float(maxPerSecond) def decorate(func): lastTimeCalled = [0.0] def rateLimitedFunction(*args,**kargs): elapsed = time.clock() - lastTimeCalled[0] leftToWait = minInterval - elapsed if leftToWait>0: time.sleep(leftToWait) ret = func(*args,**kargs) lastTimeCalled[0] = time.clock() return ret return rateLimitedFunction return decorate @RateLimited(2) # 2 per second at most def PrintNumber(num): print num if __name__ == "__main__": print "This should print 1,2,3... at about 2 per second." for i in range(1,100): PrintNumber(i)


para bloquear el procesamiento hasta que se pueda enviar el mensaje, y así poner en cola más mensajes, la hermosa solución de antti también se puede modificar así:

rate = 5.0; // unit: messages per = 8.0; // unit: seconds allowance = rate; // unit: messages last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds when (message_received): current = now(); time_passed = current - last_check; last_check = current; allowance += time_passed * (rate / per); if (allowance > rate): allowance = rate; // throttle if (allowance < 1.0): time.sleep( (1-allowance) * (per/rate)) forward_message(); allowance = 0.0; else: forward_message(); allowance -= 1.0;

solo espera hasta que haya suficiente asignación para enviar el mensaje. para no comenzar con dos veces la tasa, la asignación también puede inicializarse con 0.