python - thread - Una recuperación de URL paralela de subprocesamiento múltiple muy simple(sin cola)
threading current_thread() (5)
Pasé todo un día buscando el buscador de URL multiproceso más simple posible en Python, pero la mayoría de los scripts que encontré están usando colas o multiproceso o bibliotecas complejas.
Finalmente, escribí uno, que estoy informando como respuesta. Por favor, siéntase libre de sugerir cualquier mejora.
Supongo que otras personas podrían haber estado buscando algo similar.
Ahora estoy publicando una solución diferente, teniendo los hilos de trabajo no-deamon y uniéndolos al hilo principal (lo que significa bloquear el hilo principal hasta que todos los hilos de trabajo hayan terminado) en lugar de notificar el final de la ejecución de cada hilo de trabajo con un devolución de llamada a una función global (como lo hice en la respuesta anterior), ya que en algunos comentarios se señaló que de esa manera no es seguro para subprocesos.
import threading
import urllib2
import time
start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
class FetchUrl(threading.Thread):
def __init__(self, url):
threading.Thread.__init__(self)
self.url = url
def run(self):
urlHandler = urllib2.urlopen(self.url)
html = urlHandler.read()
print "''%s/' fetched in %ss" % (self.url,(time.time() - start))
for url in urls:
FetchUrl(url).start()
#Join all existing threads to main thread.
for thread in threading.enumerate():
if thread is not threading.currentThread():
thread.join()
print "Elapsed Time: %s" % (time.time() - start)
Este script recupera el contenido de un conjunto de URL definidas en una matriz. Genera un hilo para cada URL que se va a recuperar, por lo que está destinado a ser utilizado para un conjunto limitado de URL.
En lugar de utilizar un objeto de cola, cada hilo notifica su final con una devolución de llamada a una función global, que mantiene el recuento de la cantidad de subprocesos en ejecución.
import threading
import urllib2
import time
start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
left_to_fetch = len(urls)
class FetchUrl(threading.Thread):
def __init__(self, url):
threading.Thread.__init__(self)
self.setDaemon = True
self.url = url
def run(self):
urlHandler = urllib2.urlopen(self.url)
html = urlHandler.read()
finished_fetch_url(self.url)
def finished_fetch_url(url):
"callback function called when a FetchUrl thread ends"
print "/"%s/" fetched in %ss" % (url,(time.time() - start))
global left_to_fetch
left_to_fetch-=1
if left_to_fetch==0:
"all urls have been fetched"
print "Elapsed Time: %ss" % (time.time() - start)
for url in urls:
"spawning a FetchUrl thread for each url to fetch"
FetchUrl(url).start()
El ejemplo principal en concurrent.futures
hace todo lo que quiere, mucho más simple. Además, puede manejar un gran número de URL haciendo solo 5 a la vez, y maneja los errores mucho mejor.
Por supuesto, este módulo solo está integrado con Python 3.2 o posterior ... pero si está utilizando 2.5-3.1, puede instalar el backport, futures
, fuera de PyPI. Todo lo que necesita para cambiar desde el código de ejemplo es buscar y reemplazar concurrent.futures
con futures
y, para 2.x, urllib.request
con urllib2
.
Aquí está la muestra backported a 2.x, modificada para usar su lista de URL y agregar las horas:
import concurrent.futures
import urllib2
import time
start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
# Retrieve a single page and report the url and contents
def load_url(url, timeout):
conn = urllib2.urlopen(url, timeout=timeout)
return conn.readall()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in urls}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print ''%r generated an exception: %s'' % (url, exc)
else:
print ''"%s" fetched in %ss'' % (url,(time.time() - start))
print "Elapsed Time: %ss" % (time.time() - start)
Pero puedes hacer esto aún más simple. Realmente, todo lo que necesitas es:
def load_url(url):
conn = urllib2.urlopen(url, timeout)
data = conn.readall()
print ''"%s" fetched in %ss'' % (url,(time.time() - start))
return data
with futures.ThreadPoolExecutor(max_workers=5) as executor:
pages = executor.map(load_url, urls)
print "Elapsed Time: %ss" % (time.time() - start)
multiprocessing
tiene un grupo de subprocesos que no inicia otros procesos:
#!/usr/bin/env python
from multiprocessing.pool import ThreadPool
from time import time as timer
from urllib2 import urlopen
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
def fetch_url(url):
try:
response = urlopen(url)
return url, response.read(), None
except Exception as e:
return url, None, e
start = timer()
results = ThreadPool(20).imap_unordered(fetch_url, urls)
for url, html, error in results:
if error is None:
print("%r fetched in %ss" % (url, timer() - start))
else:
print("error fetching %r: %s" % (url, error))
print("Elapsed Time: %s" % (timer() - start,))
Las ventajas en comparación con la solución basada en Thread
:
-
ThreadPool
permite limitar la cantidad máxima de conexiones simultáneas (20
en el ejemplo de código) - la salida no está distorsionada porque toda la salida está en el hilo principal
- los errores son registrados
- el código funciona tanto en Python 2 como en 3 sin cambios (asumiendo
from urllib.request import urlopen
en Python 3).
Simplificando su versión original en la medida de lo posible:
import threading
import urllib2
import time
start = time.time()
urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"]
def fetch_url(url):
urlHandler = urllib2.urlopen(url)
html = urlHandler.read()
print "''%s/' fetched in %ss" % (url, (time.time() - start))
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
print "Elapsed Time: %s" % (time.time() - start)
Los únicos trucos nuevos aquí son:
- Mantenga un registro de los hilos que crea.
- No te molestes con un contador de hilos si solo quieres saber cuándo han terminado;
join
ya te dice eso. - Si no necesita ninguna API estatal o externa, no necesita una subclase
Thread
, solo una funcióntarget
.