sincronizar - stop thread python
Python Asyncio, cómo crear y cancelar tareas desde otro hilo. (4)
Creo que es posible que deba hacer que su método add_task
consciente de si se está llamando desde un subproceso distinto al del bucle de eventos. De esa manera, si se llama desde el mismo hilo, puede llamar directamente a asyncio.async
, de lo contrario, puede hacer un trabajo adicional para pasar la tarea del hilo del bucle al hilo de llamada. Aquí hay un ejemplo:
import time
import asyncio
import functools
from threading import Thread, current_thread, Event
from concurrent.futures import Future
class B(Thread):
def __init__(self, start_event):
Thread.__init__(self)
self.loop = None
self.tid = None
self.event = start_event
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.tid = current_thread()
self.loop.call_soon(self.event.set)
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def add_task(self, coro):
"""this method should return a task object, that I
can cancel, not a handle"""
def _async_add(func, fut):
try:
ret = func()
fut.set_result(ret)
except Exception as e:
fut.set_exception(e)
f = functools.partial(asyncio.async, coro, loop=self.loop)
if current_thread() == self.tid:
return f() # We can call directly if we''re not going between threads.
else:
# We''re in a non-event loop thread so we use a Future
# to get the task from the event loop thread once
# it''s ready.
fut = Future()
self.loop.call_soon_threadsafe(_async_add, f, fut)
return fut.result()
def cancel_task(self, task):
self.loop.call_soon_threadsafe(task.cancel)
@asyncio.coroutine
def test():
while True:
print("running")
yield from asyncio.sleep(1)
event = Event()
b = B(event)
b.start()
event.wait() # Let the loop''s thread signal us, rather than sleeping
t = b.add_task(test()) # This is a real task
time.sleep(10)
b.stop()
Primero, guardamos el ID de hilo del bucle de eventos en el método de run
, para poder averiguar si las llamadas a add_task
provienen de otros hilos más tarde. Si se llama a add_task
desde un subproceso de bucle que no es de evento, usamos call_soon_threadsafe
para llamar a una función que programará la rutina y luego usaremos un concurrent.futures.Future
para devolver la tarea al subproceso de llamada, que espera el resultado del Future
.
Una nota sobre la cancelación de una tarea: Cuando llame a cancel
en una Task
, se generará un CancelledError
en la ruta la próxima vez que se ejecute el bucle de eventos. Esto significa que la rutina que se está ajustando a la tarea se cancelará debido a la excepción la próxima vez que alcance un punto de rendimiento, a menos que la rutina detecte el error CancelledError
y evite que se aborte. También tenga en cuenta que esto solo funciona si la función que se está envolviendo es en realidad una coroutina interrumpible; un asyncio.Future
devuelto por BaseEventLoop.run_in_executor
, por ejemplo, no se puede cancelar realmente, porque en realidad está envuelto alrededor de un concurrent.futures.Future
, y esos no pueden cancelarse una vez que su función subyacente realmente comienza a ejecutarse. En esos casos, el asyncio.Future
dirá que se canceló, pero la función que se ejecuta en el ejecutor continuará ejecutándose.
Edición: Se actualizó el primer ejemplo para usar concurrent.futures.Future
, en lugar de una queue.Queue
, según la sugerencia de Andrew Svetlov.
Nota: asyncio.async
está en desuso debido a que la versión 3.4.4 usa asyncio.ensure_future
en asyncio.ensure_future
lugar.
Tengo una aplicación multi-hilo de python. Quiero ejecutar un bucle de asyncio en un hilo y publicar calbacks y coroutines desde otro hilo. Debería ser fácil, pero no puedo asyncio la cabeza con las cosas de asyncio .
Llegué a la siguiente solución que hace la mitad de lo que quiero, siéntase libre de comentar cualquier cosa:
import asyncio
from threading import Thread
class B(Thread):
def __init__(self):
Thread.__init__(self)
self.loop = None
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop) #why do I need that??
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def add_task(self, coro):
"""this method should return a task object, that I
can cancel, not a handle"""
f = functools.partial(self.loop.create_task, coro)
return self.loop.call_soon_threadsafe(f)
def cancel_task(self, xx):
#no idea
@asyncio.coroutine
def test():
while True:
print("running")
yield from asyncio.sleep(1)
b.start()
time.sleep(1) #need to wait for loop to start
t = b.add_task(test())
time.sleep(10)
#here the program runs fine but how can I cancel the task?
b.stop()
Así que iniciar y detener el bucle funciona bien. Pensé en crear una tarea usando create_task, pero ese método no es seguro para hilos, así que lo envolví en call_soon_threadsafe. Pero me gustaría poder obtener el objeto de tarea para poder cancelar la tarea. Podría hacer una cosa complicada usando Futuro y Condición, pero debe haber una forma más simple, ¿no es así?
Desde la versión 3.4.4, asyncio
proporciona una función llamada run_coroutine_threadsafe para enviar un objeto de rutina de un hilo a un bucle de eventos. Devuelve un concurrent.futures.Future para acceder al resultado o cancelar la tarea.
Usando tu ejemplo:
@asyncio.coroutine
def test(loop):
try:
while True:
print("Running")
yield from asyncio.sleep(1, loop=loop)
except asyncio.CancelledError:
print("Cancelled")
loop.stop()
raise
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever)
future = asyncio.run_coroutine_threadsafe(test(loop), loop)
thread.start()
time.sleep(5)
future.cancel()
thread.join()
Tu haces todo bien. Para detener tareas, hacer método.
class B(Thread):
# ...
def cancel(self, task):
self.loop.call_soon_threadsafe(task.cancel)
Por cierto, tiene que configurar un bucle de eventos para el subproceso creado explícitamente por
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
porque asyncio
crea un bucle de eventos implícito solo para el hilo principal.
solo como referencia aquí, el código que finalmente implementé en base a la ayuda que recibí en este sitio, es más sencillo ya que no necesitaba todas las funciones. ¡gracias de nuevo!
import asyncio
from threading import Thread
from concurrent.futures import Future
import functools
class B(Thread):
def __init__(self):
Thread.__init__(self)
self.loop = None
def run(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop.run_forever()
def stop(self):
self.loop.call_soon_threadsafe(self.loop.stop)
def _add_task(self, future, coro):
task = self.loop.create_task(coro)
future.set_result(task)
def add_task(self, coro):
future = Future()
p = functools.partial(self._add_task, future, coro)
self.loop.call_soon_threadsafe(p)
return future.result() #block until result is available
def cancel(self, task):
self.loop.call_soon_threadsafe(task.cancel)