unittest - python pruebas
Pruebas unitarias con django-apio? (4)
Aquí hay un extracto de mi clase base de prueba que apply_async
método apply_async
y los registros de las llamadas a él (que incluye Task.delay
). Es un poco asqueroso, pero se ha adaptado a mis necesidades en los últimos meses. He estado usándolo
from django.test import TestCase
from celery.task.base import Task
# For recent versions, Task has been moved to celery.task.app:
# from celery.app.task import Task
# See http://docs.celeryproject.org/en/latest/reference/celery.app.task.html
class CeleryTestCaseBase(TestCase):
def setUp(self):
super(CeleryTestCaseBase, self).setUp()
self.applied_tasks = []
self.task_apply_async_orig = Task.apply_async
@classmethod
def new_apply_async(task_class, args=None, kwargs=None, **options):
self.handle_apply_async(task_class, args, kwargs, **options)
# monkey patch the regular apply_sync with our method
Task.apply_async = new_apply_async
def tearDown(self):
super(CeleryTestCaseBase, self).tearDown()
# Reset the monkey patch to the original method
Task.apply_async = self.task_apply_async_orig
def handle_apply_async(self, task_class, args=None, kwargs=None, **options):
self.applied_tasks.append((task_class, tuple(args), kwargs))
def assert_task_sent(self, task_class, *args, **kwargs):
was_sent = any(task_class == task[0] and args == task[1] and kwargs == task[2]
for task in self.applied_tasks)
self.assertTrue(was_sent, ''Task not called w/class %s and args %s'' % (task_class, args))
def assert_task_not_sent(self, task_class):
was_sent = any(task_class == task[0] for task in self.applied_tasks)
self.assertFalse(was_sent, ''Task was not expected to be called, but was. Applied tasks: %s'' % self.applied_tasks)
Aquí hay un ejemplo "fuera de lo común" de cómo lo usarías en tus casos de prueba:
mymodule.py
from my_tasks import SomeTask
def run_some_task(should_run):
if should_run:
SomeTask.delay(1, some_kwarg=2)
test_mymodule.py
class RunSomeTaskTest(CeleryTestCaseBase):
def test_should_run(self):
run_some_task(should_run=True)
self.assert_task_sent(SomeTask, 1, some_kwarg=2)
def test_should_not_run(self):
run_some_task(should_run=False)
self.assert_task_not_sent(SomeTask)
Estoy tratando de encontrar una metodología de prueba para nuestro proyecto django-celery . He leído las notas en la documentation , pero no me dio una buena idea de qué hacer en realidad. No estoy preocupado por probar las tareas en los daemons reales, solo la funcionalidad de mi código. Principalmente me pregunto:
- ¿Cómo podemos pasar
task.delay()
altotask.delay()
durante la prueba (intenté configurarCELERY_ALWAYS_EAGER = True
pero noCELERY_ALWAYS_EAGER = True
)? - ¿Cómo utilizamos la configuración de prueba que se recomienda (si es la mejor) sin cambiar realmente nuestra configuración.py?
- ¿Todavía podemos usar la
manage.py test
o tenemos que usar un corredor personalizado?
En general, cualquier sugerencia o consejo para probar con apio sería muy útil.
Intenta configurar:
BROKER_BACKEND = ''memory''
(Gracias al comentario de asksol )
Me gusta usar el decorador override_settings en las pruebas que necesitan resultados de apio para completar.
from django.test import TestCase
from django.test.utils import override_settings
from myapp.tasks import mytask
class AddTestCase(TestCase):
@override_settings(CELERY_EAGER_PROPAGATES_EXCEPTIONS=True,
CELERY_ALWAYS_EAGER=True,
BROKER_BACKEND=''memory'')
def test_mytask(self):
result = mytask.delay()
self.assertTrue(result.successful())
Si desea aplicar esto a todas las pruebas, puede utilizar el corredor de prueba de apio como se describe en http://docs.celeryproject.org/en/2.5/django/unit-testing.html que básicamente establece la misma configuración, excepto ( BROKER_BACKEND = ''memory''
).
En la configuración de:
TEST_RUNNER = ''djcelery.contrib.test_runner.CeleryTestSuiteRunner''
Mira la fuente de CeleryTestSuiteRunner y está bastante claro lo que está sucediendo.
ya que todavía veo esto en los resultados de búsqueda, la configuración se anula con
TEST_RUNNER = ''djcelery.contrib.test_runner.CeleryTestSuiteRunner''
trabajado para mí según http://docs.celeryproject.org/en/2.5/django/unit-testing.html