procesos multitarea ejemplos crear concurrentes con python multiprocessing

ejemplos - multitarea en python



¿Muestra el progreso de una llamada de mapa de grupo de multiprocesamiento de Python? (6)

Tengo un script que realiza con éxito un grupo de tareas multiprocesamiento con una imap_unordered() a imap_unordered() :

p = multiprocessing.Pool() rs = p.imap_unordered(do_work, xrange(num_tasks)) p.close() # No more work p.join() # Wait for completion

Sin embargo, mis num_tasks son alrededor de 250,000, por lo que join() bloquea el hilo principal durante 10 segundos más o menos, y me gustaría poder hacer un eco incremental de la línea de comando para mostrar que el proceso principal no está bloqueado. Algo como:

p = multiprocessing.Pool() rs = p.imap_unordered(do_work, xrange(num_tasks)) p.close() # No more work while (True): remaining = rs.tasks_remaining() # How many of the map call haven''t been done yet? if (remaining == 0): break # Jump out of while loop print "Waiting for", remaining, "tasks to complete..." time.sleep(2)

¿Hay algún método para el objeto resultante o el grupo en sí que indique la cantidad de tareas restantes? Intenté usar un objeto multiprocessing.Value como contador ( do_work llama a un counter.value += 1 acción después de hacer su tarea), pero el contador solo llega al ~ 85% del valor total antes de detener el incremento.


Creé una clase personalizada para crear una impresión de progreso. Maby esto ayuda:

from multiprocessing import Pool, cpu_count class ParallelSim(object): def __init__(self, processes=cpu_count()): self.pool = Pool(processes=processes) self.total_processes = 0 self.completed_processes = 0 self.results = [] def add(self, func, args): self.pool.apply_async(func=func, args=args, callback=self.complete) self.total_processes += 1 def complete(self, result): self.results.extend(result) self.completed_processes += 1 print(''Progress: {:.2f}%''.format((self.completed_processes/self.total_processes)*100)) def run(self): self.pool.close() self.pool.join() def get_results(self): return self.results


Descubrí que el trabajo ya estaba hecho cuando traté de verificar su progreso. Esto es lo que funcionó para mí usando tqdm .

pip install tqdm

from multiprocessing import Pool from tqdm import tqdm tasks = range(5) pool = Pool() pbar = tqdm(total=len(tasks)) def do_work(x): # do something with x pbar.update(1) pool.imap_unordered(do_work, tasks) pool.close() pool.join() pbar.close()

Esto debería funcionar con todos los sabores de multiprocesamiento, ya sea que bloqueen o no.


Encontré una respuesta yo mismo con un poco más de excavación: Echando un vistazo al __dict__ del objeto de resultado imap_unordered , encontré que tiene un atributo _index que se incrementa con cada finalización de tarea. Así que esto funciona para el registro, envuelto en el ciclo while:

p = multiprocessing.Pool() rs = p.imap_unordered(do_work, xrange(num_tasks)) p.close() # No more work while (True): completed = rs._index if (completed == num_tasks): break print "Waiting for", num_tasks-completed, "tasks to complete..." time.sleep(2)

Sin embargo, encontré que el intercambio de imap_unordered para map_async daba map_async resultado una ejecución mucho más rápida, aunque el objeto resultante es un poco diferente. En cambio, el objeto resultante de map_async tiene un atributo _number_left y un método ready() :

p = multiprocessing.Pool() rs = p.map_async(do_work, xrange(num_tasks)) p.close() # No more work while (True): if (rs.ready()): break remaining = rs._number_left print "Waiting for", remaining, "tasks to complete..." time.sleep(0.5)


Mi favorito personal: te da una pequeña y agradable barra de progreso y ETA de finalización mientras las cosas se ejecutan y se comprometen en paralelo.

from multiprocessing import Pool import tqdm pool = Pool(processes=8) for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)): pass


No es necesario acceder a los atributos privados del conjunto de resultados:

from __future__ import division import sys for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1): sys.stderr.write(''/rdone {0:%}''.format(i/num_tasks))


Sé que esta es una pregunta bastante antigua, pero esto es lo que estoy haciendo cuando quiero seguir la progresión de un grupo de tareas en Python.

from progressbar import ProgressBar, SimpleProgress import multiprocessing as mp from time import sleep def my_function(letter): sleep(2) return letter+letter dummy_args = ["A", "B", "C", "D"] pool = mp.Pool(processes=2) results = [] pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start() r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args] while len(results) != len(dummy_args): pbar.update(len(results)) sleep(0.5) pbar.finish() print results

Básicamente, utiliza apply_async con un callbak (en este caso, es para agregar el valor devuelto a una lista), por lo que no tiene que esperar para hacer otra cosa. Luego, dentro de un ciclo while, verifica la progresión del trabajo. En este caso, agregué un widget para que se vea mejor.

La salida:

4 of 4 [''AA'', ''BB'', ''CC'', ''DD'']

Espero eso ayude.