set_start_method - Python: Algo como `map` que funciona en hilos
python pool map multiple arguments (5)
Estaba seguro de que había algo como esto en la biblioteca estándar, pero parece que estaba equivocado.
Tengo un montón de urls que quiero urlopen
en paralelo. Quiero algo como la función de map
incorporada, excepto que el trabajo se realiza en paralelo por un montón de hilos.
¿Hay un buen módulo que haga esto?
Alguien me recomendó usar el paquete de futures
para esto. Lo probé y parece estar funcionando.
http://pypi.python.org/pypi/futures
Aquí hay un ejemplo:
"Download many URLs in parallel."
import functools
import urllib.request
import futures
URLS = [''http://www.foxnews.com/'',
''http://www.cnn.com/'',
''http://europe.wsj.com/'',
''http://www.bbc.co.uk/'',
''http://some-made-up-domain.com/'']
def load_url(url, timeout):
return urllib.request.urlopen(url, timeout=timeout).read()
with futures.ThreadPoolExecutor(50) as executor:
future_list = executor.run_to_futures(
[functools.partial(load_url, url, 30) for url in URLS])
Aquí está mi implementación del mapa de hilos:
from threading import Thread
from queue import Queue
def thread_map(f, iterable, pool=None):
"""
Just like [f(x) for x in iterable] but each f(x) in a separate thread.
:param f: f
:param iterable: iterable
:param pool: thread pool, infinite by default
:return: list if results
"""
res = {}
if pool is None:
def target(arg, num):
try:
res[num] = f(arg)
except:
res[num] = sys.exc_info()
threads = [Thread(target=target, args=[arg, i]) for i, arg in enumerate(iterable)]
else:
class WorkerThread(Thread):
def run(self):
while True:
try:
num, arg = queue.get(block=False)
try:
res[num] = f(arg)
except:
res[num] = sys.exc_info()
except Empty:
break
queue = Queue()
for i, arg in enumerate(iterable):
queue.put((i, arg))
threads = [WorkerThread() for _ in range(pool)]
[t.start() for t in threads]
[t.join() for t in threads]
return [res[i] for i in range(len(res))]
El módulo Python Queue
puede ayudar. Use un hilo que use Queue.put()
para insertar todas las URL en la cola y los hilos de trabajo simplemente get()
los URL uno por uno.
Hay un método de map
en multiprocessing.Pool . Eso hace múltiples procesos.
Y si varios procesos no son su plato, puede usar multiprocessing.dummy que usa subprocesos.
import urllib
import multiprocessing.dummy
p = multiprocessing.dummy.Pool(5)
def f(post):
return urllib.urlopen(''http://.com/questions/%u'' % post)
print p.map(f, range(3329361, 3329361 + 5))
Lo envolvería en una función (no probada):
import itertools
import threading
import urllib2
import Queue
def openurl(url, queue):
def starter():
try:
result = urllib2.urlopen(url)
except Ecxeption, exc:
def raiser():
raise exc
queue.put((url, raiser))
else:
queue.put((url, lambda:result))
threadind.Thread(target=starter).start()
myurls = ... # the list of urls
myqueue = Queue.Queue()
map(openurl, myurls, itertools.repeat(myqueue))
for each in myurls:
url, getresult = queue.get()
try:
result = getresult()
except Exception, exc:
print ''exception raised:'' + str(exc)
else:
# do stuff with result