xticks barplot python web-services celery django-celery

python - barplot - ¿Cómo verificar el estado de la tarea en Apio?



pandas plot (9)

Además de arriba, se puede ver fácilmente el enfoque programático utilizando el estado de Flower Task.

Monitoreo en tiempo real usando Apio Eventos. Flower es una herramienta basada en la web para monitorear y administrar clusters de apio.

  1. Progreso de la tarea e historia
  2. Posibilidad de mostrar detalles de la tarea (argumentos, hora de inicio, tiempo de ejecución y más)
  3. Gráficos y estadísticas

Documento oficial: Flor - Herramienta de monitoreo de apio

Instalación:

$ pip install flower

Uso:

http://localhost:5555

¿Cómo se puede verificar si una tarea se está ejecutando en apio (específicamente, estoy usando apio-django)?

He leído la documentación y he buscado en Google, pero no puedo ver una llamada como:

my_example_task.state() == RUNNING

Mi caso de uso es que tengo un servicio externo (Java) para la transcodificación. Cuando envío un documento para ser transcodificado, quiero verificar si la tarea que ejecuta ese servicio se está ejecutando, y si no, para (re) iniciarlo.

Estoy usando las versiones estables actuales - 2.4, creo.


Cada objeto Task tiene una propiedad .request , que contiene el objeto AsyncRequest . En consecuencia, la siguiente línea proporciona el estado de una task Tarea:

task.AsyncResult(task.request.id).state


Encontré información útil en el

Guía de los trabajadores de Celery Project inspeccionar a los trabajadores

Para mi caso, estoy revisando para ver si se está ejecutando Apio.

inspect_workers = task.app.control.inspect() if inspect_workers.registered() is None: state = ''FAILURE'' else: state = str(task.state)

Puede jugar con inspeccionar para satisfacer sus necesidades.


La creación de un objeto AsyncResult partir de la identificación de la tarea es la recomendada en las FAQ para obtener el estado de la tarea cuando lo único que tiene es la identificación de la tarea.

Sin embargo, a partir de Apio 3.x, hay advertencias significativas que pueden morder a las personas si no les prestan atención. Realmente depende del escenario de uso específico.

Por defecto, Aplery no registra un estado "en ejecución".

Para que Aplery registre que una tarea se está ejecutando, debe establecer task_track_started en True . Aquí hay una tarea simple que prueba esto:

@app.task(bind=True) def test(self): print self.AsyncResult(self.request.id).state

Cuando task_track_started es False , que es el valor predeterminado, el estado del espectáculo es PENDING aunque la tarea haya comenzado. Si configura task_track_started en True , entonces el estado se STARTED .

El estado PENDING significa "No sé".

Un AsyncResult con el estado PENDING no significa nada más que el hecho de que Aplery no conoce el estado de la tarea. Esto podría deberse a varias razones.

Por un lado, AsyncResult se puede construir con identificadores de tareas no válidos. Dichas "tareas" se considerarán pendientes por parte de Aplery:

>>> task.AsyncResult("invalid").status ''PENDING''

Ok, nadie va a AsyncResult identificadores obviamente no válidos a AsyncResult . Bastante bien, pero también tiene por efecto que AsyncResult también considere una tarea que se ha ejecutado con éxito pero que Celery ha olvidado como PENDING . De nuevo, en algunos casos de uso, esto puede ser un problema. Parte del problema depende de cómo se configura Celery para mantener los resultados de las tareas, ya que depende de la disponibilidad de las "lápidas" en el backend de resultados. ("Tombstones" es el término utilizado en la documentación de Aplery para los fragmentos de datos que registran cómo finalizó la tarea). Usar AsyncResult no funcionará en absoluto si task_ignore_result es True . Un problema más molesto es que Celery expira las lápidas por defecto. La configuración result_expires por defecto está establecida en 24 horas. Por lo tanto, si inicia una tarea y registra la identificación en el almacenamiento a largo plazo, y más 24 horas después, crea un AsyncResult con ella, el estado será PENDING .

Todas las "tareas reales" comienzan en el estado PENDING . Entonces, PENDING de una tarea puede significar que la tarea fue solicitada pero nunca progresó más allá de esto (por la razón que sea). O podría significar que la tarea funcionó, pero Aplery olvidó su estado.

¡Ay! AsyncResult no funcionará para mí. ¿Que más puedo hacer?

Prefiero hacer un seguimiento de los objetivos que realizar un seguimiento de las tareas . Sí conservo algo de información de tareas, pero es realmente secundario hacer un seguimiento de los objetivos. Los objetivos se almacenan en almacenamiento independiente de Celery. Cuando una solicitud necesita realizar un cálculo depende de que se haya logrado algún objetivo, comprueba si el objetivo ya se ha alcanzado; si es así, utiliza este objetivo en caché; de lo contrario, inicia la tarea que afectará al objetivo y lo envía a el cliente que hizo que la solicitud HTTP sea una respuesta que indica que debe esperar un resultado.

Los nombres de las variables y los hipervínculos anteriores son para Apio 4.x. En 3.x, las variables y los hipervínculos correspondientes son: CELERY_TRACK_STARTED , CELERY_IGNORE_RESULT , CELERY_TASK_RESULT_EXPIRES .


Regrese el task_id (que se da desde .delay ()) y luego pregunte a la instancia de apio sobre el estado:

x = method.delay(1,2) print x.task_id

Al preguntar, obtenga un nuevo AsyncResult usando este task_id:

from celery.result import AsyncResult res = AsyncResult("your-task-id") res.ready()


También puede crear estados personalizados y actualizar su ejecución de tareas de valor. Este ejemplo es de documentos:

@app.task(bind=True) def upload_files(self, filenames): for i, file in enumerate(filenames): if not self.request.called_directly: self.update_state(state=''PROGRESS'', meta={''current'': i, ''total'': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states


Tratar:

task.AsyncResult(task.request.id).state

esto proporcionará el estado de la tarea de apio. Si la tarea de apio ya está bajo el estado de FALLA arrojará una excepción:

raised unexpected: KeyError(''exc_type'',)


Una vieja pregunta, pero recientemente me encontré con este problema.

Si intentas obtener el task_id, puedes hacerlo así:

import celery from celery_app import add from celery import uuid task_id = uuid() result = add.apply_async((2, 2), task_id=task_id)

Ahora sabe exactamente lo que es el task_id y ahora puede usarlo para obtener el AsyncResult:

# grab the AsyncResult result = celery.result.AsyncResult(task_id) # print the task id print result.task_id 09dad9cf-c9fa-4aee-933f-ff54dae39bdf # print the AsyncResult''s status print result.status SUCCESS # print the result returned print result.result 4