threading set_start_method multiple ejemplos python locking multiprocessing python-multiprocessing

set_start_method - Comprender el multiprocesamiento: administración de memoria compartida, bloqueos y colas en Python



python pool process (1)

Multiprocessing es una herramienta poderosa en python, y quiero entenderlo más en profundidad. Quiero saber cuándo usar Locks y Queues regulares y cuándo usar un Manager multiprocesamiento para compartirlos entre todos los procesos.

Se me ocurrieron los siguientes escenarios de prueba con cuatro condiciones diferentes para el multiprocesamiento:

  1. Usando un grupo y NO Manager

  2. Usando un grupo y un administrador

  3. Usando procesos individuales y NO Manager

  4. Usando procesos individuales y un Gerente

El trabajo

Todas las condiciones ejecutan una función de trabajo the_job . the_job consiste en una impresión que está asegurada por un bloqueo. Además, la entrada a la función simplemente se pone en una cola (para ver si se puede recuperar de la cola). Esta entrada es simplemente un índice idx del range(10) creado en el script principal llamado start_scenario (que se muestra en la parte inferior).

def the_job(args): """The job for multiprocessing. Prints some stuff secured by a lock and finally puts the input into a queue. """ idx = args[0] lock = args[1] queue=args[2] lock.acquire() print ''I'' print ''was '' print ''here '' print ''!!!!'' print ''1111'' print ''einhundertelfzigelf/n'' who= '' By run %d /n'' % idx print who lock.release() queue.put(idx)

El éxito de una condición se define como recordar perfectamente la entrada de la cola, ver la función read_queue en la parte inferior.

Las condiciones

Las condiciones 1 y 2 son bastante autoexplicativas. La condición 1 implica crear un bloqueo y una cola, y pasarlos a un grupo de procesos:

def scenario_1_pool_no_manager(jobfunc, args, ncores): """Runs a pool of processes WITHOUT a Manager for the lock and queue. FAILS! """ mypool = mp.Pool(ncores) lock = mp.Lock() queue = mp.Queue() iterator = make_iterator(args, lock, queue) mypool.imap(jobfunc, iterator) mypool.close() mypool.join() return read_queue(queue)

(La función auxiliar make_iterator aparece en la parte inferior de esta publicación.) La condición 1 falla con RuntimeError: Lock objects should only be shared between processes through inheritance .

La condición 2 es bastante similar, pero ahora la cerradura y la cola están bajo la supervisión de un administrador:

def scenario_2_pool_manager(jobfunc, args, ncores): """Runs a pool of processes WITH a Manager for the lock and queue. SUCCESSFUL! """ mypool = mp.Pool(ncores) lock = mp.Manager().Lock() queue = mp.Manager().Queue() iterator = make_iterator(args, lock, queue) mypool.imap(jobfunc, iterator) mypool.close() mypool.join() return read_queue(queue)

En la condición 3, los nuevos procesos se inician manualmente, y el bloqueo y la cola se crean sin un administrador:

def scenario_3_single_processes_no_manager(jobfunc, args, ncores): """Runs an individual process for every task WITHOUT a Manager, SUCCESSFUL! """ lock = mp.Lock() queue = mp.Queue() iterator = make_iterator(args, lock, queue) do_job_single_processes(jobfunc, iterator, ncores) return read_queue(queue)

La condición 4 es similar pero nuevamente ahora usando un administrador:

def scenario_4_single_processes_manager(jobfunc, args, ncores): """Runs an individual process for every task WITH a Manager, SUCCESSFUL! """ lock = mp.Manager().Lock() queue = mp.Manager().Queue() iterator = make_iterator(args, lock, queue) do_job_single_processes(jobfunc, iterator, ncores) return read_queue(queue)

En ambas condiciones, 3 y 4, comienzo un nuevo proceso para cada una de las 10 tareas del the_job con la mayoría de los procesos ncores operando al mismo tiempo. Esto se logra con la siguiente función auxiliar:

