django celery kombu

django - Broker en memoria para pruebas de apio.



celery kombu (3)

Aquí hay un ejemplo más completo de un Django TransactionTestCase que funciona con Celery 4.x.

import threading from django.test import TransactionTestCase from django.db import connections from myproj.celery import app # your Celery app class CeleryTestCase(TransactionTestCase): """Test case with Celery support.""" @classmethod def setUpClass(cls): super().setUpClass() app.control.purge() cls._worker = app.Worker(app=app, pool=''solo'', concurrency=1) connections.close_all() cls._thread = threading.Thread(target=cls._worker.start) cls._thread.daemon = True cls._thread.start() @classmethod def tearDownClass(cls): cls._worker.stop() super().tearDownClass()

Tenga en cuenta que esto no cambia los nombres de las colas a colas de prueba, por lo que si también está ejecutando la aplicación, también querrá hacer eso.

Tengo una API REST escrita en Django, con un punto final que pone en cola una tarea de apio al publicar en ella. La respuesta contiene el ID de tarea que me gustaría usar para probar que la tarea se creó y obtener el resultado. Entonces, me gustaría hacer algo como:

def test_async_job(): response = self.client.post("/api/jobs/", some_test_data, format="json") task_id = response.data[''task_id''] result = my_task.AsyncResult(task_id).get() self.assertEquals(result, ...)

Obviamente, no quiero tener que dirigir a un trabajador de apio para realizar las pruebas de unidad, espero burlarme de alguna manera. No puedo usar CELERY_ALWAYS_EAGER porque eso parece pasar por alto al agente de bolsa, impidiéndome usar AsyncResult para obtener la tarea por su ID (como se indica here ).

Al revisar los documentos de apio y kombu , descubrí que hay un transporte en memoria para las pruebas unitarias, que haría lo que estoy buscando. Intenté anular la configuración de BROKER_URL para usarla en las pruebas:

@override_settings(BROKER_URL=''memory://'') def test_async_job():

Pero el comportamiento es el mismo que con el corredor ampq: bloquea la prueba esperando el resultado. ¿Alguna idea de cómo se supone que debo configurar este agente para que funcione en las pruebas?


Puede especificar el broker_backend en su configuración:

if ''test'' in sys.argv[1:]: BROKER_BACKEND = ''memory'' CELERY_TASK_ALWAYS_EAGER = True CELERY_TASK_EAGER_PROPAGATES = True

o puede anular la configuración con un decorador directamente en su prueba

import unittest from django.test.utils import override_settings class MyTestCase(unittest.TestCase): @override_settings(CELERY_TASK_EAGER_PROPAGATES=True, CELERY_TASK_ALWAYS_EAGER=True, BROKER_BACKEND=''memory'') def test_mytask(self): ...


Puede usar el intermediario en memoria de Kombu para ejecutar pruebas unitarias, sin embargo, para hacerlo debe hacer girar un trabajador de apio utilizando el mismo objeto de aplicación de apio que el servidor Django.

Para usar el intermediario en memoria, configure BROKER_URL en memory://localhost/

Luego, para hacer girar un pequeño apio, puedes hacer lo siguiente:

app = <Django Celery App> # Set the worker up to run in-place instead of using a pool app.conf.CELERYD_CONCURRENCY = 1 app.conf.CELERYD_POOL = ''solo'' # Code to start the worker def run_worker(): app.worker_main() # Create a thread and run the worker in it import threading t = threading.Thread(target=run_worker) t.setDaemon(True) t.start()

Debe asegurarse de utilizar la misma aplicación que la instancia de apio de Django.

Tenga en cuenta que iniciar el trabajador imprimirá muchas cosas y modificará la configuración de registro.