python executor event-loop python-asyncio

python - asyncio: ¿Es posible cancelar un futuro dirigido por un ejecutor?



executor event-loop (2)

Como los subprocesos comparten el mismo espacio de direcciones de memoria de un proceso, no hay una forma segura de terminar un subproceso en ejecución. Esta es la razón por la cual la mayoría de los lenguajes de programación no permiten matar hilos en ejecución (hay muchos trucos feos en torno a esta limitación).

Java lo aprendió por las malas .

Una solución consistiría en ejecutar su función en un proceso separado en lugar de un hilo y terminarla con gracia.

La biblioteca Pebble ofrece una interfaz similar al concurrent.futures admite la ejecución de Futures a cancelar.

from pebble import ProcessPool def function(foo, bar=0): return foo + bar with ProcessPool() as pool: future = pool.schedule(function, args=[1]) # if running, the container process will be terminated # a new process will be started consuming the next task future.cancel()

Me gustaría iniciar una función de bloqueo en un Ejecutor usando la llamada asincio loop.run_in_executor y luego cancelarla más tarde, pero eso no parece funcionar para mí.

Aquí está el código:

import asyncio import time from concurrent.futures import ThreadPoolExecutor def blocking_func(seconds_to_block): for i in range(seconds_to_block): print(''blocking {}/{}''.format(i, seconds_to_block)) time.sleep(1) print(''done blocking {}''.format(seconds_to_block)) @asyncio.coroutine def non_blocking_func(seconds): for i in range(seconds): print(''yielding {}/{}''.format(i, seconds)) yield from asyncio.sleep(1) print(''done non blocking {}''.format(seconds)) @asyncio.coroutine def main(): non_blocking_futures = [non_blocking_func(x) for x in range(1, 4)] blocking_future = loop.run_in_executor(None, blocking_func, 5) print(''wait a few seconds!'') yield from asyncio.sleep(1.5) blocking_future.cancel() yield from asyncio.wait(non_blocking_futures) loop = asyncio.get_event_loop() executor = ThreadPoolExecutor(max_workers=1) loop.set_default_executor(executor) asyncio.async(main()) loop.run_forever()

Esperaría que el código anterior solo permita la salida de la función de bloqueo:

blocking 0/5 blocking 1/5

y luego ver la salida de la función sin bloqueo. Pero en cambio, el futuro de bloqueo continúa incluso después de que haya cancelado.

¿Es posible? ¿Hay alguna otra forma de hacerlo?

Gracias

Editar: más discusión sobre cómo ejecutar el código de bloqueo y no bloqueo utilizando asyncio: Cómo interactuar el código de bloqueo y no bloqueo con asyncio


En este caso, no hay forma de cancelar el Future una vez que realmente ha comenzado a ejecutarse, porque confía en el comportamiento de concurrent.futures.Future , y sus documentos indican lo siguiente :

cancel()

Intenta cancelar la llamada. Si la llamada se está ejecutando actualmente y no se puede cancelar, el método devolverá False ; de lo contrario, la llamada se cancelará y el método devolverá True .

Entonces, la única vez que la cancelación sería exitosa es si la tarea aún está pendiente dentro del Executor . Ahora, en realidad está utilizando un asyncio.Future envuelto alrededor de un concurrent.futures.Future , y en la práctica, el asyncio.Future devuelto por loop.run_in_executor() generará un CancellationError si intenta yield from después de llamar a cancel() , incluso si la tarea subyacente ya se está ejecutando. Pero, en realidad no cancelará la ejecución de la tarea dentro del Executor .

Si realmente necesita cancelar la tarea, deberá utilizar un método más convencional para interrumpir la tarea que se ejecuta en el hilo. Los detalles de cómo lo hace dependen del caso de uso. Para el caso de uso que presentó en el ejemplo, podría usar un threading.Event .

def blocking_func(seconds_to_block, event): for i in range(seconds_to_block): if event.is_set(): return print(''blocking {}/{}''.format(i, seconds_to_block)) time.sleep(1) print(''done blocking {}''.format(seconds_to_block)) ... event = threading.Event() blocking_future = loop.run_in_executor(None, blocking_func, 5, event) print(''wait a few seconds!'') yield from asyncio.sleep(1.5) blocking_future.cancel() # Mark Future as cancelled event.set() # Actually interrupt blocking_func