python - programming - Seguimiento del progreso de la ejecución joblib.Parallel
python multiprocessing example (3)
La documentación que ha vinculado indica que Parallel
tiene un medidor de progreso opcional. Se implementa utilizando el argumento de palabra clave de callback
proporcionado por multiprocessing.Pool.apply_async
:
# This is inside a dispatch function
self._lock.acquire()
job = self._pool.apply_async(SafeFunction(func), args,
kwargs, callback=CallBack(self.n_dispatched, self))
self._jobs.append(job)
self.n_dispatched += 1
...
class CallBack(object):
""" Callback used by parallel: it is used for progress reporting, and
to add data to be processed
"""
def __init__(self, index, parallel):
self.parallel = parallel
self.index = index
def __call__(self, out):
self.parallel.print_progress(self.index)
if self.parallel._original_iterable:
self.parallel.dispatch_next()
Y aquí está print_progress
:
def print_progress(self, index):
elapsed_time = time.time() - self._start_time
# This is heuristic code to print only ''verbose'' times a messages
# The challenge is that we may not know the queue length
if self._original_iterable:
if _verbosity_filter(index, self.verbose):
return
self._print(''Done %3i jobs | elapsed: %s'',
(index + 1,
short_format_time(elapsed_time),
))
else:
# We are finished dispatching
queue_length = self.n_dispatched
# We always display the first loop
if not index == 0:
# Display depending on the number of remaining items
# A message as soon as we finish dispatching, cursor is 0
cursor = (queue_length - index + 1
- self._pre_dispatch_amount)
frequency = (queue_length // self.verbose) + 1
is_last_item = (index + 1 == queue_length)
if (is_last_item or cursor % frequency):
return
remaining_time = (elapsed_time / (index + 1) *
(self.n_dispatched - index - 1.))
self._print(''Done %3i out of %3i | elapsed: %s remaining: %s'',
(index + 1,
queue_length,
short_format_time(elapsed_time),
short_format_time(remaining_time),
))
La forma en que implementan esto es algo raro, para ser honesto, parece suponer que las tareas siempre se completarán en el orden en que se inician. La variable de index
que va a print_progress
es solo la variable self.n_dispatched
en el momento en que realmente se inició el trabajo. Por lo tanto, el primer trabajo lanzado siempre terminará con un index
de 0, aunque digamos que el tercer trabajo finalizó primero. También significa que en realidad no llevan un registro de la cantidad de trabajos completados . Entonces no hay una variable de instancia para que usted pueda monitorear.
Creo que lo mejor que puedes hacer es crear tu propia clase de CallBack y el parche de mono Paralelo:
from math import sqrt
from collections import defaultdict
from joblib import Parallel, delayed
class CallBack(object):
completed = defaultdict(int)
def __init__(self, index, parallel):
self.index = index
self.parallel = parallel
def __call__(self, index):
CallBack.completed[self.parallel] += 1
print("done with {}".format(CallBack.completed[self.parallel]))
if self.parallel._original_iterable:
self.parallel.dispatch_next()
import joblib.parallel
joblib.parallel.CallBack = CallBack
if __name__ == "__main__":
print(Parallel(n_jobs=2)(delayed(sqrt)(i**2) for i in range(10)))
Salida:
done with 1
done with 2
done with 3
done with 4
done with 5
done with 6
done with 7
done with 8
done with 9
done with 10
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
De esta forma, se llama a su devolución de llamada cuando se completa un trabajo, en lugar de hacerlo por defecto.
¿Hay una manera simple de seguir el progreso general de una ejecución joblib.Parallel ?
Tengo una ejecución de larga duración compuesta de miles de trabajos, que quiero rastrear y registrar en una base de datos. Sin embargo, para hacer eso, cada vez que Parallel finaliza una tarea, necesito que ejecute una devolución de llamada, informando cuántos trabajos restantes quedan.
Realicé una tarea similar antes con el multiprocesamiento. Pool de stdlib de Python, al lanzar un hilo que registra el número de trabajos pendientes en la lista de trabajos de Pool.
Al mirar el código, Parallel hereda el Pool, así que pensé que podría hacer el mismo truco, pero no parece usar esta lista, y no he podido averiguar cómo más "leer" es interno. estado de otra manera.
Ampliando la respuesta de dano para la versión más reciente de la biblioteca joblib. Hubo un par de cambios en la implementación interna.
from joblib import Parallel, delayed
from collections import defaultdict
# patch joblib progress callback
class BatchCompletionCallBack(object):
completed = defaultdict(int)
def __init__(self, time, index, parallel):
self.index = index
self.parallel = parallel
def __call__(self, index):
BatchCompletionCallBack.completed[self.parallel] += 1
print("done with {}".format(BatchCompletionCallBack.completed[self.parallel]))
if self.parallel._original_iterator is not None:
self.parallel.dispatch_next()
import joblib.parallel
joblib.parallel.BatchCompletionCallBack = BatchCompletionCallBack
Aquí hay otra respuesta a su pregunta con la siguiente sintaxis:
aprun = ParallelExecutor(n_jobs=5)
a1 = aprun(total=25)(delayed(func)(i ** 2 + j) for i in range(5) for j in range(5))
a2 = aprun(total=16)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar=''txt'')(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
a2 = aprun(bar=None)(delayed(func)(i ** 2 + j) for i in range(4) for j in range(4))
https://.com/a/40415477/232371