python http concurrency

python multithreading



¿Cuál es la forma más rápida de enviar 100,000 solicitudes HTTP en Python? (13)

Estoy abriendo un archivo que tiene 100,000 url''s. Necesito enviar una solicitud http a cada url e imprimir el código de estado. Estoy usando Python 2.6, y hasta ahora he visto las muchas formas confusas en que Python implementa la creación de hilos / concurrencia. Incluso he mirado la biblioteca de concurrence Python, pero no puedo entender cómo escribir este programa correctamente. ¿Alguien ha encontrado un problema similar? Supongo que, en general, necesito saber cómo realizar miles de tareas en Python lo más rápido posible, supongo que eso significa "concurrentemente".


Considere usar Windmill , aunque Windmill probablemente no pueda hacer tantos hilos.

Podría hacerlo con un script de Python enrollado a mano en 5 máquinas, cada una de las cuales se conectará de forma externa utilizando los puertos 40000-60000, abriendo 100,000 conexiones de puertos.

Además, podría ser útil realizar una prueba de muestra con una aplicación de control de calidad con un buen subproceso, como OpenSTA , para tener una idea de cuánto puede manejar cada servidor.

Además, intente ver solo usar Perl simple con la clase LWP :: ConnCache. Probablemente obtendrá más rendimiento (más conexiones) de esa manera.


El uso de un grupo de subprocesos es una buena opción y lo hará bastante fácil. Desafortunadamente, python no tiene una biblioteca estándar que hace que los grupos de subprocesos sean extremadamente fáciles. Pero aquí hay una biblioteca decente que debería ayudarle a comenzar: http://www.chrisarndt.de/projects/threadpool/

Código de ejemplo de su sitio:

pool = ThreadPool(poolsize) requests = makeRequests(some_callable, list_of_args, callback) [pool.putRequest(req) for req in requests] pool.wait()

Espero que esto ayude.


En su caso, el subprocesamiento probablemente hará el truco, ya que probablemente pasará la mayor parte del tiempo esperando una respuesta. Hay módulos útiles como Queue en la biblioteca estándar que pueden ayudar.

Hice algo similar con la descarga paralela de archivos antes y fue lo suficientemente bueno para mí, pero no estaba en la escala de la que estás hablando.

Si su tarea estaba más vinculada a la CPU, es posible que desee ver el módulo de multiprocessing , que le permitirá utilizar más CPU / núcleos / subprocesos (más procesos que no se bloquearán entre sí, ya que el bloqueo es por proceso)


Este cliente web asíncrono retorcido va bastante rápido.

#!/usr/bin/python2.7 from twisted.internet import reactor from twisted.internet.defer import Deferred, DeferredList, DeferredLock from twisted.internet.defer import inlineCallbacks from twisted.web.client import Agent, HTTPConnectionPool from twisted.web.http_headers import Headers from pprint import pprint from collections import defaultdict from urlparse import urlparse from random import randrange import fileinput pool = HTTPConnectionPool(reactor) pool.maxPersistentPerHost = 16 agent = Agent(reactor, pool) locks = defaultdict(DeferredLock) codes = {} def getLock(url, simultaneous = 1): return locks[urlparse(url).netloc, randrange(simultaneous)] @inlineCallbacks def getMapping(url): # Limit ourselves to 4 simultaneous connections per host # Tweak this number, but it should be no larger than pool.maxPersistentPerHost lock = getLock(url,4) yield lock.acquire() try: resp = yield agent.request(''HEAD'', url) codes[url] = resp.code except Exception as e: codes[url] = str(e) finally: lock.release() dl = DeferredList(getMapping(url.strip()) for url in fileinput.input()) dl.addCallback(lambda _: reactor.stop()) reactor.run() pprint(codes)


La forma más sencilla sería utilizar la biblioteca de subprocesos incorporada de Python. No son "reales" / hilos del kernel. Tienen problemas (como la serialización), pero son lo suficientemente buenos. Usted querría una cola y grupo de subprocesos. Una opción está here , pero es trivial escribir el tuyo. No puedes poner en paralelo todas las 100,000 llamadas, pero puedes disparar 100 (o así) de ellas al mismo tiempo.


