python - results - rabbitmq and celery
Descubra si existe una tarea de apio (6)
Apio no escribe un estado cuando se envía la tarea, esto es en parte una optimización (ver http://docs.celeryproject.org/en/latest/userguide/tasks.html#state ).
Si realmente lo necesita, es simple agregar:
from celery import current_app
# `after_task_publish` is available in celery 3.1+
# for older versions use the deprecated `task_sent` signal
from celery.signals import after_task_publish
@after_task_publish.connect
def update_sent_state(sender=None, body=None, **kwargs):
# the task may not exist if sent using `send_task` which
# sends tasks by name, so fall back to the default result backend
# if that is the case.
task = current_app.tasks.get(sender)
backend = task.backend if task else current_app.backend
backend.store_result(body[''id''], None, "SENT")
Luego puede probar el estado PENDIENTE para detectar que una tarea no se ha enviado (aparentemente):
>>> result.state != "PENDING"
¿Es posible averiguar si existe una tarea con una determinada identificación de la tarea? Cuando intento obtener el estado, siempre tendré pendiente.
>>> AsyncResult(''...'').status
''PENDING''
Quiero saber si una identificación de tarea determinada es una identificación de tarea de apio real y no una cadena aleatoria. Quiero resultados diferentes dependiendo de si hay una tarea válida para un determinado ID.
Es posible que haya habido una tarea válida en el pasado con la misma identificación, pero los resultados pueden haber sido eliminados del back-end.
AsyncResult.state devuelve PENDIENTE en caso de identificadores de tareas desconocidas.
PENDIENTE
La tarea está esperando la ejecución o es desconocida. Se supone que cualquier identificación de tarea que no se conoce está en estado pendiente.
http://docs.celeryproject.org/en/latest/userguide/tasks.html#pending
Puede proporcionar identificadores personalizados de tareas si necesita distinguir identificadores desconocidos de los existentes:
>>> from tasks import add
>>> from celery.utils import uuid
>>> r = add.apply_async(args=[1, 2], task_id="celery-task-id-"+uuid())
>>> id = r.task_id
>>> id
''celery-task-id-b774c3f9-5280-4ebe-a770-14a6977090cd''
>>> if not "blubb".startswith("celery-task-id-"): print "Unknown task id"
...
Unknown task id
>>> if not id.startswith("celery-task-id-"): print "Unknown task id"
...
En este momento estoy usando el siguiente esquema:
- Obtener la identificación de la tarea.
- Establezca la clave de Memcache como ''task_% s''% task.id message ''Started''.
- Pase la identificación de la tarea al cliente.
- Ahora desde el cliente puedo monitorear el estado de la tarea (establecer desde mensajes de tareas a Memcache).
- De la tarea en listo: configurada en el mensaje de clave de Memcache ''Listo''.
- Desde el cliente en la tarea preparada: inicie una tarea especial que eliminará la clave de Memcache y realizará las acciones de limpieza necesarias.
Necesitas llamar a .get()
en el objeto AsyncTask que creas para obtener realmente el resultado del back-end.
Consulte las preguntas frecuentes sobre apio .
Para aclarar más en mi respuesta.
Cualquier cadena es técnicamente una identificación válida, no hay forma de validar la identificación de la tarea. La única forma de averiguar si existe una tarea es preguntar al servidor si sabe al respecto y para hacerlo debe usar .get()
.
Esto introduce el problema de que .get()
bloquea cuando el servidor no tiene ninguna información sobre la ID de tarea que usted suministró, esto es por diseño para permitirle comenzar una tarea y luego esperar a que se complete.
En el caso de la pregunta original, supondré que el OP quiere obtener el estado de una tarea completada previamente. Para hacer eso, puede pasar un tiempo de espera muy pequeño y detectar errores de tiempo de espera:
from celery.exceptions import TimeoutError
try:
# fetch the result from the backend
# your backend must be fast enough to return
# results within 100ms (0.1 seconds)
result = AsyncResult(''blubb'').get(timeout=0.1)
except TimeoutError:
result = None
if result:
print "Result exists; state=%s" % (result.state,)
else:
print "Result does not exist"
No hace falta decir que esto solo funciona si su backend está almacenando resultados, si no es así, no hay forma de saber si el ID de una tarea es válido o no, porque nada guarda un registro de ellos.
Aún más aclaración.
Lo que desea hacer no se puede lograr utilizando el servidor de AMQP porque no almacena resultados, sino que los reenvía .
Mi sugerencia sería cambiar a un back-end de base de datos para que los resultados estén en una base de datos que puede consultar fuera de los módulos de apio existentes. Si no existen tareas en la base de datos de resultados, puede suponer que el ID no es válido.
Por favor corrígeme si estoy equivocado.
if built_in_status_check(task_id) == ''pending''
if registry_exists(task_id) == true
print ''Pending''
else
print ''Task does not exist''
Tratar
AsyncResult(''blubb'').state
eso puede funcionar
Debería devolver algo diferente.