python - barplot - ¿Cómo verificar el estado de la tarea en Apio?
pandas plot (9)
Además de arriba, se puede ver fácilmente el enfoque programático utilizando el estado de Flower Task.
Monitoreo en tiempo real usando Apio Eventos. Flower es una herramienta basada en la web para monitorear y administrar clusters de apio.
- Progreso de la tarea e historia
- Posibilidad de mostrar detalles de la tarea (argumentos, hora de inicio, tiempo de ejecución y más)
- Gráficos y estadísticas
Documento oficial: Flor - Herramienta de monitoreo de apio
Instalación:
$ pip install flower
Uso:
http://localhost:5555
¿Cómo se puede verificar si una tarea se está ejecutando en apio (específicamente, estoy usando apio-django)?
He leído la documentación y he buscado en Google, pero no puedo ver una llamada como:
my_example_task.state() == RUNNING
Mi caso de uso es que tengo un servicio externo (Java) para la transcodificación. Cuando envío un documento para ser transcodificado, quiero verificar si la tarea que ejecuta ese servicio se está ejecutando, y si no, para (re) iniciarlo.
Estoy usando las versiones estables actuales - 2.4, creo.
Cada objeto Task
tiene una propiedad .request
, que contiene el objeto AsyncRequest
. En consecuencia, la siguiente línea proporciona el estado de una task
Tarea:
task.AsyncResult(task.request.id).state
Encontré información útil en el
Guía de los trabajadores de Celery Project inspeccionar a los trabajadores
Para mi caso, estoy revisando para ver si se está ejecutando Apio.
inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
state = ''FAILURE''
else:
state = str(task.state)
Puede jugar con inspeccionar para satisfacer sus necesidades.
La creación de un objeto AsyncResult
partir de la identificación de la tarea es la recomendada en las FAQ para obtener el estado de la tarea cuando lo único que tiene es la identificación de la tarea.
Sin embargo, a partir de Apio 3.x, hay advertencias significativas que pueden morder a las personas si no les prestan atención. Realmente depende del escenario de uso específico.
Por defecto, Aplery no registra un estado "en ejecución".
Para que Aplery registre que una tarea se está ejecutando, debe establecer task_track_started
en True
. Aquí hay una tarea simple que prueba esto:
@app.task(bind=True)
def test(self):
print self.AsyncResult(self.request.id).state
Cuando task_track_started
es False
, que es el valor predeterminado, el estado del espectáculo es PENDING
aunque la tarea haya comenzado. Si configura task_track_started
en True
, entonces el estado se STARTED
.
El estado PENDING
significa "No sé".
Un AsyncResult
con el estado PENDING
no significa nada más que el hecho de que Aplery no conoce el estado de la tarea. Esto podría deberse a varias razones.
Por un lado, AsyncResult
se puede construir con identificadores de tareas no válidos. Dichas "tareas" se considerarán pendientes por parte de Aplery:
>>> task.AsyncResult("invalid").status
''PENDING''
Ok, nadie va a AsyncResult
identificadores obviamente no válidos a AsyncResult
. Bastante bien, pero también tiene por efecto que AsyncResult
también considere una tarea que se ha ejecutado con éxito pero que Celery ha olvidado como PENDING
. De nuevo, en algunos casos de uso, esto puede ser un problema. Parte del problema depende de cómo se configura Celery para mantener los resultados de las tareas, ya que depende de la disponibilidad de las "lápidas" en el backend de resultados. ("Tombstones" es el término utilizado en la documentación de Aplery para los fragmentos de datos que registran cómo finalizó la tarea). Usar AsyncResult
no funcionará en absoluto si task_ignore_result
es True
. Un problema más molesto es que Celery expira las lápidas por defecto. La configuración result_expires
por defecto está establecida en 24 horas. Por lo tanto, si inicia una tarea y registra la identificación en el almacenamiento a largo plazo, y más 24 horas después, crea un AsyncResult
con ella, el estado será PENDING
.
Todas las "tareas reales" comienzan en el estado PENDING
. Entonces, PENDING
de una tarea puede significar que la tarea fue solicitada pero nunca progresó más allá de esto (por la razón que sea). O podría significar que la tarea funcionó, pero Aplery olvidó su estado.
¡Ay! AsyncResult
no funcionará para mí. ¿Que más puedo hacer?
Prefiero hacer un seguimiento de los objetivos que realizar un seguimiento de las tareas . Sí conservo algo de información de tareas, pero es realmente secundario hacer un seguimiento de los objetivos. Los objetivos se almacenan en almacenamiento independiente de Celery. Cuando una solicitud necesita realizar un cálculo depende de que se haya logrado algún objetivo, comprueba si el objetivo ya se ha alcanzado; si es así, utiliza este objetivo en caché; de lo contrario, inicia la tarea que afectará al objetivo y lo envía a el cliente que hizo que la solicitud HTTP sea una respuesta que indica que debe esperar un resultado.
Los nombres de las variables y los hipervínculos anteriores son para Apio 4.x. En 3.x, las variables y los hipervínculos correspondientes son: CELERY_TRACK_STARTED
, CELERY_IGNORE_RESULT
, CELERY_TASK_RESULT_EXPIRES
.
Regrese el task_id (que se da desde .delay ()) y luego pregunte a la instancia de apio sobre el estado:
x = method.delay(1,2)
print x.task_id
Al preguntar, obtenga un nuevo AsyncResult usando este task_id:
from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
También puede crear estados personalizados y actualizar su ejecución de tareas de valor. Este ejemplo es de documentos:
@app.task(bind=True)
def upload_files(self, filenames):
for i, file in enumerate(filenames):
if not self.request.called_directly:
self.update_state(state=''PROGRESS'',
meta={''current'': i, ''total'': len(filenames)})
http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states
Tratar:
task.AsyncResult(task.request.id).state
esto proporcionará el estado de la tarea de apio. Si la tarea de apio ya está bajo el estado de FALLA arrojará una excepción:
raised unexpected: KeyError(''exc_type'',)
Una vieja pregunta, pero recientemente me encontré con este problema.
Si intentas obtener el task_id, puedes hacerlo así:
import celery
from celery_app import add
from celery import uuid
task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)
Ahora sabe exactamente lo que es el task_id y ahora puede usarlo para obtener el AsyncResult:
# grab the AsyncResult
result = celery.result.AsyncResult(task_id)
# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf
# print the AsyncResult''s status
print result.status
SUCCESS
# print the result returned
print result.result
4
para tareas simples, podemos usar http://flower.readthedocs.io/en/latest/screenshots.html y http://policystat.github.io/jobtastic/ para hacer el monitoreo.
y para tareas complicadas, digamos una tarea que trata con muchos otros módulos. Recomendamos registrar manualmente el progreso y el mensaje en la unidad de tarea específica.