Las cosas han cambiado bastante desde 2010, cuando se publicó, y no he probado todas las otras respuestas, pero he probado algunas, y encontré que esto funciona mejor para mí usando python3.6.

Pude obtener unos 150 dominios únicos por segundo que se ejecutan en AWS.

import pandas as pd import concurrent.futures import requests import time out = [] CONNECTIONS = 100 TIMEOUT = 5 tlds = open(''../data/sample_1k.txt'').read().splitlines() urls = [''http://{}''.format(x) for x in tlds[1:]] def load_url(url, timeout): ans = requests.head(url, timeout=timeout) return ans.status_code with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor: future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls) time1 = time.time() for future in concurrent.futures.as_completed(future_to_url): try: data = future.result() except Exception as exc: data = str(type(exc)) finally: out.append(data) print(str(len(out)),end="/r") time2 = time.time() print(f''Took {time2-time1:.2f} s'') print(pd.Series(out).value_counts())


Los hilos no son absolutamente la respuesta aquí. Proporcionarán cuellos de botella en el proceso y en el kernel, así como límites de rendimiento que no son aceptables si el objetivo general es "la forma más rápida".

Un poco de twisted y su cliente HTTP asíncrono le daría resultados mucho mejores.


Si desea obtener el mejor rendimiento posible, es posible que desee considerar el uso de E / S asíncronas en lugar de subprocesos. La sobrecarga asociada con miles de subprocesos del sistema operativo no es trivial y el cambio de contexto dentro del intérprete de Python agrega aún más. El subprocesamiento ciertamente hará el trabajo, pero sospecho que una ruta asíncrona proporcionará un mejor rendimiento general.

Específicamente, sugeriría el cliente web asíncrono en la biblioteca Twisted ( http://www.twistedmatrix.com ). Tiene una curva de aprendizaje ciertamente empinada, pero es bastante fácil de usar una vez que tenga un buen manejo del estilo de programación asíncrona de Twisted.

Un HowTo on API de cliente web asíncrono de Twisted está disponible en:

http://twistedmatrix.com/documents/current/web/howto/client.html


Solución sin torceduras:

from urlparse import urlparse from threading import Thread import httplib, sys from Queue import Queue concurrent = 200 def doWork(): while True: url = q.get() status, url = getStatus(url) doSomethingWithResult(status, url) q.task_done() def getStatus(ourl): try: url = urlparse(ourl) conn = httplib.HTTPConnection(url.netloc) conn.request("HEAD", url.path) res = conn.getresponse() return res.status, ourl except: return "error", ourl def doSomethingWithResult(status, url): print status, url q = Queue(concurrent * 2) for i in range(concurrent): t = Thread(target=doWork) t.daemon = True t.start() try: for url in open(''urllist.txt''): q.put(url.strip()) q.join() except KeyboardInterrupt: sys.exit(1)

Este es un poco más rápido que la solución torcida y utiliza menos CPU.


Un buen enfoque para resolver este problema es escribir primero el código requerido para obtener un resultado, luego incorporar código de subprocesamiento para paralelizar la aplicación.

En un mundo perfecto, esto simplemente significaría comenzar simultáneamente 100,000 hilos que envían sus resultados a un diccionario o lista para su procesamiento posterior, pero en la práctica usted está limitado en la cantidad de solicitudes HTTP paralelas que puede emitir de esta manera. Localmente, tiene límites en la cantidad de sockets que puede abrir al mismo tiempo, cuántos subprocesos de ejecución permitirá su intérprete de Python. De forma remota, puede estar limitado en el número de conexiones simultáneas si todas las solicitudes son contra un servidor, o muchas. Es probable que estas limitaciones requieran que escriba el script de tal manera que solo sondee una pequeña fracción de las URL en un momento dado (100, como se mencionó en otro póster, es probablemente un tamaño de grupo de subprocesos decente, aunque puede encontrar que puede implementar con éxito muchos más).

Puedes seguir este patrón de diseño para resolver el problema anterior:

  1. Inicie un subproceso que inicie nuevos subprocesos de solicitud hasta que el número de subprocesos que se estén ejecutando actualmente (puede rastrearlos mediante threading.active_count () o al insertar los objetos de subproceso en una estructura de datos) sea> = su número máximo de solicitudes simultáneas (por ejemplo, 100) , luego duerme por un corto tiempo de espera. Este hilo debe terminar cuando no haya más URL para procesar. Por lo tanto, el hilo seguirá despertándose, lanzando nuevos hilos y durmiendo hasta que termines.
  2. Haga que los subprocesos de solicitud almacenen sus resultados en alguna estructura de datos para su posterior recuperación y salida. Si la estructura en la que está almacenando los resultados es una list o dict en CPython, puede agregar o insertar de forma segura elementos únicos de sus subprocesos sin bloqueos , pero si escribe en un archivo o requiere una interacción de datos entre subprocesos más compleja , debería Utilice un bloqueo de exclusión mutua para proteger este estado de la corrupción .

Le sugiero que utilice el módulo de threading . Puedes usarlo para iniciar y rastrear hilos en ejecución. El soporte de subprocesos de Python es simple, pero la descripción de su problema sugiere que es completamente suficiente para sus necesidades.

Finalmente, si desea ver una aplicación bastante sencilla de una aplicación de red paralela escrita en Python, consulte ssh.py Es una pequeña biblioteca que utiliza subprocesos de Python para paralelizar muchas conexiones SSH. El diseño está lo suficientemente cerca de sus requisitos, por lo que puede ser un buen recurso.


Una solución utilizando tornado de la biblioteca de redes asíncronas.

from tornado import ioloop, httpclient i = 0 def handle_request(response): print(response.code) global i i -= 1 if i == 0: ioloop.IOLoop.instance().stop() http_client = httpclient.AsyncHTTPClient() for url in open(''urls.txt''): i += 1 http_client.fetch(url.strip(), handle_request, method=''HEAD'') ioloop.IOLoop.instance().start()


Una solución:

from twisted.internet import reactor, threads from urlparse import urlparse import httplib import itertools concurrent = 200 finished=itertools.count(1) reactor.suggestThreadPoolSize(concurrent) def getStatus(ourl): url = urlparse(ourl) conn = httplib.HTTPConnection(url.netloc) conn.request("HEAD", url.path) res = conn.getresponse() return res.status def processResponse(response,url): print response, url processedOne() def processError(error,url): print "error", url#, error processedOne() def processedOne(): if finished.next()==added: reactor.stop() def addTask(url): req = threads.deferToThread(getStatus, url) req.addCallback(processResponse, url) req.addErrback(processError, url) added=0 for url in open(''urllist.txt''): added+=1 addTask(url.strip()) try: reactor.run() except KeyboardInterrupt: reactor.stop()

Tiempo de prueba:

[kalmi@ubi1:~] wc -l urllist.txt 10000 urllist.txt [kalmi@ubi1:~] time python f.py > /dev/null real 1m10.682s user 0m16.020s sys 0m10.330s [kalmi@ubi1:~] head -n 6 urllist.txt http://www.google.com http://www.bix.hu http://www.godaddy.com http://www.google.com http://www.bix.hu http://www.godaddy.com [kalmi@ubi1:~] python f.py | head -n 6 200 http://www.bix.hu 200 http://www.bix.hu 200 http://www.bix.hu 200 http://www.bix.hu 200 http://www.bix.hu 200 http://www.bix.hu

Pingtime:

bix.hu is ~10 ms away from me godaddy.com: ~170 ms google.com: ~30 ms


Usa grequests , es una combinación de peticiones + módulo Gevent.

GRequests le permite utilizar las solicitudes con Gevent para realizar solicitudes HTTP asíncronas fácilmente.

El uso es simple:

import grequests urls = [ ''http://www.heroku.com'', ''http://tablib.org'', ''http://httpbin.org'', ''http://python-requests.org'', ''http://kennethreitz.com'' ]

Crear un conjunto de solicitudes no enviadas:

>>> rs = (grequests.get(u) for u in urls)

Envíalos todos al mismo tiempo:

>>> grequests.map(rs) [<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]