python multithreading
¿Cuál es la forma más rápida de enviar 100,000 solicitudes HTTP en Python? (13)
Estoy abriendo un archivo que tiene 100,000 url''s. Necesito enviar una solicitud http a cada url e imprimir el código de estado. Estoy usando Python 2.6, y hasta ahora he visto las muchas formas confusas en que Python implementa la creación de hilos / concurrencia. Incluso he mirado la biblioteca de concurrence Python, pero no puedo entender cómo escribir este programa correctamente. ¿Alguien ha encontrado un problema similar? Supongo que, en general, necesito saber cómo realizar miles de tareas en Python lo más rápido posible, supongo que eso significa "concurrentemente".
Considere usar Windmill , aunque Windmill probablemente no pueda hacer tantos hilos.
Podría hacerlo con un script de Python enrollado a mano en 5 máquinas, cada una de las cuales se conectará de forma externa utilizando los puertos 40000-60000, abriendo 100,000 conexiones de puertos.
Además, podría ser útil realizar una prueba de muestra con una aplicación de control de calidad con un buen subproceso, como OpenSTA , para tener una idea de cuánto puede manejar cada servidor.
Además, intente ver solo usar Perl simple con la clase LWP :: ConnCache. Probablemente obtendrá más rendimiento (más conexiones) de esa manera.
El uso de un grupo de subprocesos es una buena opción y lo hará bastante fácil. Desafortunadamente, python no tiene una biblioteca estándar que hace que los grupos de subprocesos sean extremadamente fáciles. Pero aquí hay una biblioteca decente que debería ayudarle a comenzar: http://www.chrisarndt.de/projects/threadpool/
Código de ejemplo de su sitio:
pool = ThreadPool(poolsize)
requests = makeRequests(some_callable, list_of_args, callback)
[pool.putRequest(req) for req in requests]
pool.wait()
Espero que esto ayude.
En su caso, el subprocesamiento probablemente hará el truco, ya que probablemente pasará la mayor parte del tiempo esperando una respuesta. Hay módulos útiles como Queue en la biblioteca estándar que pueden ayudar.
Hice algo similar con la descarga paralela de archivos antes y fue lo suficientemente bueno para mí, pero no estaba en la escala de la que estás hablando.
Si su tarea estaba más vinculada a la CPU, es posible que desee ver el módulo de multiprocessing , que le permitirá utilizar más CPU / núcleos / subprocesos (más procesos que no se bloquearán entre sí, ya que el bloqueo es por proceso)
Este cliente web asíncrono retorcido va bastante rápido.
#!/usr/bin/python2.7
from twisted.internet import reactor
from twisted.internet.defer import Deferred, DeferredList, DeferredLock
from twisted.internet.defer import inlineCallbacks
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from pprint import pprint
from collections import defaultdict
from urlparse import urlparse
from random import randrange
import fileinput
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 16
agent = Agent(reactor, pool)
locks = defaultdict(DeferredLock)
codes = {}
def getLock(url, simultaneous = 1):
return locks[urlparse(url).netloc, randrange(simultaneous)]
@inlineCallbacks
def getMapping(url):
# Limit ourselves to 4 simultaneous connections per host
# Tweak this number, but it should be no larger than pool.maxPersistentPerHost
lock = getLock(url,4)
yield lock.acquire()
try:
resp = yield agent.request(''HEAD'', url)
codes[url] = resp.code
except Exception as e:
codes[url] = str(e)
finally:
lock.release()
dl = DeferredList(getMapping(url.strip()) for url in fileinput.input())
dl.addCallback(lambda _: reactor.stop())
reactor.run()
pprint(codes)
La forma más sencilla sería utilizar la biblioteca de subprocesos incorporada de Python. No son "reales" / hilos del kernel. Tienen problemas (como la serialización), pero son lo suficientemente buenos. Usted querría una cola y grupo de subprocesos. Una opción está here , pero es trivial escribir el tuyo. No puedes poner en paralelo todas las 100,000 llamadas, pero puedes disparar 100 (o así) de ellas al mismo tiempo.
Las cosas han cambiado bastante desde 2010, cuando se publicó, y no he probado todas las otras respuestas, pero he probado algunas, y encontré que esto funciona mejor para mí usando python3.6.
Pude obtener unos 150 dominios únicos por segundo que se ejecutan en AWS.
import pandas as pd
import concurrent.futures
import requests
import time
out = []
CONNECTIONS = 100
TIMEOUT = 5
tlds = open(''../data/sample_1k.txt'').read().splitlines()
urls = [''http://{}''.format(x) for x in tlds[1:]]
def load_url(url, timeout):
ans = requests.head(url, timeout=timeout)
return ans.status_code
with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
time1 = time.time()
for future in concurrent.futures.as_completed(future_to_url):
try:
data = future.result()
except Exception as exc:
data = str(type(exc))
finally:
out.append(data)
print(str(len(out)),end="/r")
time2 = time.time()
print(f''Took {time2-time1:.2f} s'')
print(pd.Series(out).value_counts())
Los hilos no son absolutamente la respuesta aquí. Proporcionarán cuellos de botella en el proceso y en el kernel, así como límites de rendimiento que no son aceptables si el objetivo general es "la forma más rápida".
Un poco de twisted
y su cliente HTTP
asíncrono le daría resultados mucho mejores.
Si desea obtener el mejor rendimiento posible, es posible que desee considerar el uso de E / S asíncronas en lugar de subprocesos. La sobrecarga asociada con miles de subprocesos del sistema operativo no es trivial y el cambio de contexto dentro del intérprete de Python agrega aún más. El subprocesamiento ciertamente hará el trabajo, pero sospecho que una ruta asíncrona proporcionará un mejor rendimiento general.
Específicamente, sugeriría el cliente web asíncrono en la biblioteca Twisted ( http://www.twistedmatrix.com ). Tiene una curva de aprendizaje ciertamente empinada, pero es bastante fácil de usar una vez que tenga un buen manejo del estilo de programación asíncrona de Twisted.
Un HowTo on API de cliente web asíncrono de Twisted está disponible en:
http://twistedmatrix.com/documents/current/web/howto/client.html
Solución sin torceduras:
from urlparse import urlparse
from threading import Thread
import httplib, sys
from Queue import Queue
concurrent = 200
def doWork():
while True:
url = q.get()
status, url = getStatus(url)
doSomethingWithResult(status, url)
q.task_done()
def getStatus(ourl):
try:
url = urlparse(ourl)
conn = httplib.HTTPConnection(url.netloc)
conn.request("HEAD", url.path)
res = conn.getresponse()
return res.status, ourl
except:
return "error", ourl
def doSomethingWithResult(status, url):
print status, url
q = Queue(concurrent * 2)
for i in range(concurrent):
t = Thread(target=doWork)
t.daemon = True
t.start()
try:
for url in open(''urllist.txt''):
q.put(url.strip())
q.join()
except KeyboardInterrupt:
sys.exit(1)
Este es un poco más rápido que la solución torcida y utiliza menos CPU.
Un buen enfoque para resolver este problema es escribir primero el código requerido para obtener un resultado, luego incorporar código de subprocesamiento para paralelizar la aplicación.
En un mundo perfecto, esto simplemente significaría comenzar simultáneamente 100,000 hilos que envían sus resultados a un diccionario o lista para su procesamiento posterior, pero en la práctica usted está limitado en la cantidad de solicitudes HTTP paralelas que puede emitir de esta manera. Localmente, tiene límites en la cantidad de sockets que puede abrir al mismo tiempo, cuántos subprocesos de ejecución permitirá su intérprete de Python. De forma remota, puede estar limitado en el número de conexiones simultáneas si todas las solicitudes son contra un servidor, o muchas. Es probable que estas limitaciones requieran que escriba el script de tal manera que solo sondee una pequeña fracción de las URL en un momento dado (100, como se mencionó en otro póster, es probablemente un tamaño de grupo de subprocesos decente, aunque puede encontrar que puede implementar con éxito muchos más).
Puedes seguir este patrón de diseño para resolver el problema anterior:
- Inicie un subproceso que inicie nuevos subprocesos de solicitud hasta que el número de subprocesos que se estén ejecutando actualmente (puede rastrearlos mediante threading.active_count () o al insertar los objetos de subproceso en una estructura de datos) sea> = su número máximo de solicitudes simultáneas (por ejemplo, 100) , luego duerme por un corto tiempo de espera. Este hilo debe terminar cuando no haya más URL para procesar. Por lo tanto, el hilo seguirá despertándose, lanzando nuevos hilos y durmiendo hasta que termines.
- Haga que los subprocesos de solicitud almacenen sus resultados en alguna estructura de datos para su posterior recuperación y salida. Si la estructura en la que está almacenando los resultados es una
list
odict
en CPython, puede agregar o insertar de forma segura elementos únicos de sus subprocesos sin bloqueos , pero si escribe en un archivo o requiere una interacción de datos entre subprocesos más compleja , debería Utilice un bloqueo de exclusión mutua para proteger este estado de la corrupción .
Le sugiero que utilice el módulo de threading . Puedes usarlo para iniciar y rastrear hilos en ejecución. El soporte de subprocesos de Python es simple, pero la descripción de su problema sugiere que es completamente suficiente para sus necesidades.
Finalmente, si desea ver una aplicación bastante sencilla de una aplicación de red paralela escrita en Python, consulte ssh.py Es una pequeña biblioteca que utiliza subprocesos de Python para paralelizar muchas conexiones SSH. El diseño está lo suficientemente cerca de sus requisitos, por lo que puede ser un buen recurso.
Una solución utilizando tornado de la biblioteca de redes asíncronas.
from tornado import ioloop, httpclient
i = 0
def handle_request(response):
print(response.code)
global i
i -= 1
if i == 0:
ioloop.IOLoop.instance().stop()
http_client = httpclient.AsyncHTTPClient()
for url in open(''urls.txt''):
i += 1
http_client.fetch(url.strip(), handle_request, method=''HEAD'')
ioloop.IOLoop.instance().start()
Una solución:
from twisted.internet import reactor, threads
from urlparse import urlparse
import httplib
import itertools
concurrent = 200
finished=itertools.count(1)
reactor.suggestThreadPoolSize(concurrent)
def getStatus(ourl):
url = urlparse(ourl)
conn = httplib.HTTPConnection(url.netloc)
conn.request("HEAD", url.path)
res = conn.getresponse()
return res.status
def processResponse(response,url):
print response, url
processedOne()
def processError(error,url):
print "error", url#, error
processedOne()
def processedOne():
if finished.next()==added:
reactor.stop()
def addTask(url):
req = threads.deferToThread(getStatus, url)
req.addCallback(processResponse, url)
req.addErrback(processError, url)
added=0
for url in open(''urllist.txt''):
added+=1
addTask(url.strip())
try:
reactor.run()
except KeyboardInterrupt:
reactor.stop()
Tiempo de prueba:
[kalmi@ubi1:~] wc -l urllist.txt
10000 urllist.txt
[kalmi@ubi1:~] time python f.py > /dev/null
real 1m10.682s
user 0m16.020s
sys 0m10.330s
[kalmi@ubi1:~] head -n 6 urllist.txt
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
http://www.google.com
http://www.bix.hu
http://www.godaddy.com
[kalmi@ubi1:~] python f.py | head -n 6
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
200 http://www.bix.hu
Pingtime:
bix.hu is ~10 ms away from me
godaddy.com: ~170 ms
google.com: ~30 ms
Usa grequests , es una combinación de peticiones + módulo Gevent.
GRequests le permite utilizar las solicitudes con Gevent para realizar solicitudes HTTP asíncronas fácilmente.
El uso es simple:
import grequests
urls = [
''http://www.heroku.com'',
''http://tablib.org'',
''http://httpbin.org'',
''http://python-requests.org'',
''http://kennethreitz.com''
]
Crear un conjunto de solicitudes no enviadas:
>>> rs = (grequests.get(u) for u in urls)
Envíalos todos al mismo tiempo:
>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>]