def do_job_single_processes(jobfunc, iterator, ncores): """Runs a job function by starting individual processes for every task. At most `ncores` processes operate at the same time :param jobfunc: Job to do :param iterator: Iterator over different parameter settings, contains a lock and a queue :param ncores: Number of processes operating at the same time """ keep_running=True process_dict = {} # Dict containing all subprocees while len(process_dict)>0 or keep_running: terminated_procs_pids = [] # First check if some processes did finish their job for pid, proc in process_dict.iteritems(): # Remember the terminated processes if not proc.is_alive(): terminated_procs_pids.append(pid) # And delete these from the process dict for terminated_proc in terminated_procs_pids: process_dict.pop(terminated_proc) # If we have less active processes than ncores and there is still # a job to do, add another process if len(process_dict) < ncores and keep_running: try: task = iterator.next() proc = mp.Process(target=jobfunc, args=(task,)) proc.start() process_dict[proc.pid]=proc except StopIteration: # All tasks have been started keep_running=False time.sleep(0.1)

El resultado

Solo la condición 1 falla ( RuntimeError: Lock objects should only be shared between processes through inheritance ) mientras que las otras 3 condiciones son exitosas. Intento entender este resultado.

¿Por qué el grupo necesita compartir un bloqueo y una cola entre todos los procesos, pero los procesos individuales de la condición 3 no lo hacen?

Lo que sé es que para las condiciones de grupo (1 y 2) todos los datos de los iteradores pasan por decapado, mientras que en condiciones de proceso único (3 y 4) todos los datos de los iteradores pasan por herencia del proceso principal (yo soy usando Linux ). Supongo que hasta que se cambie la memoria desde un proceso secundario, se accede a la misma memoria que usa el proceso parental (copy-on-write). Pero tan pronto como uno dice lock.acquire() , esto debería cambiarse y los procesos secundarios usan bloqueos diferentes colocados en otro lugar en la memoria, ¿no? ¿Cómo sabe un niño procesar que un hermano ha activado un bloqueo que no se comparte a través de un administrador?

Finalmente, algo relacionado es mi pregunta sobre las diferentes condiciones 3 y 4. Ambos tienen procesos individuales, pero difieren en el uso de un administrador. ¿Ambos se consideran un código válido ? ¿O debería uno evitar el uso de un administrador si realmente no hay necesidad de uno?

Script completo

Para aquellos que simplemente quieren copiar y pegar todo para ejecutar el código, aquí está el script completo:

