thread programming parallel example python multithreading parallel-processing multiprocessing joblib

python - programming - Seguimiento del progreso de la ejecución joblib.Parallel



python multiprocessing example (3)

La documentación que ha vinculado indica que Parallel tiene un medidor de progreso opcional. Se implementa utilizando el argumento de palabra clave de callback proporcionado por multiprocessing.Pool.apply_async :

# This is inside a dispatch function self._lock.acquire() job = self._pool.apply_async(SafeFunction(func), args, kwargs, callback=CallBack(self.n_dispatched, self)) self._jobs.append(job) self.n_dispatched += 1

...

class CallBack(object): """ Callback used by parallel: it is used for progress reporting, and to add data to be processed """ def __init__(self, index, parallel): self.parallel = parallel self.index = index def __call__(self, out): self.parallel.print_progress(self.index) if self.parallel._original_iterable: self.parallel.dispatch_next()

Y aquí está print_progress :

def print_progress(self, index): elapsed_time = time.time() - self._start_time # This is heuristic code to print only ''verbose'' times a messages # The challenge is that we may not know the queue length if self._original_iterable: if _verbosity_filter(index, self.verbose): return self._print(''Done %3i jobs | elapsed: %s'', (index + 1, short_format_time(elapsed_time), )) else: # We are finished dispatching queue_length = self.n_dispatched # We always display the first loop if not index == 0: # Display depending on the number of remaining items # A message as soon as we finish dispatching, cursor is 0 cursor = (queue_length - index + 1 - self._pre_dispatch_amount) frequency = (queue_length // self.verbose) + 1 is_last_item = (index + 1 == queue_length) if (is_last_item or cursor % frequency): return remaining_time = (elapsed_time / (index + 1) * (self.n_dispatched - index - 1.)) self._print(''Done %3i out of %3i | elapsed: %s remaining: %s'', (index + 1, queue_length, short_format_time(elapsed_time), short_format_time(remaining_time), ))

La forma en que implementan esto es algo raro, para ser honesto, parece suponer que las tareas siempre se completarán en el orden en que se inician. La variable de index que va a print_progress es solo la variable self.n_dispatched en el momento en que realmente se inició el trabajo. Por lo tanto, el primer trabajo lanzado siempre terminará con un index de 0, aunque digamos que el tercer trabajo finalizó primero. También significa que en realidad no llevan un registro de la cantidad de trabajos completados . Entonces no hay una variable de instancia para que usted pueda monitorear.

Creo que lo mejor que puedes hacer es crear tu propia clase de CallBack y el parche de mono Paralelo:

from math import sqrt from collections import defaultdict from joblib import Parallel, delayed class CallBack(object): completed = defaultdict(int) def __init__(self, index, parallel): self.index = index self.parallel = parallel def __call__(self, index): CallBack.completed[self.parallel] += 1 print("done with {}".format(CallBack.completed[self.parallel])) if self.parallel._original_iterable: self.parallel.dispatch_next() import joblib.parallel joblib.parallel.CallBack = CallBack if __name__ == "__main__": print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))

Salida:

done with 1 done with 2 done with 3 done with 4 done with 5 done with 6 done with 7 done with 8 done with 9 done with 10 [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

De esta forma, se llama a su devolución de llamada cuando se completa un trabajo, en lugar de hacerlo por defecto.

¿Hay una manera simple de seguir el progreso general de una ejecución joblib.Parallel ?

Tengo una ejecución de larga duración compuesta de miles de trabajos, que quiero rastrear y registrar en una base de datos. Sin embargo, para hacer eso, cada vez que Parallel finaliza una tarea, necesito que ejecute una devolución de llamada, informando cuántos trabajos restantes quedan.

Realicé una tarea similar antes con el multiprocesamiento. Pool de stdlib de Python, al lanzar un hilo que registra el número de trabajos pendientes en la lista de trabajos de Pool.

Al mirar el código, Parallel hereda el Pool, así que pensé que podría hacer el mismo truco, pero no parece usar esta lista, y no he podido averiguar cómo más "leer" es interno. estado de otra manera.


Ampliando la respuesta de dano para la versión más reciente de la biblioteca joblib. Hubo un par de cambios en la implementación interna.

from joblib import Parallel, delayed from collections import defaultdict # patch joblib progress callback class BatchCompletionCallBack(object): completed = defaultdict(int) def __init__(self, time, index, parallel): self.index = index self.parallel = parallel def __call__(self, index): BatchCompletionCallBack.completed[self.parallel] += 1 print("done with {}".format(BatchCompletionCallBack.completed[self.parallel])) if self.parallel._original_iterator is not None: self.parallel.dispatch_next() import joblib.parallel joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack


Aquí hay otra respuesta a su pregunta con la siguiente sintaxis:

aprun = ParallelExecutor(n_jobs=5) a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5)) a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4)) a2 = aprun(bar=''txt'')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4)) a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))

https://.com/a/40415477/232371