python python-3.x asynchronous celery python-asyncio

python - ¿Cómo combinar el apio con el asyncio?



python-3.x asynchronous (3)

Eso será posible a partir de la versión 5.0 de Celery como se indica en el sitio oficial:

http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface

  1. La próxima versión principal de Celery solo será compatible con Python 3.5, donde estamos planeando aprovechar la nueva biblioteca de asyncio.
  2. Eliminar el soporte para Python 2 nos permitirá eliminar cantidades masivas de código de compatibilidad, y usar Python 3.5 nos permite aprovechar las ventajas de escribir, async / await, asyncio y conceptos similares para los que no hay alternativa en las versiones anteriores.

Los anteriores fueron citados del enlace anterior.

¡Así que lo mejor es esperar a que se distribuya la versión 5.0 !

Mientras tanto, feliz codificación :)

¿Cómo puedo crear un contenedor que haga que las tareas de apio se vean como asyncio.Task ? ¿O hay una mejor manera de integrar Celery con asyncio ?

@asksol, el creador de Celery, dijo esto :

Es bastante común usar Celery como una capa distribuida en la parte superior de los marcos de E / S asíncronos (sugerencia principal: enrutar las tareas vinculadas a la CPU a un trabajador de pre-entrenamiento significa que no bloquearán su ciclo de eventos).

Pero no pude encontrar ningún ejemplo de código específicamente para el framework asyncio .


La manera más limpia que he encontrado para hacer esto es envolver la función async en asgiref.sync.async_to_sync (desde asgiref ):

from asgiref.sync import async_to_sync from celery.task import periodic_task async def return_hello(): await sleep(1) return ''hello'' @periodic_task( run_every=2, name=''return_hello'', ) def task_return_hello(): async_to_sync(return_hello)()

Saqué este ejemplo de una publicación de blog que escribí.


Puede envolver cualquier llamada de bloqueo en una tarea usando run_in_executor como se describe en la documentation , también agregué en el ejemplo un timeout personalizado:

def run_async_task( target, *args, timeout = 60, **keywords ) -> Future: loop = asyncio.get_event_loop() return asyncio.wait_for( loop.run_in_executor( executor, functools.partial(target, *args, **keywords) ), timeout=timeout, loop=loop ) loop = asyncio.get_event_loop() async_result = loop.run_until_complete( run_async_task, your_task.delay, some_arg, some_karg="" ) result = loop.run_until_complete( run_async_task, async_result.result )