__author__ = ''Me and myself'' import multiprocessing as mp import time def the_job(args): """The job for multiprocessing. Prints some stuff secured by a lock and finally puts the input into a queue. """ idx = args[0] lock = args[1] queue=args[2] lock.acquire() print ''I'' print ''was '' print ''here '' print ''!!!!'' print ''1111'' print ''einhundertelfzigelf/n'' who= '' By run %d /n'' % idx print who lock.release() queue.put(idx) def read_queue(queue): """Turns a qeue into a normal python list.""" results = [] while not queue.empty(): result = queue.get() results.append(result) return results def make_iterator(args, lock, queue): """Makes an iterator over args and passes the lock an queue to each element.""" return ((arg, lock, queue) for arg in args) def start_scenario(scenario_number = 1): """Starts one of four multiprocessing scenarios. :param scenario_number: Index of scenario, 1 to 4 """ args = range(10) ncores = 3 if scenario_number==1: result = scenario_1_pool_no_manager(the_job, args, ncores) elif scenario_number==2: result = scenario_2_pool_manager(the_job, args, ncores) elif scenario_number==3: result = scenario_3_single_processes_no_manager(the_job, args, ncores) elif scenario_number==4: result = scenario_4_single_processes_manager(the_job, args, ncores) if result != args: print ''Scenario %d fails: %s != %s'' % (scenario_number, args, result) else: print ''Scenario %d successful!'' % scenario_number def scenario_1_pool_no_manager(jobfunc, args, ncores): """Runs a pool of processes WITHOUT a Manager for the lock and queue. FAILS! """ mypool = mp.Pool(ncores) lock = mp.Lock() queue = mp.Queue() iterator = make_iterator(args, lock, queue) mypool.map(jobfunc, iterator) mypool.close() mypool.join() return read_queue(queue) def scenario_2_pool_manager(jobfunc, args, ncores): """Runs a pool of processes WITH a Manager for the lock and queue. SUCCESSFUL! """ mypool = mp.Pool(ncores) lock = mp.Manager().Lock() queue = mp.Manager().Queue() iterator = make_iterator(args, lock, queue) mypool.map(jobfunc, iterator) mypool.close() mypool.join() return read_queue(queue) def scenario_3_single_processes_no_manager(jobfunc, args, ncores): """Runs an individual process for every task WITHOUT a Manager, SUCCESSFUL! """ lock = mp.Lock() queue = mp.Queue() iterator = make_iterator(args, lock, queue) do_job_single_processes(jobfunc, iterator, ncores) return read_queue(queue) def scenario_4_single_processes_manager(jobfunc, args, ncores): """Runs an individual process for every task WITH a Manager, SUCCESSFUL! """ lock = mp.Manager().Lock() queue = mp.Manager().Queue() iterator = make_iterator(args, lock, queue) do_job_single_processes(jobfunc, iterator, ncores) return read_queue(queue) def do_job_single_processes(jobfunc, iterator, ncores): """Runs a job function by starting individual processes for every task. At most `ncores` processes operate at the same time :param jobfunc: Job to do :param iterator: Iterator over different parameter settings, contains a lock and a queue :param ncores: Number of processes operating at the same time """ keep_running=True process_dict = {} # Dict containing all subprocees while len(process_dict)>0 or keep_running: terminated_procs_pids = [] # First check if some processes did finish their job for pid, proc in process_dict.iteritems(): # Remember the terminated processes if not proc.is_alive(): terminated_procs_pids.append(pid) # And delete these from the process dict for terminated_proc in terminated_procs_pids: process_dict.pop(terminated_proc) # If we have less active processes than ncores and there is still # a job to do, add another process if len(process_dict) < ncores and keep_running: try: task = iterator.next() proc = mp.Process(target=jobfunc, args=(task,)) proc.start() process_dict[proc.pid]=proc except StopIteration: # All tasks have been started keep_running=False time.sleep(0.1) def main(): """Runs 1 out of 4 different multiprocessing scenarios""" start_scenario(1) if __name__ == ''__main__'': main()


multiprocessing.Lock se implementa utilizando un objeto Semaphore proporcionado por el sistema operativo. En Linux, el niño simplemente hereda un identificador para el semáforo del padre a través de os.fork . Esta no es una copia del semáforo; en realidad está heredando el mismo identificador que el padre, de la misma manera que los descriptores de archivos pueden heredarse. Windows, por otro lado, no es compatible con os.fork , por lo que tiene que desenterrar el Lock . Lo hace creando un identificador duplicado para el semáforo de Windows utilizado internamente por el objeto multiprocessing.Lock , utilizando la API Windows DuplicateHandle , que establece:

El identificador duplicado se refiere al mismo objeto que el identificador original. Por lo tanto, cualquier cambio en el objeto se refleja a través de ambas manijas

La API DuplicateHandle permite ceder la propiedad del identificador duplicado al proceso hijo, para que el proceso hijo pueda usarlo después de deshacerte de él. Al crear un identificador duplicado propiedad del niño, puede "compartir" efectivamente el objeto de bloqueo.

Aquí está el objeto de semáforo en multiprocessing/synchronize.py

