threading thread start library get_ident current_thread python multithreading callback python-multithreading urlfetch

python - thread - Una recuperación de URL paralela de subprocesamiento múltiple muy simple(sin cola)



threading current_thread() (5)

Pasé todo un día buscando el buscador de URL multiproceso más simple posible en Python, pero la mayoría de los scripts que encontré están usando colas o multiproceso o bibliotecas complejas.

Finalmente, escribí uno, que estoy informando como respuesta. Por favor, siéntase libre de sugerir cualquier mejora.

Supongo que otras personas podrían haber estado buscando algo similar.


Ahora estoy publicando una solución diferente, teniendo los hilos de trabajo no-deamon y uniéndolos al hilo principal (lo que significa bloquear el hilo principal hasta que todos los hilos de trabajo hayan terminado) en lugar de notificar el final de la ejecución de cada hilo de trabajo con un devolución de llamada a una función global (como lo hice en la respuesta anterior), ya que en algunos comentarios se señaló que de esa manera no es seguro para subprocesos.

import threading import urllib2 import time start = time.time() urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"] class FetchUrl(threading.Thread): def __init__(self, url): threading.Thread.__init__(self) self.url = url def run(self): urlHandler = urllib2.urlopen(self.url) html = urlHandler.read() print "''%s/' fetched in %ss" % (self.url,(time.time() - start)) for url in urls: FetchUrl(url).start() #Join all existing threads to main thread. for thread in threading.enumerate(): if thread is not threading.currentThread(): thread.join() print "Elapsed Time: %s" % (time.time() - start)


Este script recupera el contenido de un conjunto de URL definidas en una matriz. Genera un hilo para cada URL que se va a recuperar, por lo que está destinado a ser utilizado para un conjunto limitado de URL.

En lugar de utilizar un objeto de cola, cada hilo notifica su final con una devolución de llamada a una función global, que mantiene el recuento de la cantidad de subprocesos en ejecución.

import threading import urllib2 import time start = time.time() urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"] left_to_fetch = len(urls) class FetchUrl(threading.Thread): def __init__(self, url): threading.Thread.__init__(self) self.setDaemon = True self.url = url def run(self): urlHandler = urllib2.urlopen(self.url) html = urlHandler.read() finished_fetch_url(self.url) def finished_fetch_url(url): "callback function called when a FetchUrl thread ends" print "/"%s/" fetched in %ss" % (url,(time.time() - start)) global left_to_fetch left_to_fetch-=1 if left_to_fetch==0: "all urls have been fetched" print "Elapsed Time: %ss" % (time.time() - start) for url in urls: "spawning a FetchUrl thread for each url to fetch" FetchUrl(url).start()


El ejemplo principal en concurrent.futures hace todo lo que quiere, mucho más simple. Además, puede manejar un gran número de URL haciendo solo 5 a la vez, y maneja los errores mucho mejor.

Por supuesto, este módulo solo está integrado con Python 3.2 o posterior ... pero si está utilizando 2.5-3.1, puede instalar el backport, futures , fuera de PyPI. Todo lo que necesita para cambiar desde el código de ejemplo es buscar y reemplazar concurrent.futures con futures y, para 2.x, urllib.request con urllib2 .

Aquí está la muestra backported a 2.x, modificada para usar su lista de URL y agregar las horas:

import concurrent.futures import urllib2 import time start = time.time() urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"] # Retrieve a single page and report the url and contents def load_url(url, timeout): conn = urllib2.urlopen(url, timeout=timeout) return conn.readall() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in urls} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print ''%r generated an exception: %s'' % (url, exc) else: print ''"%s" fetched in %ss'' % (url,(time.time() - start)) print "Elapsed Time: %ss" % (time.time() - start)

Pero puedes hacer esto aún más simple. Realmente, todo lo que necesitas es:

def load_url(url): conn = urllib2.urlopen(url, timeout) data = conn.readall() print ''"%s" fetched in %ss'' % (url,(time.time() - start)) return data with futures.ThreadPoolExecutor(max_workers=5) as executor: pages = executor.map(load_url, urls) print "Elapsed Time: %ss" % (time.time() - start)


multiprocessing tiene un grupo de subprocesos que no inicia otros procesos:

#!/usr/bin/env python from multiprocessing.pool import ThreadPool from time import time as timer from urllib2 import urlopen urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"] def fetch_url(url): try: response = urlopen(url) return url, response.read(), None except Exception as e: return url, None, e start = timer() results = ThreadPool(20).imap_unordered(fetch_url, urls) for url, html, error in results: if error is None: print("%r fetched in %ss" % (url, timer() - start)) else: print("error fetching %r: %s" % (url, error)) print("Elapsed Time: %s" % (timer() - start,))

Las ventajas en comparación con la solución basada en Thread :

  • ThreadPool permite limitar la cantidad máxima de conexiones simultáneas ( 20 en el ejemplo de código)
  • la salida no está distorsionada porque toda la salida está en el hilo principal
  • los errores son registrados
  • el código funciona tanto en Python 2 como en 3 sin cambios (asumiendo from urllib.request import urlopen en Python 3).

Simplificando su versión original en la medida de lo posible:

import threading import urllib2 import time start = time.time() urls = ["http://www.google.com", "http://www.apple.com", "http://www.microsoft.com", "http://www.amazon.com", "http://www.facebook.com"] def fetch_url(url): urlHandler = urllib2.urlopen(url) html = urlHandler.read() print "''%s/' fetched in %ss" % (url, (time.time() - start)) threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls] for thread in threads: thread.start() for thread in threads: thread.join() print "Elapsed Time: %s" % (time.time() - start)

Los únicos trucos nuevos aquí son:

  • Mantenga un registro de los hilos que crea.
  • No te molestes con un contador de hilos si solo quieres saber cuándo han terminado; join ya te dice eso.
  • Si no necesita ninguna API estatal o externa, no necesita una subclase Thread , solo una función target .