multihilos - multitarea en python
¿Quién ejecuta la devolución de llamada cuando se utiliza el método apply_async de un grupo de multiprocesamiento? (1)
De hecho, hay una pista en los documentos:
la devolución de llamada debe completarse inmediatamente, ya que de lo contrario se bloqueará el hilo que maneja los resultados .
Las devoluciones de llamada se manejan en el proceso principal, pero se ejecutan en su propio hilo separado . Cuando creas un Pool
en realidad crea algunos objetos Thread
internamente:
class Pool(object):
Process = Process
def __init__(self, processes=None, initializer=None, initargs=(),
maxtasksperchild=None):
self._setup_queues()
self._taskqueue = Queue.Queue()
self._cache = {}
... # stuff we don''t care about
self._worker_handler = threading.Thread(
target=Pool._handle_workers,
args=(self, )
)
self._worker_handler.daemon = True
self._worker_handler._state = RUN
self._worker_handler.start()
self._task_handler = threading.Thread(
target=Pool._handle_tasks,
args=(self._taskqueue, self._quick_put, self._outqueue,
self._pool, self._cache)
)
self._task_handler.daemon = True
self._task_handler._state = RUN
self._task_handler.start()
self._result_handler = threading.Thread(
target=Pool._handle_results,
args=(self._outqueue, self._quick_get, self._cache)
)
self._result_handler.daemon = True
self._result_handler._state = RUN
self._result_handler.start()
El hilo interesante para nosotros es _result_handler
; vamos a llegar a por qué en breve.
Al cambiar de marcha por un segundo, cuando ejecuta apply_async
, crea un objeto ApplyResult
internamente para administrar el resultado del hijo:
def apply_async(self, func, args=(), kwds={}, callback=None):
assert self._state == RUN
result = ApplyResult(self._cache, callback)
self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
return result
class ApplyResult(object):
def __init__(self, cache, callback):
self._cond = threading.Condition(threading.Lock())
self._job = job_counter.next()
self._cache = cache
self._ready = False
self._callback = callback
cache[self._job] = self
def _set(self, i, obj):
self._success, self._value = obj
if self._callback and self._success:
self._callback(self._value)
self._cond.acquire()
try:
self._ready = True
self._cond.notify()
finally:
self._cond.release()
del self._cache[self._job]
Como puede ver, el método _set
es el que termina ejecutando la callback
pasada, asumiendo que la tarea fue exitosa. También __init__
cuenta que se agrega a un __init__
de cache
global al final de __init__
.
Ahora, de vuelta al objeto de hilo _result_handler
. Ese objeto llama a la función _handle_results
, que se parece a esto:
while 1:
try:
task = get()
except (IOError, EOFError):
debug(''result handler got EOFError/IOError -- exiting'')
return
if thread._state:
assert thread._state == TERMINATE
debug(''result handler found thread._state=TERMINATE'')
break
if task is None:
debug(''result handler got sentinel'')
break
job, i, obj = task
try:
cache[job]._set(i, obj) # Here is _set (and therefore our callback) being called!
except KeyError:
pass
# More stuff
Es un bucle que solo saca los resultados de los niños de la cola, encuentra la entrada para él en el cache
y llama a _set
, que ejecuta nuestra devolución de llamada. Puede ejecutarse aunque esté en un bucle porque no se está ejecutando en el subproceso principal.
Estoy tratando de entender un poco de lo que sucede detrás de las escenas cuando se utiliza el método apply_sync de un grupo de multiprocesamiento.
¿Quién ejecuta el método de devolución de llamada? ¿Es el proceso principal que se llama apply_async?
Digamos que envío un montón de comandos apply_async con devoluciones de llamada y luego continúo con mi programa. Mi programa todavía está haciendo cosas cuando el apply_async comienza a terminar. ¿Cómo se ejecuta la devolución de llamada en mi "proceso principal" mientras el proceso principal todavía está ocupado con el script?
Aquí hay un ejemplo.
import multiprocessing
import time
def callback(x):
print ''{} running callback with arg {}''.format(multiprocessing.current_process().name, x)
def func(x):
print ''{} running func with arg {}''.format(multiprocessing.current_process().name, x)
return x
pool = multiprocessing.Pool()
args = range(20)
for a in args:
pool.apply_async(func, (a,), callback=callback)
print ''{} going to sleep for a minute''.format(multiprocessing.current_process().name)
t0 = time.time()
while time.time() - t0 < 60:
pass
print ''Finished with the script''
La salida es algo así como
PoolWorker-1 ejecuta func con arg 0
PoolWorker-2 ejecuta func con arg 1
PoolWorker-3 ejecuta func con arg 2
MainProcess va a dormir por un minuto <- el proceso principal está ocupado
PoolWorker-4 ejecuta func con arg 3
PoolWorker-1 ejecuta func con arg 4
PoolWorker-2 ejecuta func con arg 5
PoolWorker-3 ejecuta func con arg 6
PoolWorker-4 ejecuta func con arg 7
MainProcess ejecuta la devolución de llamada con arg 0 <- ¡el proceso principal ejecuta la devolución de llamada mientras aún está en el bucle while!
MainProcess ejecutando callback con arg 1
MainProcess ejecutando callback con arg 2
MainProcess ejecutando callback con arg 3
MainProcess ejecutando callback con arg 4
PoolWorker-1 ejecuta func con arg 8
...
Terminado con guión
¿Cómo está ejecutando MainProcess la devolución de llamada mientras está en medio de ese bucle while?
Hay una declaración sobre la devolución de llamada en la documentación para multiprocessing.Pool que parece una sugerencia pero no la entiendo.
apply_async (func [, args [, kwds [, callback]]])
Una variante del método apply () que devuelve un objeto de resultado.
Si se especifica la devolución de llamada, debe ser una llamada que acepte un solo argumento. Cuando el resultado esté listo, se le aplicará una devolución de llamada (a menos que la llamada haya fallado). la devolución de llamada debe completarse inmediatamente, ya que de lo contrario se bloqueará el hilo que maneja los resultados.