simple run results ocean beat python django celery django-celery

python - run - django-celery-results



Detecta si el apio está disponible/funcionando (3)

Aquí está el código que he estado usando. celery.task.control.Inspect.stats() devuelve un dict que contiene muchos detalles sobre los trabajadores actualmente disponibles, ninguno si no hay trabajadores en ejecución, o genera un IOError si no se puede conectar con el intermediario de mensajes. Estoy usando RabbitMQ: es posible que otros sistemas de mensajería se comporten de forma ligeramente diferente. Esto funcionó en Celery 2.3.x y 2.4.x; No estoy seguro de cuánto tiempo más atrás.

def get_celery_worker_status(): ERROR_KEY = "ERROR" try: from celery.task.control import inspect insp = inspect() d = insp.stats() if not d: d = { ERROR_KEY: ''No running Celery workers were found.'' } except IOError as e: from errno import errorcode msg = "Error connecting to the backend: " + str(e) if len(e.args) > 0 and errorcode.get(e.args[0]) == ''ECONNREFUSED'': msg += '' Check that the RabbitMQ server is running.'' d = { ERROR_KEY: msg } except ImportError as e: d = { ERROR_KEY: str(e)} return d

Estoy usando Celery para administrar tareas asíncronas. Ocasionalmente, sin embargo, el proceso de apio disminuye y no se ejecuta ninguna de las tareas. Me gustaría poder verificar el estado del apio y asegurarme de que todo funciona bien, y si detecto algún problema, mostraré un mensaje de error al usuario. De la documentación de Celery Worker parece que podría usar ping o inspect para esto, pero el ping se siente raro y no está claro exactamente cómo se debe usar la inspección (si inspeccionar (). Registered () está vacío?).

Cualquier orientación sobre esto sería apreciada. Básicamente lo que estoy buscando es un método como ese:

def celery_is_alive(): from celery.task.control import inspect return bool(inspect().registered()) # is this right??

EDITAR: ni siquiera parece que registered () esté disponible en apio 2.3.3 (aunque los 2.1 documentos lo incluyen). Quizás ping es la respuesta correcta.

EDITAR: Ping tampoco parece hacer lo que pensé que haría, así que todavía no estoy seguro de la respuesta aquí.


Lo siguiente funcionó para mí:

import socket from kombu import Connection celery_broker_url = "amqp://localhost" try: conn = Connection(celery_broker_url) conn.ensure_connection(max_retries=3) except socket.error: raise RuntimeError("Failed to connect to RabbitMQ instance at {}".format(celery_broker_url))