ejemplos - multitarea en python
¿Muestra el progreso de una llamada de mapa de grupo de multiprocesamiento de Python? (6)
Tengo un script que realiza con éxito un grupo de tareas multiprocesamiento con una imap_unordered()
a imap_unordered()
:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion
Sin embargo, mis num_tasks
son alrededor de 250,000, por lo que join()
bloquea el hilo principal durante 10 segundos más o menos, y me gustaría poder hacer un eco incremental de la línea de comando para mostrar que el proceso principal no está bloqueado. Algo como:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
remaining = rs.tasks_remaining() # How many of the map call haven''t been done yet?
if (remaining == 0): break # Jump out of while loop
print "Waiting for", remaining, "tasks to complete..."
time.sleep(2)
¿Hay algún método para el objeto resultante o el grupo en sí que indique la cantidad de tareas restantes? Intenté usar un objeto multiprocessing.Value
como contador ( do_work
llama a un counter.value += 1
acción después de hacer su tarea), pero el contador solo llega al ~ 85% del valor total antes de detener el incremento.
Creé una clase personalizada para crear una impresión de progreso. Maby esto ayuda:
from multiprocessing import Pool, cpu_count
class ParallelSim(object):
def __init__(self, processes=cpu_count()):
self.pool = Pool(processes=processes)
self.total_processes = 0
self.completed_processes = 0
self.results = []
def add(self, func, args):
self.pool.apply_async(func=func, args=args, callback=self.complete)
self.total_processes += 1
def complete(self, result):
self.results.extend(result)
self.completed_processes += 1
print(''Progress: {:.2f}%''.format((self.completed_processes/self.total_processes)*100))
def run(self):
self.pool.close()
self.pool.join()
def get_results(self):
return self.results
Descubrí que el trabajo ya estaba hecho cuando traté de verificar su progreso. Esto es lo que funcionó para mí usando tqdm .
pip install tqdm
from multiprocessing import Pool
from tqdm import tqdm
tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))
def do_work(x):
# do something with x
pbar.update(1)
pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()
Esto debería funcionar con todos los sabores de multiprocesamiento, ya sea que bloqueen o no.
Encontré una respuesta yo mismo con un poco más de excavación: Echando un vistazo al __dict__
del objeto de resultado imap_unordered
, encontré que tiene un atributo _index
que se incrementa con cada finalización de tarea. Así que esto funciona para el registro, envuelto en el ciclo while:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
completed = rs._index
if (completed == num_tasks): break
print "Waiting for", num_tasks-completed, "tasks to complete..."
time.sleep(2)
Sin embargo, encontré que el intercambio de imap_unordered
para map_async
daba map_async
resultado una ejecución mucho más rápida, aunque el objeto resultante es un poco diferente. En cambio, el objeto resultante de map_async
tiene un atributo _number_left
y un método ready()
:
p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
if (rs.ready()): break
remaining = rs._number_left
print "Waiting for", remaining, "tasks to complete..."
time.sleep(0.5)
Mi favorito personal: te da una pequeña y agradable barra de progreso y ETA de finalización mientras las cosas se ejecutan y se comprometen en paralelo.
from multiprocessing import Pool
import tqdm
pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
pass
No es necesario acceder a los atributos privados del conjunto de resultados:
from __future__ import division
import sys
for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
sys.stderr.write(''/rdone {0:%}''.format(i/num_tasks))
Sé que esta es una pregunta bastante antigua, pero esto es lo que estoy haciendo cuando quiero seguir la progresión de un grupo de tareas en Python.
from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep
def my_function(letter):
sleep(2)
return letter+letter
dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)
results = []
pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()
r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]
while len(results) != len(dummy_args):
pbar.update(len(results))
sleep(0.5)
pbar.finish()
print results
Básicamente, utiliza apply_async con un callbak (en este caso, es para agregar el valor devuelto a una lista), por lo que no tiene que esperar para hacer otra cosa. Luego, dentro de un ciclo while, verifica la progresión del trabajo. En este caso, agregué un widget para que se vea mejor.
La salida:
4 of 4
[''AA'', ''BB'', ''CC'', ''DD'']
Espero eso ayude.