async - parallel request python
Limitar/limitar la tasa de solicitudes HTTP en las peticiones (4)
Estoy escribiendo un pequeño script en Python 2.7.3 con GRequests y lxml que me permitirá recopilar algunos precios de tarjetas coleccionables de varios sitios web y compararlos. El problema es que uno de los sitios web limita el número de solicitudes y devuelve el error HTTP 429 si lo excedo.
¿Hay alguna manera de agregar una mayor cantidad de solicitudes en GRequestes para que no exceda la cantidad de solicitudes por segundo que especifico? Además, ¿cómo puedo hacer que GRequestes vuelva a intentar después de un tiempo si se produce HTTP 429?
En una nota al margen, su límite es ridículamente bajo. Algo así como 8 peticiones por 15 segundos. Lo rompí con mi navegador en múltiples ocasiones simplemente actualizando la página a la espera de cambios de precios.
Eche un vistazo a esto para el límite automático de solicitudes: https://pypi.python.org/pypi/RequestsThrottler/0.2.2
Puede establecer una cantidad fija de retraso entre cada solicitud o establecer una cantidad de solicitudes para enviar en una cantidad fija de segundos (que es básicamente lo mismo):
import requests
from requests_throttler import BaseThrottler
request = requests.Request(method=''GET'', url=''http://www.google.com'')
reqs = [request for i in range(0, 5)] # An example list of requests
with BaseThrottler(name=''base-throttler'', delay=1.5) as bt:
throttled_requests = bt.multi_submit(reqs)
donde la función multi_submit
devuelve una lista de ThrottledRequest
(ver doc: enlace al final).
A continuación, puede acceder a las respuestas:
for tr in throttled_requests:
print tr.response
Alternativamente, puede lograr lo mismo especificando el número o las solicitudes para enviar en una cantidad de tiempo fija (por ejemplo, 15 solicitudes cada 60 segundos):
import requests
from requests_throttler import BaseThrottler
request = requests.Request(method=''GET'', url=''http://www.google.com'')
reqs = [request for i in range(0, 5)] # An example list of requests
with BaseThrottler(name=''base-throttler'', reqs_over_time=(15, 60)) as bt:
throttled_requests = bt.multi_submit(reqs)
Ambas soluciones pueden implementarse sin el uso de la declaración with
:
import requests
from requests_throttler import BaseThrottler
request = requests.Request(method=''GET'', url=''http://www.google.com'')
reqs = [request for i in range(0, 5)] # An example list of requests
bt = BaseThrottler(name=''base-throttler'', delay=1.5)
bt.start()
throttled_requests = bt.multi_submit(reqs)
bt.shutdown()
Para más detalles: http://pythonhosted.org/RequestsThrottler/index.html
No parece que haya un mecanismo simple para manejar esta compilación en el código de solicitudes o codificaciones. El único gancho que parece estar alrededor es para las respuestas.
Aquí hay una solución super hacky para al menos demostrar que es posible: modifiqué grequests para mantener una lista del tiempo en que se emitió una solicitud y suspendí la creación de AsyncRequest hasta que las solicitudes por segundo estuvieran por debajo del máximo.
class AsyncRequest(object):
def __init__(self, method, url, **kwargs):
print self,''init''
waiting=True
while waiting:
if len([x for x in q if x > time.time()-15]) < 8:
q.append(time.time())
waiting=False
else:
print self,''snoozing''
gevent.sleep(1)
Puedes usar grequests.imap () para ver esto interactivamente
import time
import rg
urls = [
''http://www.heroku.com'',
''http://python-tablib.org'',
''http://httpbin.org'',
''http://python-requests.org'',
''http://kennethreitz.com'',
''http://www.cnn.com'',
]
def print_url(r, *args, **kwargs):
print(r.url),time.time()
hook_dict=dict(response=print_url)
rs = (rg.get(u, hooks=hook_dict) for u in urls)
for r in rg.imap(rs):
print r
Desearía que hubiera una solución más elegante, pero hasta ahora no puedo encontrar una. Miró a su alrededor en sesiones y adaptadores. Tal vez el poolmanager podría ser aumentado en su lugar?
Además, no pondría este código en producción; la lista ''q'' nunca se recorta y, finalmente, se volvería bastante grande. Además, no sé si realmente funciona como se anuncia. Simplemente parece que es cuando miro la salida de la consola.
Ugh Solo mirando este código puedo decir que son las 3am. Hora de ir a la cama.
Tuve un problema similar. Aquí está mi solución. En tu caso, yo haría:
def worker():
with rate_limit(''slow.domain.com'', 2):
response = requests.get(''https://slow.domain.com/path'')
text = response.text
# Use `text`
Suponiendo que tiene varios dominios de los que está realizando el sacrificio, configuraría un mapeo de diccionario (domain, delay)
para que no alcance sus límites de tasa.
Este código asume que vas a usar gevent y el parche de mono.
from contextlib import contextmanager
from gevent.event import Event
from gevent.queue import Queue
from time import time
def rate_limit(resource, delay, _queues={}):
"""Delay use of `resource` until after `delay` seconds have passed.
Example usage:
def worker():
with rate_limit(''foo.bar.com'', 1):
response = requests.get(''https://foo.bar.com/path'')
text = response.text
# use `text`
This will serialize and delay requests from multiple workers for resource
''foo.bar.com'' by 1 second.
"""
if resource not in _queues:
queue = Queue()
gevent.spawn(_watch, queue)
_queues[resource] = queue
return _resource_manager(_queues[resource], delay)
def _watch(queue):
"Watch `queue` and wake event listeners after delay."
last = 0
while True:
event, delay = queue.get()
now = time()
if (now - last) < delay:
gevent.sleep(delay - (now - last))
event.set() # Wake worker but keep control.
event.clear()
event.wait() # Yield control until woken.
last = time()
@contextmanager
def _resource_manager(queue, delay):
"`with` statement support for `rate_limit`."
event = Event()
queue.put((event, delay))
event.wait() # Wait for queue watcher to wake us.
yield
event.set() # Wake queue watcher.
Voy a responder mi propia pregunta, ya que tuve que resolver esto por mi cuenta y parece que hay muy poca información sobre esto.
La idea es la siguiente. Cada objeto de solicitud utilizado con GRequests puede tomar un objeto de sesión como un parámetro cuando se crea. Por otra parte, los objetos de sesión pueden tener adaptadores HTTP montados que se utilizan al realizar solicitudes. Al crear nuestro propio adaptador, podemos interceptar las solicitudes y limitarlas de manera que encontremos las mejores para nuestra aplicación. En mi caso terminé con el siguiente código.
Objeto utilizado para la regulación:
DEFAULT_BURST_WINDOW = datetime.timedelta(seconds=5)
DEFAULT_WAIT_WINDOW = datetime.timedelta(seconds=15)
class BurstThrottle(object):
max_hits = None
hits = None
burst_window = None
total_window = None
timestamp = None
def __init__(self, max_hits, burst_window, wait_window):
self.max_hits = max_hits
self.hits = 0
self.burst_window = burst_window
self.total_window = burst_window + wait_window
self.timestamp = datetime.datetime.min
def throttle(self):
now = datetime.datetime.utcnow()
if now < self.timestamp + self.total_window:
if (now < self.timestamp + self.burst_window) and (self.hits < self.max_hits):
self.hits += 1
return datetime.timedelta(0)
else:
return self.timestamp + self.total_window - now
else:
self.timestamp = now
self.hits = 1
return datetime.timedelta(0)
Adaptador HTTP:
class MyHttpAdapter(requests.adapters.HTTPAdapter):
throttle = None
def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE,
pool_maxsize=requests.adapters.DEFAULT_POOLSIZE, max_retries=requests.adapters.DEFAULT_RETRIES,
pool_block=requests.adapters.DEFAULT_POOLBLOCK, burst_window=DEFAULT_BURST_WINDOW,
wait_window=DEFAULT_WAIT_WINDOW):
self.throttle = BurstThrottle(pool_maxsize, burst_window, wait_window)
super(MyHttpAdapter, self).__init__(pool_connections=pool_connections, pool_maxsize=pool_maxsize,
max_retries=max_retries, pool_block=pool_block)
def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
request_successful = False
response = None
while not request_successful:
wait_time = self.throttle.throttle()
while wait_time > datetime.timedelta(0):
gevent.sleep(wait_time.total_seconds(), ref=True)
wait_time = self.throttle.throttle()
response = super(MyHttpAdapter, self).send(request, stream=stream, timeout=timeout,
verify=verify, cert=cert, proxies=proxies)
if response.status_code != 429:
request_successful = True
return response
Preparar:
requests_adapter = adapter.MyHttpAdapter(
pool_connections=__CONCURRENT_LIMIT__,
pool_maxsize=__CONCURRENT_LIMIT__,
max_retries=0,
pool_block=False,
burst_window=datetime.timedelta(seconds=5),
wait_window=datetime.timedelta(seconds=20))
requests_session = requests.session()
requests_session.mount(''http://'', requests_adapter)
requests_session.mount(''https://'', requests_adapter)
unsent_requests = (grequests.get(url,
hooks={''response'': handle_response},
session=requests_session) for url in urls)
grequests.map(unsent_requests, size=__CONCURRENT_LIMIT__)