set_start_method - Comprender el multiprocesamiento: administración de memoria compartida, bloqueos y colas en Python
python pool process (1)
Multiprocessing es una herramienta poderosa en python, y quiero entenderlo más en profundidad. Quiero saber cuándo usar Locks y Queues regulares y cuándo usar un Manager multiprocesamiento para compartirlos entre todos los procesos.
Se me ocurrieron los siguientes escenarios de prueba con cuatro condiciones diferentes para el multiprocesamiento:
Usando un grupo y NO Manager
Usando un grupo y un administrador
Usando procesos individuales y NO Manager
Usando procesos individuales y un Gerente
El trabajo
Todas las condiciones ejecutan una función de trabajo the_job
. the_job
consiste en una impresión que está asegurada por un bloqueo. Además, la entrada a la función simplemente se pone en una cola (para ver si se puede recuperar de la cola). Esta entrada es simplemente un índice idx
del range(10)
creado en el script principal llamado start_scenario
(que se muestra en la parte inferior).
def the_job(args):
"""The job for multiprocessing.
Prints some stuff secured by a lock and
finally puts the input into a queue.
"""
idx = args[0]
lock = args[1]
queue=args[2]
lock.acquire()
print ''I''
print ''was ''
print ''here ''
print ''!!!!''
print ''1111''
print ''einhundertelfzigelf/n''
who= '' By run %d /n'' % idx
print who
lock.release()
queue.put(idx)
El éxito de una condición se define como recordar perfectamente la entrada de la cola, ver la función read_queue
en la parte inferior.
Las condiciones
Las condiciones 1 y 2 son bastante autoexplicativas. La condición 1 implica crear un bloqueo y una cola, y pasarlos a un grupo de procesos:
def scenario_1_pool_no_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITHOUT a Manager for the lock and queue.
FAILS!
"""
mypool = mp.Pool(ncores)
lock = mp.Lock()
queue = mp.Queue()
iterator = make_iterator(args, lock, queue)
mypool.imap(jobfunc, iterator)
mypool.close()
mypool.join()
return read_queue(queue)
(La función auxiliar make_iterator
aparece en la parte inferior de esta publicación.) La condición 1 falla con RuntimeError: Lock objects should only be shared between processes through inheritance
.
La condición 2 es bastante similar, pero ahora la cerradura y la cola están bajo la supervisión de un administrador:
def scenario_2_pool_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITH a Manager for the lock and queue.
SUCCESSFUL!
"""
mypool = mp.Pool(ncores)
lock = mp.Manager().Lock()
queue = mp.Manager().Queue()
iterator = make_iterator(args, lock, queue)
mypool.imap(jobfunc, iterator)
mypool.close()
mypool.join()
return read_queue(queue)
En la condición 3, los nuevos procesos se inician manualmente, y el bloqueo y la cola se crean sin un administrador:
def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
"""Runs an individual process for every task WITHOUT a Manager,
SUCCESSFUL!
"""
lock = mp.Lock()
queue = mp.Queue()
iterator = make_iterator(args, lock, queue)
do_job_single_processes(jobfunc, iterator, ncores)
return read_queue(queue)
La condición 4 es similar pero nuevamente ahora usando un administrador:
def scenario_4_single_processes_manager(jobfunc, args, ncores):
"""Runs an individual process for every task WITH a Manager,
SUCCESSFUL!
"""
lock = mp.Manager().Lock()
queue = mp.Manager().Queue()
iterator = make_iterator(args, lock, queue)
do_job_single_processes(jobfunc, iterator, ncores)
return read_queue(queue)
En ambas condiciones, 3 y 4, comienzo un nuevo proceso para cada una de las 10 tareas del the_job
con la mayoría de los procesos ncores operando al mismo tiempo. Esto se logra con la siguiente función auxiliar:
def do_job_single_processes(jobfunc, iterator, ncores):
"""Runs a job function by starting individual processes for every task.
At most `ncores` processes operate at the same time
:param jobfunc: Job to do
:param iterator:
Iterator over different parameter settings,
contains a lock and a queue
:param ncores:
Number of processes operating at the same time
"""
keep_running=True
process_dict = {} # Dict containing all subprocees
while len(process_dict)>0 or keep_running:
terminated_procs_pids = []
# First check if some processes did finish their job
for pid, proc in process_dict.iteritems():
# Remember the terminated processes
if not proc.is_alive():
terminated_procs_pids.append(pid)
# And delete these from the process dict
for terminated_proc in terminated_procs_pids:
process_dict.pop(terminated_proc)
# If we have less active processes than ncores and there is still
# a job to do, add another process
if len(process_dict) < ncores and keep_running:
try:
task = iterator.next()
proc = mp.Process(target=jobfunc,
args=(task,))
proc.start()
process_dict[proc.pid]=proc
except StopIteration:
# All tasks have been started
keep_running=False
time.sleep(0.1)
El resultado
Solo la condición 1 falla ( RuntimeError: Lock objects should only be shared between processes through inheritance
) mientras que las otras 3 condiciones son exitosas. Intento entender este resultado.
¿Por qué el grupo necesita compartir un bloqueo y una cola entre todos los procesos, pero los procesos individuales de la condición 3 no lo hacen?
Lo que sé es que para las condiciones de grupo (1 y 2) todos los datos de los iteradores pasan por decapado, mientras que en condiciones de proceso único (3 y 4) todos los datos de los iteradores pasan por herencia del proceso principal (yo soy usando Linux ). Supongo que hasta que se cambie la memoria desde un proceso secundario, se accede a la misma memoria que usa el proceso parental (copy-on-write). Pero tan pronto como uno dice lock.acquire()
, esto debería cambiarse y los procesos secundarios usan bloqueos diferentes colocados en otro lugar en la memoria, ¿no? ¿Cómo sabe un niño procesar que un hermano ha activado un bloqueo que no se comparte a través de un administrador?
Finalmente, algo relacionado es mi pregunta sobre las diferentes condiciones 3 y 4. Ambos tienen procesos individuales, pero difieren en el uso de un administrador. ¿Ambos se consideran un código válido ? ¿O debería uno evitar el uso de un administrador si realmente no hay necesidad de uno?
Script completo
Para aquellos que simplemente quieren copiar y pegar todo para ejecutar el código, aquí está el script completo:
__author__ = ''Me and myself''
import multiprocessing as mp
import time
def the_job(args):
"""The job for multiprocessing.
Prints some stuff secured by a lock and
finally puts the input into a queue.
"""
idx = args[0]
lock = args[1]
queue=args[2]
lock.acquire()
print ''I''
print ''was ''
print ''here ''
print ''!!!!''
print ''1111''
print ''einhundertelfzigelf/n''
who= '' By run %d /n'' % idx
print who
lock.release()
queue.put(idx)
def read_queue(queue):
"""Turns a qeue into a normal python list."""
results = []
while not queue.empty():
result = queue.get()
results.append(result)
return results
def make_iterator(args, lock, queue):
"""Makes an iterator over args and passes the lock an queue to each element."""
return ((arg, lock, queue) for arg in args)
def start_scenario(scenario_number = 1):
"""Starts one of four multiprocessing scenarios.
:param scenario_number: Index of scenario, 1 to 4
"""
args = range(10)
ncores = 3
if scenario_number==1:
result = scenario_1_pool_no_manager(the_job, args, ncores)
elif scenario_number==2:
result = scenario_2_pool_manager(the_job, args, ncores)
elif scenario_number==3:
result = scenario_3_single_processes_no_manager(the_job, args, ncores)
elif scenario_number==4:
result = scenario_4_single_processes_manager(the_job, args, ncores)
if result != args:
print ''Scenario %d fails: %s != %s'' % (scenario_number, args, result)
else:
print ''Scenario %d successful!'' % scenario_number
def scenario_1_pool_no_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITHOUT a Manager for the lock and queue.
FAILS!
"""
mypool = mp.Pool(ncores)
lock = mp.Lock()
queue = mp.Queue()
iterator = make_iterator(args, lock, queue)
mypool.map(jobfunc, iterator)
mypool.close()
mypool.join()
return read_queue(queue)
def scenario_2_pool_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITH a Manager for the lock and queue.
SUCCESSFUL!
"""
mypool = mp.Pool(ncores)
lock = mp.Manager().Lock()
queue = mp.Manager().Queue()
iterator = make_iterator(args, lock, queue)
mypool.map(jobfunc, iterator)
mypool.close()
mypool.join()
return read_queue(queue)
def scenario_3_single_processes_no_manager(jobfunc, args, ncores):
"""Runs an individual process for every task WITHOUT a Manager,
SUCCESSFUL!
"""
lock = mp.Lock()
queue = mp.Queue()
iterator = make_iterator(args, lock, queue)
do_job_single_processes(jobfunc, iterator, ncores)
return read_queue(queue)
def scenario_4_single_processes_manager(jobfunc, args, ncores):
"""Runs an individual process for every task WITH a Manager,
SUCCESSFUL!
"""
lock = mp.Manager().Lock()
queue = mp.Manager().Queue()
iterator = make_iterator(args, lock, queue)
do_job_single_processes(jobfunc, iterator, ncores)
return read_queue(queue)
def do_job_single_processes(jobfunc, iterator, ncores):
"""Runs a job function by starting individual processes for every task.
At most `ncores` processes operate at the same time
:param jobfunc: Job to do
:param iterator:
Iterator over different parameter settings,
contains a lock and a queue
:param ncores:
Number of processes operating at the same time
"""
keep_running=True
process_dict = {} # Dict containing all subprocees
while len(process_dict)>0 or keep_running:
terminated_procs_pids = []
# First check if some processes did finish their job
for pid, proc in process_dict.iteritems():
# Remember the terminated processes
if not proc.is_alive():
terminated_procs_pids.append(pid)
# And delete these from the process dict
for terminated_proc in terminated_procs_pids:
process_dict.pop(terminated_proc)
# If we have less active processes than ncores and there is still
# a job to do, add another process
if len(process_dict) < ncores and keep_running:
try:
task = iterator.next()
proc = mp.Process(target=jobfunc,
args=(task,))
proc.start()
process_dict[proc.pid]=proc
except StopIteration:
# All tasks have been started
keep_running=False
time.sleep(0.1)
def main():
"""Runs 1 out of 4 different multiprocessing scenarios"""
start_scenario(1)
if __name__ == ''__main__'':
main()
multiprocessing.Lock
se implementa utilizando un objeto Semaphore proporcionado por el sistema operativo. En Linux, el niño simplemente hereda un identificador para el semáforo del padre a través de os.fork
. Esta no es una copia del semáforo; en realidad está heredando el mismo identificador que el padre, de la misma manera que los descriptores de archivos pueden heredarse. Windows, por otro lado, no es compatible con os.fork
, por lo que tiene que desenterrar el Lock
. Lo hace creando un identificador duplicado para el semáforo de Windows utilizado internamente por el objeto multiprocessing.Lock
, utilizando la API Windows DuplicateHandle
, que establece:
El identificador duplicado se refiere al mismo objeto que el identificador original. Por lo tanto, cualquier cambio en el objeto se refleja a través de ambas manijas
La API DuplicateHandle
permite ceder la propiedad del identificador duplicado al proceso hijo, para que el proceso hijo pueda usarlo después de deshacerte de él. Al crear un identificador duplicado propiedad del niño, puede "compartir" efectivamente el objeto de bloqueo.
Aquí está el objeto de semáforo en multiprocessing/synchronize.py
class SemLock(object):
def __init__(self, kind, value, maxvalue):
sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
debug(''created semlock with handle %s'' % sl.handle)
self._make_methods()
if sys.platform != ''win32'':
def _after_fork(obj):
obj._semlock._after_fork()
register_after_fork(self, _after_fork)
def _make_methods(self):
self.acquire = self._semlock.acquire
self.release = self._semlock.release
self.__enter__ = self._semlock.__enter__
self.__exit__ = self._semlock.__exit__
def __getstate__(self): # This is called when you try to pickle the `Lock`.
assert_spawning(self)
sl = self._semlock
return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
def __setstate__(self, state): # This is called when unpickling a `Lock`
self._semlock = _multiprocessing.SemLock._rebuild(*state)
debug(''recreated blocker with handle %r'' % state[0])
self._make_methods()
Tenga en cuenta la llamada assert_spawning
en __getstate__
, que se llama cuando se decapa el objeto. Así es como se implementa eso:
#
# Check that the current thread is spawning a child process
#
def assert_spawning(self):
if not Popen.thread_is_spawning():
raise RuntimeError(
''%s objects should only be shared between processes''
'' through inheritance'' % type(self).__name__
)
Esa función es la que asegura que estás "heredando" el Lock
, llamando a thread_is_spawning
. En Linux, ese método simplemente devuelve False
:
@staticmethod
def thread_is_spawning():
return False
Esto se debe a que Linux no necesita agrupar para heredar Lock
, por lo que si __getstate__
se está llamando en Linux, no debemos heredarlo. En Windows, hay más cosas en juego:
def dump(obj, file, protocol=None):
ForkingPickler(file, protocol).dump(obj)
class Popen(object):
''''''
Start a subprocess to run the code of a process object
''''''
_tls = thread._local()
def __init__(self, process_obj):
...
# send information to child
prep_data = get_preparation_data(process_obj._name)
to_child = os.fdopen(wfd, ''wb'')
Popen._tls.process_handle = int(hp)
try:
dump(prep_data, to_child, HIGHEST_PROTOCOL)
dump(process_obj, to_child, HIGHEST_PROTOCOL)
finally:
del Popen._tls.process_handle
to_child.close()
@staticmethod
def thread_is_spawning():
return getattr(Popen._tls, ''process_handle'', None) is not None
Aquí, thread_is_spawning
devuelve True
si el objeto Popen._tls
tiene un atributo process_handle
. Podemos ver que el atributo process_handle
se crea en __init__
, luego los datos que queremos heredar pasan del padre al hijo usando dump
, y luego se elimina el atributo. Entonces thread_is_spawning
solo será True
durante __init__
. De acuerdo con este hilo de la lista de correo de python-ideas , esto es en realidad una limitación artificial añadida para simular el mismo comportamiento que os.fork
en Linux. En realidad, Windows podría admitir pasar el Lock
en cualquier momento, ya que DuplicateHandle
se puede ejecutar en cualquier momento.
Todo lo anterior se aplica al objeto Queue
porque usa Lock
internamente.
Diría que heredar objetos de Lock
es preferible a usar un Lock
Manager.Lock()
, porque cuando utilizas un Lock
Manager
, cada llamada que realizas al Lock
debe enviarse a través del IPC al proceso de Manager
, que será mucho más lento que usar un Lock
compartido que vive dentro del proceso de llamada. Sin embargo, ambos enfoques son perfectamente válidos.
Finalmente, es posible pasar un Lock
a todos los miembros de un grupo sin usar un Manager
, usando los argumentos de la palabra clave initializer
/ initargs
:
lock = None
def initialize_lock(l):
global lock
lock = l
def scenario_1_pool_no_manager(jobfunc, args, ncores):
"""Runs a pool of processes WITHOUT a Manager for the lock and queue.
"""
lock = mp.Lock()
mypool = mp.Pool(ncores, initializer=initialize_lock, initargs=(lock,))
queue = mp.Queue()
iterator = make_iterator(args, queue)
mypool.imap(jobfunc, iterator) # Don''t pass lock. It has to be used as a global in the child. (This means `jobfunc` would need to be re-written slightly.
mypool.close()
mypool.join()
return read_queue(queue)
Esto funciona porque los argumentos pasados a initargs
pasan al método __init__
de los objetos Process
que se ejecutan dentro del Pool
, por lo que terminan siendo heredados, en lugar de conservados.