requests parallel async python http python-requests throttling rate-limiting

async - parallel request python



Limitar/limitar la tasa de solicitudes HTTP en las peticiones (4)

Estoy escribiendo un pequeño script en Python 2.7.3 con GRequests y lxml que me permitirá recopilar algunos precios de tarjetas coleccionables de varios sitios web y compararlos. El problema es que uno de los sitios web limita el número de solicitudes y devuelve el error HTTP 429 si lo excedo.

¿Hay alguna manera de agregar una mayor cantidad de solicitudes en GRequestes para que no exceda la cantidad de solicitudes por segundo que especifico? Además, ¿cómo puedo hacer que GRequestes vuelva a intentar después de un tiempo si se produce HTTP 429?

En una nota al margen, su límite es ridículamente bajo. Algo así como 8 peticiones por 15 segundos. Lo rompí con mi navegador en múltiples ocasiones simplemente actualizando la página a la espera de cambios de precios.


Eche un vistazo a esto para el límite automático de solicitudes: https://pypi.python.org/pypi/RequestsThrottler/0.2.2

Puede establecer una cantidad fija de retraso entre cada solicitud o establecer una cantidad de solicitudes para enviar en una cantidad fija de segundos (que es básicamente lo mismo):

import requests from requests_throttler import BaseThrottler request = requests.Request(method=''GET'', url=''http://www.google.com'') reqs = [request for i in range(0, 5)] # An example list of requests with BaseThrottler(name=''base-throttler'', delay=1.5) as bt: throttled_requests = bt.multi_submit(reqs)

donde la función multi_submit devuelve una lista de ThrottledRequest (ver doc: enlace al final).

A continuación, puede acceder a las respuestas:

for tr in throttled_requests: print tr.response

Alternativamente, puede lograr lo mismo especificando el número o las solicitudes para enviar en una cantidad de tiempo fija (por ejemplo, 15 solicitudes cada 60 segundos):

import requests from requests_throttler import BaseThrottler request = requests.Request(method=''GET'', url=''http://www.google.com'') reqs = [request for i in range(0, 5)] # An example list of requests with BaseThrottler(name=''base-throttler'', reqs_over_time=(15, 60)) as bt: throttled_requests = bt.multi_submit(reqs)

Ambas soluciones pueden implementarse sin el uso de la declaración with :

import requests from requests_throttler import BaseThrottler request = requests.Request(method=''GET'', url=''http://www.google.com'') reqs = [request for i in range(0, 5)] # An example list of requests bt = BaseThrottler(name=''base-throttler'', delay=1.5) bt.start() throttled_requests = bt.multi_submit(reqs) bt.shutdown()

Para más detalles: http://pythonhosted.org/RequestsThrottler/index.html


No parece que haya un mecanismo simple para manejar esta compilación en el código de solicitudes o codificaciones. El único gancho que parece estar alrededor es para las respuestas.

Aquí hay una solución super hacky para al menos demostrar que es posible: modifiqué grequests para mantener una lista del tiempo en que se emitió una solicitud y suspendí la creación de AsyncRequest hasta que las solicitudes por segundo estuvieran por debajo del máximo.

class AsyncRequest(object): def __init__(self, method, url, **kwargs): print self,''init'' waiting=True while waiting: if len([x for x in q if x > time.time()-15]) < 8: q.append(time.time()) waiting=False else: print self,''snoozing'' gevent.sleep(1)

Puedes usar grequests.imap () para ver esto interactivamente

import time import rg urls = [ ''http://www.heroku.com'', ''http://python-tablib.org'', ''http://httpbin.org'', ''http://python-requests.org'', ''http://kennethreitz.com'', ''http://www.cnn.com'', ] def print_url(r, *args, **kwargs): print(r.url),time.time() hook_dict=dict(response=print_url) rs = (rg.get(u, hooks=hook_dict) for u in urls) for r in rg.imap(rs): print r

Desearía que hubiera una solución más elegante, pero hasta ahora no puedo encontrar una. Miró a su alrededor en sesiones y adaptadores. Tal vez el poolmanager podría ser aumentado en su lugar?

Además, no pondría este código en producción; la lista ''q'' nunca se recorta y, finalmente, se volvería bastante grande. Además, no sé si realmente funciona como se anuncia. Simplemente parece que es cuando miro la salida de la consola.

Ugh Solo mirando este código puedo decir que son las 3am. Hora de ir a la cama.


Tuve un problema similar. Aquí está mi solución. En tu caso, yo haría:

def worker(): with rate_limit(''slow.domain.com'', 2): response = requests.get(''https://slow.domain.com/path'') text = response.text # Use `text`

Suponiendo que tiene varios dominios de los que está realizando el sacrificio, configuraría un mapeo de diccionario (domain, delay) para que no alcance sus límites de tasa.

Este código asume que vas a usar gevent y el parche de mono.

from contextlib import contextmanager from gevent.event import Event from gevent.queue import Queue from time import time def rate_limit(resource, delay, _queues={}): """Delay use of `resource` until after `delay` seconds have passed. Example usage: def worker(): with rate_limit(''foo.bar.com'', 1): response = requests.get(''https://foo.bar.com/path'') text = response.text # use `text` This will serialize and delay requests from multiple workers for resource ''foo.bar.com'' by 1 second. """ if resource not in _queues: queue = Queue() gevent.spawn(_watch, queue) _queues[resource] = queue return _resource_manager(_queues[resource], delay) def _watch(queue): "Watch `queue` and wake event listeners after delay." last = 0 while True: event, delay = queue.get() now = time() if (now - last) < delay: gevent.sleep(delay - (now - last)) event.set() # Wake worker but keep control. event.clear() event.wait() # Yield control until woken. last = time() @contextmanager def _resource_manager(queue, delay): "`with` statement support for `rate_limit`." event = Event() queue.put((event, delay)) event.wait() # Wait for queue watcher to wake us. yield event.set() # Wake queue watcher.


Voy a responder mi propia pregunta, ya que tuve que resolver esto por mi cuenta y parece que hay muy poca información sobre esto.

La idea es la siguiente. Cada objeto de solicitud utilizado con GRequests puede tomar un objeto de sesión como un parámetro cuando se crea. Por otra parte, los objetos de sesión pueden tener adaptadores HTTP montados que se utilizan al realizar solicitudes. Al crear nuestro propio adaptador, podemos interceptar las solicitudes y limitarlas de manera que encontremos las mejores para nuestra aplicación. En mi caso terminé con el siguiente código.

Objeto utilizado para la regulación:

DEFAULT_BURST_WINDOW = datetime.timedelta(seconds=5) DEFAULT_WAIT_WINDOW = datetime.timedelta(seconds=15) class BurstThrottle(object): max_hits = None hits = None burst_window = None total_window = None timestamp = None def __init__(self, max_hits, burst_window, wait_window): self.max_hits = max_hits self.hits = 0 self.burst_window = burst_window self.total_window = burst_window + wait_window self.timestamp = datetime.datetime.min def throttle(self): now = datetime.datetime.utcnow() if now < self.timestamp + self.total_window: if (now < self.timestamp + self.burst_window) and (self.hits < self.max_hits): self.hits += 1 return datetime.timedelta(0) else: return self.timestamp + self.total_window - now else: self.timestamp = now self.hits = 1 return datetime.timedelta(0)

Adaptador HTTP:

class MyHttpAdapter(requests.adapters.HTTPAdapter): throttle = None def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE, pool_maxsize=requests.adapters.DEFAULT_POOLSIZE, max_retries=requests.adapters.DEFAULT_RETRIES, pool_block=requests.adapters.DEFAULT_POOLBLOCK, burst_window=DEFAULT_BURST_WINDOW, wait_window=DEFAULT_WAIT_WINDOW): self.throttle = BurstThrottle(pool_maxsize, burst_window, wait_window) super(MyHttpAdapter, self).__init__(pool_connections=pool_connections, pool_maxsize=pool_maxsize, max_retries=max_retries, pool_block=pool_block) def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None): request_successful = False response = None while not request_successful: wait_time = self.throttle.throttle() while wait_time > datetime.timedelta(0): gevent.sleep(wait_time.total_seconds(), ref=True) wait_time = self.throttle.throttle() response = super(MyHttpAdapter, self).send(request, stream=stream, timeout=timeout, verify=verify, cert=cert, proxies=proxies) if response.status_code != 429: request_successful = True return response

Preparar:

requests_adapter = adapter.MyHttpAdapter( pool_connections=__CONCURRENT_LIMIT__, pool_maxsize=__CONCURRENT_LIMIT__, max_retries=0, pool_block=False, burst_window=datetime.timedelta(seconds=5), wait_window=datetime.timedelta(seconds=20)) requests_session = requests.session() requests_session.mount(''http://'', requests_adapter) requests_session.mount(''https://'', requests_adapter) unsent_requests = (grequests.get(url, hooks={''response'': handle_response}, session=requests_session) for url in urls) grequests.map(unsent_requests, size=__CONCURRENT_LIMIT__)