manager - python parallel processing
Gestores de contexto y pools de multiprocesamiento. (2)
En primer lugar, esta es una gran pregunta! Después de profundizar un poco en el código de multiprocessing
, creo que he encontrado una manera de hacer esto:
Cuando inicia un multiprocessing.Pool
, internamente el objeto Pool
crea un objeto multiprocessing.Process
para cada miembro del pool. Cuando esos subprocesos se están iniciando, llaman a una función _bootstrap
, que se parece a esto:
def _bootstrap(self):
from . import util
global _current_process
try:
# ... (stuff we don''t care about)
util._finalizer_registry.clear()
util._run_after_forkers()
util.info(''child process calling self.run()'')
try:
self.run()
exitcode = 0
finally:
util._exit_function()
# ... (more stuff we don''t care about)
El método de run
es lo que realmente ejecuta el target
que le dio al objeto Process
. Para un proceso de Agrupación es un método con un bucle while de larga ejecución que espera que los elementos de trabajo entren en una cola interna. Lo que es realmente interesante para nosotros es lo que sucedió después de self.run
: util._exit_function()
.
Resulta que, esa función hace una limpieza que suena muy parecida a lo que estás buscando:
def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
active_children=active_children,
current_process=current_process):
# NB: we hold on to references to functions in the arglist due to the
# situation described below, where this function is called after this
# module''s globals are destroyed.
global _exiting
info(''process shutting down'')
debug(''running all "atexit" finalizers with priority >= 0'') # Very interesting!
_run_finalizers(0)
Aquí está el docstring de _run_finalizers
:
def _run_finalizers(minpriority=None):
''''''
Run all finalizers whose exit priority is not None and at least minpriority
Finalizers with highest priority are called first; finalizers with
the same priority will be called in reverse order of creation.
''''''
El método en realidad se ejecuta a través de una lista de devoluciones de llamada de finalizador y las ejecuta:
items = [x for x in _finalizer_registry.items() if f(x)]
items.sort(reverse=True)
for key, finalizer in items:
sub_debug(''calling %s'', finalizer)
try:
finalizer()
except Exception:
import traceback
traceback.print_exc()
Perfecto. Entonces, ¿cómo llegamos al _finalizer_registry
? Hay un objeto no documentado llamado Finalize
en multiprocessing.util
que se encarga de agregar una devolución de llamada al registro:
class Finalize(object):
''''''
Class which supports object finalization using weakrefs
''''''
def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
assert exitpriority is None or type(exitpriority) is int
if obj is not None:
self._weakref = weakref.ref(obj, self)
else:
assert exitpriority is not None
self._callback = callback
self._args = args
self._kwargs = kwargs or {}
self._key = (exitpriority, _finalizer_counter.next())
self._pid = os.getpid()
_finalizer_registry[self._key] = self # That''s what we''re looking for!
Ok, así que poniéndolo todo junto en un ejemplo:
import multiprocessing
from multiprocessing.util import Finalize
resource_cm = None
resource = None
class Resource(object):
def __init__(self, args):
self.args = args
def __enter__(self):
print("in __enter__ of %s" % multiprocessing.current_process())
return self
def __exit__(self, *args, **kwargs):
print("in __exit__ of %s" % multiprocessing.current_process())
def open_resource(args):
return Resource(args)
def _worker_init(args):
global resource
print("calling init")
resource_cm = open_resource(args)
resource = resource_cm.__enter__()
# Register a finalizer
Finalize(resource, resource.__exit__, exitpriority=16)
def hi(*args):
print("we''re in the worker")
if __name__ == "__main__":
pool = multiprocessing.Pool(initializer=_worker_init, initargs=("abc",))
pool.map(hi, range(pool._processes))
pool.close()
pool.join()
Salida:
calling init
in __enter__ of <Process(PoolWorker-1, started daemon)>
calling init
calling init
in __enter__ of <Process(PoolWorker-2, started daemon)>
in __enter__ of <Process(PoolWorker-3, started daemon)>
calling init
in __enter__ of <Process(PoolWorker-4, started daemon)>
we''re in the worker
we''re in the worker
we''re in the worker
we''re in the worker
in __exit__ of <Process(PoolWorker-1, started daemon)>
in __exit__ of <Process(PoolWorker-2, started daemon)>
in __exit__ of <Process(PoolWorker-3, started daemon)>
in __exit__ of <Process(PoolWorker-4, started daemon)>
Como puede ver, a __exit__
se le llama a todos nuestros trabajadores cuando nos join()
al grupo.
Supongamos que está utilizando un objeto multiprocessing.Pool
y está utilizando la configuración de initializer
del constructor para pasar una función de inicialización que luego crea un recurso en el espacio de nombres global. Supongamos que el recurso tiene un administrador de contexto. ¿Cómo manejaría el ciclo de vida del recurso administrado de contexto siempre que tenga que vivir la vida del proceso, pero que se limpie adecuadamente al final?
Hasta ahora, tengo algo como esto:
resource_cm = None
resource = None
def _worker_init(args):
global resource
resource_cm = open_resource(args)
resource = resource_cm.__enter__()
A partir de aquí, los procesos del pool pueden utilizar el recurso. Hasta ahora tan bueno. Pero manejar la limpieza es un poco más complicado, ya que la clase multiprocessing.Pool
no proporciona un argumento destructor
o deinitializer
.
Una de mis ideas es usar el módulo atexit
y registrar la limpieza en el inicializador. Algo como esto:
def _worker_init(args):
global resource
resource_cm = open_resource(args)
resource = resource_cm.__enter__()
def _clean_up():
resource_cm.__exit__()
import atexit
atexit.register(_clean_up)
¿Es este un buen enfoque? ¿Hay una manera más fácil de hacer esto?
EDITAR: atexit
no parece funcionar. Al menos no en la forma en que lo estoy usando arriba, así que a partir de ahora todavía no tengo una solución para este problema.
Puede subclasificar el Process
y anular su método run()
para que realice la limpieza antes de salir. Entonces deberías subclase Pool
para que use tu proceso subclasificado:
from multiprocessing import Process
from multiprocessing.pool import Pool
class SafeProcess(Process):
""" Process that will cleanup before exit """
def run(self, *args, **kw):
result = super().run(*args, **kw)
# cleanup however you want here
return result
class SafePool(Pool):
Process = SafeProcess
pool = SafePool(4) # use it as standard Pool