python - imports - ¿Cómo pruebas tu unidad una tarea de Apio?
celery timezone (8)
prueba de unidad
import unittest
from myproject.myapp import celeryapp
class TestMyCeleryWorker(unittest.TestCase):
def setUp(self):
celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
accesorios py.test
# conftest.py
from myproject.myapp import celeryapp
@pytest.fixture(scope=''module'')
def celery_app(request):
celeryapp.conf.update(CELERY_ALWAYS_EAGER=True)
return celeryapp
# test_tasks.py
def test_some_task(celery_app):
...
Adición: hacer que send_task respete ansioso
from celery import current_app
def send_task(name, args=(), kwargs={}, **opts):
# https://github.com/celery/celery/issues/581
task = current_app.tasks[name]
return task.apply(args, kwargs, **opts)
current_app.send_task = send_task
La documentación de Aplery menciona la prueba de Apio en Django, pero no explica cómo probar una tarea de Apio si no está usando Django. ¿Cómo haces esto?
A partir de Celery 3.0 , una forma de establecer CELERY_ALWAYS_EAGER
en Django es:
from django.test import TestCase, override_settings
from .foo import foo_celery_task
class MyTest(TestCase):
@override_settings(CELERY_ALWAYS_EAGER=True)
def test_foo(self):
self.assertTrue(foo_celery_task.delay())
Depende de qué es exactamente lo que quiere probar.
- Pruebe el código de la tarea directamente. No llame a "task.delay (...)" simplemente llame a "task (...)" desde las pruebas de su unidad.
- Use CELERY_ALWAYS_EAGER . Esto hará que sus tareas sean llamadas inmediatamente en el momento en que diga "task.delay (...)", para que pueda probar la ruta completa (pero no cualquier comportamiento asincrónico).
Desde Celery v4.0 , los accesorios de py.test se provided para iniciar un trabajador de apio solo para la prueba y el apagado cuando finalice:
def test_myfunc_is_executed(celery_session_worker):
# celery_session_worker: <Worker: [email protected] (running)>
assert myfunc.delay().wait(3)
Entre otros dispositivos descritos en provided , puede cambiar las opciones predeterminadas de apio redefiniendo el accesorio celery_config
esta manera:
@pytest.fixture(scope=''session'')
def celery_config():
return {
''accept_content'': [''json'', ''pickle''],
''result_serializer'': ''pickle'',
}
De forma predeterminada, el trabajador de prueba utiliza un intermediario en memoria y un back-end de resultados. No es necesario utilizar un Redis o RabbitMQ local si no prueba funciones específicas.
En mi caso (y supongo que muchos otros), todo lo que quería era probar la lógica interna de una tarea usando Pytest.
TL; DR; terminó burlándose de todo ( OPCIÓN 2 )
Ejemplo de caso de uso :
proj/tasks.py
@shared_task(bind=True)
def add_task(self, a, b):
return a+b;
tests/test_tasks.py
from proj import add_task
def test_add():
assert add_task(1, 2) == 3, ''1 + 2 should equal 3''
pero, dado que el decorador shared_task
hace mucha lógica interna de apio, no es realmente una unidad de prueba.
Entonces, para mí, había 2 opciones:
OPCIÓN 1: lógica interna separada
proj/tasks_logic.py
def internal_add(a, b):
return a + b;
proj/tasks.py
from .tasks_logic import internal_add
@shared_task(bind=True)
def add_task(self, a, b):
return internal_add(a, b);
Esto se ve muy extraño, y aparte de hacerlo menos legible, se requiere extraer y pasar manualmente los atributos que son parte de la solicitud, por ejemplo el task_id
en caso de que lo necesite, lo que hace que la lógica sea menos pura.
OPCIÓN 2: se burla
burlarse de las partes internas del apio
tests/__init__.py
# noinspection PyUnresolvedReferences
from celery import shared_task
from mock import patch
def mock_signature(**kwargs):
return {}
def mocked_shared_task(*decorator_args, **decorator_kwargs):
def mocked_shared_decorator(func):
func.signature = func.si = func.s = mock_signature
return func
return mocked_shared_decorator
patch(''celery.shared_task'', mocked_shared_task).start()
que luego me permite simular el objeto de solicitud (nuevamente, en caso de que necesite cosas de la solicitud, como el id o el contador de reintentos).
tests/test_tasks.py
from proj import add_task
class MockedRequest:
def __init__(self, id=None):
self.id = id or 1
class MockedTask:
def __init__(self, id=None):
self.request = MockedRequest(id=id)
def test_add():
mocked_task = MockedTask(id=3)
assert add_task(mocked_task, 1, 2) == 3, ''1 + 2 should equal 3''
Esta solución es mucho más manual, pero me da el control que necesito para probar la unidad , sin repetirme, y sin perder el alcance del apio.
Es posible probar tareas de forma sincrónica usando cualquier lib de prueba de unidad. Normalmente hago 2 sesiones de prueba diferentes cuando trabajo con tareas de apio. El primero (como sugiero a continuación) es completamente sincrónico y debería ser el que asegure que el algoritmo haga lo que debería hacer. La segunda sesión utiliza todo el sistema (incluido el intermediario) y se asegura de que no tenga problemas de serialización ni ninguna otra distribución, problema de comunicación.
Asi que:
from celery import Celery
celery = Celery()
@celery.task
def add(x, y):
return x + y
Y tu prueba:
from nose.tools import eq_
def test_add_task():
rst = add.apply(args=(4, 4)).get()
eq_(rst, 8)
¡Espero que ayude!
Para aquellos en Apio 4 es:
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
Debido a que los nombres de las configuraciones se han modificado y necesitan actualización si elige actualizar, consulte
http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#lowercase-setting-names
Yo uso esto:
with mock.patch(''celeryconfig.CELERY_ALWAYS_EAGER'', True, create=True):
...
Documentos: http://docs.celeryproject.org/en/3.1/configuration.html#celery-always-eager
CELERY_ALWAYS_EAGER te permite ejecutar tu tarea sincrónicamente y no necesitas un servidor de apio.