class SemLock(object): def __init__(self, kind, value, maxvalue): sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue) debug(''created semlock with handle %s'' % sl.handle) self._make_methods() if sys.platform != ''win32'': def _after_fork(obj): obj._semlock._after_fork() register_after_fork(self, _after_fork) def _make_methods(self): self.acquire = self._semlock.acquire self.release = self._semlock.release self.__enter__ = self._semlock.__enter__ self.__exit__ = self._semlock.__exit__ def __getstate__(self): # This is called when you try to pickle the `Lock`. assert_spawning(self) sl = self._semlock return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue) def __setstate__(self, state): # This is called when unpickling a `Lock` self._semlock = _multiprocessing.SemLock._rebuild(*state) debug(''recreated blocker with handle %r'' % state[0]) self._make_methods()

Tenga en cuenta la llamada assert_spawning en __getstate__ , que se llama cuando se decapa el objeto. Así es como se implementa eso:

# # Check that the current thread is spawning a child process # def assert_spawning(self): if not Popen.thread_is_spawning(): raise RuntimeError( ''%s objects should only be shared between processes'' '' through inheritance'' % type(self).__name__ )

Esa función es la que asegura que estás "heredando" el Lock , llamando a thread_is_spawning . En Linux, ese método simplemente devuelve False :

@staticmethod def thread_is_spawning(): return False

Esto se debe a que Linux no necesita agrupar para heredar Lock , por lo que si __getstate__ se está llamando en Linux, no debemos heredarlo. En Windows, hay más cosas en juego:

def dump(obj, file, protocol=None): ForkingPickler(file, protocol).dump(obj) class Popen(object): '''''' Start a subprocess to run the code of a process object '''''' _tls = thread._local() def __init__(self, process_obj): ... # send information to child prep_data = get_preparation_data(process_obj._name) to_child = os.fdopen(wfd, ''wb'') Popen._tls.process_handle = int(hp) try: dump(prep_data, to_child, HIGHEST_PROTOCOL) dump(process_obj, to_child, HIGHEST_PROTOCOL) finally: del Popen._tls.process_handle to_child.close() @staticmethod def thread_is_spawning(): return getattr(Popen._tls, ''process_handle'', None) is not None

Aquí, thread_is_spawning devuelve True si el objeto Popen._tls tiene un atributo process_handle . Podemos ver que el atributo process_handle se crea en __init__ , luego los datos que queremos heredar pasan del padre al hijo usando dump , y luego se elimina el atributo. Entonces thread_is_spawning solo será True durante __init__ . De acuerdo con este hilo de la lista de correo de python-ideas , esto es en realidad una limitación artificial añadida para simular el mismo comportamiento que os.fork en Linux. En realidad, Windows podría admitir pasar el Lock en cualquier momento, ya que DuplicateHandle se puede ejecutar en cualquier momento.

Todo lo anterior se aplica al objeto Queue porque usa Lock internamente.

Diría que heredar objetos de Lock es preferible a usar un Lock Manager.Lock() , porque cuando utilizas un Lock Manager , cada llamada que realizas al Lock debe enviarse a través del IPC al proceso de Manager , que será mucho más lento que usar un Lock compartido que vive dentro del proceso de llamada. Sin embargo, ambos enfoques son perfectamente válidos.

Finalmente, es posible pasar un Lock a todos los miembros de un grupo sin usar un Manager , usando los argumentos de la palabra clave initializer / initargs :

lock = None def initialize_lock(l): global lock lock = l def scenario_1_pool_no_manager(jobfunc, args, ncores): """Runs a pool of processes WITHOUT a Manager for the lock and queue. """ lock = mp.Lock() mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,)) queue = mp.Queue() iterator = make_iterator(args, queue) mypool.imap(jobfunc, iterator) # Don''t pass lock. It has to be used as a global in the child. (This means `jobfunc` would need to be re-written slightly. mypool.close() mypool.join() return read_queue(queue)

Esto funciona porque los argumentos pasados ​​a initargs pasan al método __init__ de los objetos Process que se ejecutan dentro del Pool , por lo que terminan siendo heredados, en lugar de conservados.