parallel manager lock example current python multiprocessing contextmanager

manager - python parallel processing



Gestores de contexto y pools de multiprocesamiento. (2)

En primer lugar, esta es una gran pregunta! Después de profundizar un poco en el código de multiprocessing , creo que he encontrado una manera de hacer esto:

Cuando inicia un multiprocessing.Pool , internamente el objeto Pool crea un objeto multiprocessing.Process para cada miembro del pool. Cuando esos subprocesos se están iniciando, llaman a una función _bootstrap , que se parece a esto:

def _bootstrap(self): from . import util global _current_process try: # ... (stuff we don''t care about) util._finalizer_registry.clear() util._run_after_forkers() util.info(''child process calling self.run()'') try: self.run() exitcode = 0 finally: util._exit_function() # ... (more stuff we don''t care about)

El método de run es lo que realmente ejecuta el target que le dio al objeto Process . Para un proceso de Agrupación es un método con un bucle while de larga ejecución que espera que los elementos de trabajo entren en una cola interna. Lo que es realmente interesante para nosotros es lo que sucedió después de self.run : util._exit_function() .

Resulta que, esa función hace una limpieza que suena muy parecida a lo que estás buscando:

def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers, active_children=active_children, current_process=current_process): # NB: we hold on to references to functions in the arglist due to the # situation described below, where this function is called after this # module''s globals are destroyed. global _exiting info(''process shutting down'') debug(''running all "atexit" finalizers with priority >= 0'') # Very interesting! _run_finalizers(0)

Aquí está el docstring de _run_finalizers :

def _run_finalizers(minpriority=None): '''''' Run all finalizers whose exit priority is not None and at least minpriority Finalizers with highest priority are called first; finalizers with the same priority will be called in reverse order of creation. ''''''

El método en realidad se ejecuta a través de una lista de devoluciones de llamada de finalizador y las ejecuta:

items = [x for x in _finalizer_registry.items() if f(x)] items.sort(reverse=True) for key, finalizer in items: sub_debug(''calling %s'', finalizer) try: finalizer() except Exception: import traceback traceback.print_exc()

Perfecto. Entonces, ¿cómo llegamos al _finalizer_registry ? Hay un objeto no documentado llamado Finalize en multiprocessing.util que se encarga de agregar una devolución de llamada al registro:

class Finalize(object): '''''' Class which supports object finalization using weakrefs '''''' def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None): assert exitpriority is None or type(exitpriority) is int if obj is not None: self._weakref = weakref.ref(obj, self) else: assert exitpriority is not None self._callback = callback self._args = args self._kwargs = kwargs or {} self._key = (exitpriority, _finalizer_counter.next()) self._pid = os.getpid() _finalizer_registry[self._key] = self # That''s what we''re looking for!

Ok, así que poniéndolo todo junto en un ejemplo:

import multiprocessing from multiprocessing.util import Finalize resource_cm = None resource = None class Resource(object): def __init__(self, args): self.args = args def __enter__(self): print("in __enter__ of %s" % multiprocessing.current_process()) return self def __exit__(self, *args, **kwargs): print("in __exit__ of %s" % multiprocessing.current_process()) def open_resource(args): return Resource(args) def _worker_init(args): global resource print("calling init") resource_cm = open_resource(args) resource = resource_cm.__enter__() # Register a finalizer Finalize(resource, resource.__exit__, exitpriority=16) def hi(*args): print("we''re in the worker") if __name__ == "__main__": pool = multiprocessing.Pool(initializer=_worker_init, initargs=("abc",)) pool.map(hi, range(pool._processes)) pool.close() pool.join()

Salida:

calling init in __enter__ of <Process(PoolWorker-1, started daemon)> calling init calling init in __enter__ of <Process(PoolWorker-2, started daemon)> in __enter__ of <Process(PoolWorker-3, started daemon)> calling init in __enter__ of <Process(PoolWorker-4, started daemon)> we''re in the worker we''re in the worker we''re in the worker we''re in the worker in __exit__ of <Process(PoolWorker-1, started daemon)> in __exit__ of <Process(PoolWorker-2, started daemon)> in __exit__ of <Process(PoolWorker-3, started daemon)> in __exit__ of <Process(PoolWorker-4, started daemon)>

Como puede ver, a __exit__ se le llama a todos nuestros trabajadores cuando nos join() al grupo.

Supongamos que está utilizando un objeto multiprocessing.Pool y está utilizando la configuración de initializer del constructor para pasar una función de inicialización que luego crea un recurso en el espacio de nombres global. Supongamos que el recurso tiene un administrador de contexto. ¿Cómo manejaría el ciclo de vida del recurso administrado de contexto siempre que tenga que vivir la vida del proceso, pero que se limpie adecuadamente al final?

Hasta ahora, tengo algo como esto:

resource_cm = None resource = None def _worker_init(args): global resource resource_cm = open_resource(args) resource = resource_cm.__enter__()

A partir de aquí, los procesos del pool pueden utilizar el recurso. Hasta ahora tan bueno. Pero manejar la limpieza es un poco más complicado, ya que la clase multiprocessing.Pool no proporciona un argumento destructor o deinitializer .

Una de mis ideas es usar el módulo atexit y registrar la limpieza en el inicializador. Algo como esto:

def _worker_init(args): global resource resource_cm = open_resource(args) resource = resource_cm.__enter__() def _clean_up(): resource_cm.__exit__() import atexit atexit.register(_clean_up)

¿Es este un buen enfoque? ¿Hay una manera más fácil de hacer esto?

EDITAR: atexit no parece funcionar. Al menos no en la forma en que lo estoy usando arriba, así que a partir de ahora todavía no tengo una solución para este problema.


Puede subclasificar el Process y anular su método run() para que realice la limpieza antes de salir. Entonces deberías subclase Pool para que use tu proceso subclasificado:

from multiprocessing import Process from multiprocessing.pool import Pool class SafeProcess(Process): """ Process that will cleanup before exit """ def run(self, *args, **kw): result = super().run(*args, **kw) # cleanup however you want here return result class SafePool(Pool): Process = SafeProcess pool = SafePool(4) # use it as standard Pool