test imports celeryd_task_soft_time_limit backend_cleanup python unit-testing celery

python - imports - ¿Cómo pruebas tu unidad una tarea de Apio?



celery timezone (8)

prueba de unidad

import unittest from myproject.myapp import celeryapp class TestMyCeleryWorker(unittest.TestCase): def setUp(self): celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)

accesorios py.test

# conftest.py from myproject.myapp import celeryapp @pytest.fixture(scope=''module'') def celery_app(request): celeryapp.conf.update(CELERY_ALWAYS_EAGER=True) return celeryapp # test_tasks.py def test_some_task(celery_app): ...

Adición: hacer que send_task respete ansioso

from celery import current_app def send_task(name, args=(), kwargs={}, **opts): # https://github.com/celery/celery/issues/581 task = current_app.tasks[name] return task.apply(args, kwargs, **opts) current_app.send_task = send_task

La documentación de Aplery menciona la prueba de Apio en Django, pero no explica cómo probar una tarea de Apio si no está usando Django. ¿Cómo haces esto?


A partir de Celery 3.0 , una forma de establecer CELERY_ALWAYS_EAGER en Django es:

from django.test import TestCase, override_settings from .foo import foo_celery_task class MyTest(TestCase): @override_settings(CELERY_ALWAYS_EAGER=True) def test_foo(self): self.assertTrue(foo_celery_task.delay())


Depende de qué es exactamente lo que quiere probar.

  • Pruebe el código de la tarea directamente. No llame a "task.delay (...)" simplemente llame a "task (...)" desde las pruebas de su unidad.
  • Use CELERY_ALWAYS_EAGER . Esto hará que sus tareas sean llamadas inmediatamente en el momento en que diga "task.delay (...)", para que pueda probar la ruta completa (pero no cualquier comportamiento asincrónico).

Desde Celery v4.0 , los accesorios de py.test se provided para iniciar un trabajador de apio solo para la prueba y el apagado cuando finalice:

def test_myfunc_is_executed(celery_session_worker): # celery_session_worker: <Worker: [email protected] (running)> assert myfunc.delay().wait(3)

Entre otros dispositivos descritos en provided , puede cambiar las opciones predeterminadas de apio redefiniendo el accesorio celery_config esta manera:

@pytest.fixture(scope=''session'') def celery_config(): return { ''accept_content'': [''json'', ''pickle''], ''result_serializer'': ''pickle'', }

De forma predeterminada, el trabajador de prueba utiliza un intermediario en memoria y un back-end de resultados. No es necesario utilizar un Redis o RabbitMQ local si no prueba funciones específicas.


En mi caso (y supongo que muchos otros), todo lo que quería era probar la lógica interna de una tarea usando Pytest.

TL; DR; terminó burlándose de todo ( OPCIÓN 2 )

Ejemplo de caso de uso :

proj/tasks.py

@shared_task(bind=True) def add_task(self, a, b): return a+b;

tests/test_tasks.py

from proj import add_task def test_add(): assert add_task(1, 2) == 3, ''1 + 2 should equal 3''

pero, dado que el decorador shared_task hace mucha lógica interna de apio, no es realmente una unidad de prueba.

Entonces, para mí, había 2 opciones:

OPCIÓN 1: lógica interna separada

proj/tasks_logic.py

def internal_add(a, b): return a + b;

proj/tasks.py

from .tasks_logic import internal_add @shared_task(bind=True) def add_task(self, a, b): return internal_add(a, b);

Esto se ve muy extraño, y aparte de hacerlo menos legible, se requiere extraer y pasar manualmente los atributos que son parte de la solicitud, por ejemplo el task_id en caso de que lo necesite, lo que hace que la lógica sea menos pura.

OPCIÓN 2: se burla
burlarse de las partes internas del apio

tests/__init__.py

# noinspection PyUnresolvedReferences from celery import shared_task from mock import patch def mock_signature(**kwargs): return {} def mocked_shared_task(*decorator_args, **decorator_kwargs): def mocked_shared_decorator(func): func.signature = func.si = func.s = mock_signature return func return mocked_shared_decorator patch(''celery.shared_task'', mocked_shared_task).start()

que luego me permite simular el objeto de solicitud (nuevamente, en caso de que necesite cosas de la solicitud, como el id o el contador de reintentos).

tests/test_tasks.py

from proj import add_task class MockedRequest: def __init__(self, id=None): self.id = id or 1 class MockedTask: def __init__(self, id=None): self.request = MockedRequest(id=id) def test_add(): mocked_task = MockedTask(id=3) assert add_task(mocked_task, 1, 2) == 3, ''1 + 2 should equal 3''

Esta solución es mucho más manual, pero me da el control que necesito para probar la unidad , sin repetirme, y sin perder el alcance del apio.


Es posible probar tareas de forma sincrónica usando cualquier lib de prueba de unidad. Normalmente hago 2 sesiones de prueba diferentes cuando trabajo con tareas de apio. El primero (como sugiero a continuación) es completamente sincrónico y debería ser el que asegure que el algoritmo haga lo que debería hacer. La segunda sesión utiliza todo el sistema (incluido el intermediario) y se asegura de que no tenga problemas de serialización ni ninguna otra distribución, problema de comunicación.

Asi que:

from celery import Celery celery = Celery() @celery.task def add(x, y): return x + y

Y tu prueba:

from nose.tools import eq_ def test_add_task(): rst = add.apply(args=(4, 4)).get() eq_(rst, 8)

¡Espero que ayude!