python - ¿Cómo combinar el apio con el asyncio?
python-3.x asynchronous (3)
Eso será posible a partir de la versión 5.0 de Celery como se indica en el sitio oficial:
http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface
- La próxima versión principal de Celery solo será compatible con Python 3.5, donde estamos planeando aprovechar la nueva biblioteca de asyncio.
- Eliminar el soporte para Python 2 nos permitirá eliminar cantidades masivas de código de compatibilidad, y usar Python 3.5 nos permite aprovechar las ventajas de escribir, async / await, asyncio y conceptos similares para los que no hay alternativa en las versiones anteriores.
Los anteriores fueron citados del enlace anterior.
¡Así que lo mejor es esperar a que se distribuya la versión 5.0 !
Mientras tanto, feliz codificación :)
¿Cómo puedo crear un contenedor que haga que las tareas de apio se vean como asyncio.Task
? ¿O hay una mejor manera de integrar Celery con asyncio
?
@asksol, el creador de Celery, dijo esto :
Es bastante común usar Celery como una capa distribuida en la parte superior de los marcos de E / S asíncronos (sugerencia principal: enrutar las tareas vinculadas a la CPU a un trabajador de pre-entrenamiento significa que no bloquearán su ciclo de eventos).
Pero no pude encontrar ningún ejemplo de código específicamente para el framework asyncio
.
La manera más limpia que he encontrado para hacer esto es envolver la función async
en asgiref.sync.async_to_sync
(desde asgiref
):
from asgiref.sync import async_to_sync
from celery.task import periodic_task
async def return_hello():
await sleep(1)
return ''hello''
@periodic_task(
run_every=2,
name=''return_hello'',
)
def task_return_hello():
async_to_sync(return_hello)()
Saqué este ejemplo de una publicación de blog que escribí.
Puede envolver cualquier llamada de bloqueo en una tarea usando run_in_executor
como se describe en la documentation , también agregué en el ejemplo un timeout personalizado:
def run_async_task(
target,
*args,
timeout = 60,
**keywords
) -> Future:
loop = asyncio.get_event_loop()
return asyncio.wait_for(
loop.run_in_executor(
executor,
functools.partial(target, *args, **keywords)
),
timeout=timeout,
loop=loop
)
loop = asyncio.get_event_loop()
async_result = loop.run_until_complete(
run_async_task, your_task.delay, some_arg, some_karg=""
)
result = loop.run_until_complete(
run_async_task, async_result.result
)