paralelismo multitarea multihilos ejemplos python callback parallel-processing multiprocessing

multihilos - multitarea en python



¿Quién ejecuta la devolución de llamada cuando se utiliza el método apply_async de un grupo de multiprocesamiento? (1)

De hecho, hay una pista en los documentos:

la devolución de llamada debe completarse inmediatamente, ya que de lo contrario se bloqueará el hilo que maneja los resultados .

Las devoluciones de llamada se manejan en el proceso principal, pero se ejecutan en su propio hilo separado . Cuando creas un Pool en realidad crea algunos objetos Thread internamente:

class Pool(object): Process = Process def __init__(self, processes=None, initializer=None, initargs=(), maxtasksperchild=None): self._setup_queues() self._taskqueue = Queue.Queue() self._cache = {} ... # stuff we don''t care about self._worker_handler = threading.Thread( target=Pool._handle_workers, args=(self, ) ) self._worker_handler.daemon = True self._worker_handler._state = RUN self._worker_handler.start() self._task_handler = threading.Thread( target=Pool._handle_tasks, args=(self._taskqueue, self._quick_put, self._outqueue, self._pool, self._cache) ) self._task_handler.daemon = True self._task_handler._state = RUN self._task_handler.start() self._result_handler = threading.Thread( target=Pool._handle_results, args=(self._outqueue, self._quick_get, self._cache) ) self._result_handler.daemon = True self._result_handler._state = RUN self._result_handler.start()

El hilo interesante para nosotros es _result_handler ; vamos a llegar a por qué en breve.

Al cambiar de marcha por un segundo, cuando ejecuta apply_async , crea un objeto ApplyResult internamente para administrar el resultado del hijo:

def apply_async(self, func, args=(), kwds={}, callback=None): assert self._state == RUN result = ApplyResult(self._cache, callback) self._taskqueue.put(([(result._job, None, func, args, kwds)], None)) return result class ApplyResult(object): def __init__(self, cache, callback): self._cond = threading.Condition(threading.Lock()) self._job = job_counter.next() self._cache = cache self._ready = False self._callback = callback cache[self._job] = self def _set(self, i, obj): self._success, self._value = obj if self._callback and self._success: self._callback(self._value) self._cond.acquire() try: self._ready = True self._cond.notify() finally: self._cond.release() del self._cache[self._job]

Como puede ver, el método _set es el que termina ejecutando la callback pasada, asumiendo que la tarea fue exitosa. También __init__ cuenta que se agrega a un __init__ de cache global al final de __init__ .

Ahora, de vuelta al objeto de hilo _result_handler . Ese objeto llama a la función _handle_results , que se parece a esto:

while 1: try: task = get() except (IOError, EOFError): debug(''result handler got EOFError/IOError -- exiting'') return if thread._state: assert thread._state == TERMINATE debug(''result handler found thread._state=TERMINATE'') break if task is None: debug(''result handler got sentinel'') break job, i, obj = task try: cache[job]._set(i, obj) # Here is _set (and therefore our callback) being called! except KeyError: pass # More stuff

Es un bucle que solo saca los resultados de los niños de la cola, encuentra la entrada para él en el cache y llama a _set , que ejecuta nuestra devolución de llamada. Puede ejecutarse aunque esté en un bucle porque no se está ejecutando en el subproceso principal.

Estoy tratando de entender un poco de lo que sucede detrás de las escenas cuando se utiliza el método apply_sync de un grupo de multiprocesamiento.

¿Quién ejecuta el método de devolución de llamada? ¿Es el proceso principal que se llama apply_async?

Digamos que envío un montón de comandos apply_async con devoluciones de llamada y luego continúo con mi programa. Mi programa todavía está haciendo cosas cuando el apply_async comienza a terminar. ¿Cómo se ejecuta la devolución de llamada en mi "proceso principal" mientras el proceso principal todavía está ocupado con el script?

Aquí hay un ejemplo.

import multiprocessing import time def callback(x): print ''{} running callback with arg {}''.format(multiprocessing.current_process().name, x) def func(x): print ''{} running func with arg {}''.format(multiprocessing.current_process().name, x) return x pool = multiprocessing.Pool() args = range(20) for a in args: pool.apply_async(func, (a,), callback=callback) print ''{} going to sleep for a minute''.format(multiprocessing.current_process().name) t0 = time.time() while time.time() - t0 < 60: pass print ''Finished with the script''

La salida es algo así como

PoolWorker-1 ejecuta func con arg 0

PoolWorker-2 ejecuta func con arg 1

PoolWorker-3 ejecuta func con arg 2

MainProcess va a dormir por un minuto <- el proceso principal está ocupado

PoolWorker-4 ejecuta func con arg 3

PoolWorker-1 ejecuta func con arg 4

PoolWorker-2 ejecuta func con arg 5

PoolWorker-3 ejecuta func con arg 6

PoolWorker-4 ejecuta func con arg 7

MainProcess ejecuta la devolución de llamada con arg 0 <- ¡el proceso principal ejecuta la devolución de llamada mientras aún está en el bucle while!

MainProcess ejecutando callback con arg 1

MainProcess ejecutando callback con arg 2

MainProcess ejecutando callback con arg 3

MainProcess ejecutando callback con arg 4

PoolWorker-1 ejecuta func con arg 8

...

Terminado con guión

¿Cómo está ejecutando MainProcess la devolución de llamada mientras está en medio de ese bucle while?

Hay una declaración sobre la devolución de llamada en la documentación para multiprocessing.Pool que parece una sugerencia pero no la entiendo.

apply_async (func [, args [, kwds [, callback]]])

Una variante del método apply () que devuelve un objeto de resultado.

Si se especifica la devolución de llamada, debe ser una llamada que acepte un solo argumento. Cuando el resultado esté listo, se le aplicará una devolución de llamada (a menos que la llamada haya fallado). la devolución de llamada debe completarse inmediatamente, ya que de lo contrario se bloqueará el hilo que maneja los resultados.