simple run results beat app python django celery

python - run - Los informes arrojaron resultados de una larga tarea de apio



django-celery-results (6)

Problema

He segmentado una tarea de larga duración en subtareas lógicas, por lo que puedo informar los resultados de cada subtarea a medida que se completa. Sin embargo, estoy tratando de informar los resultados de una tarea que nunca se completará de manera efectiva (en lugar de eso, proporcionará valores a medida que avanza), y estoy luchando para hacerlo con mi solución existente.

Fondo

Estoy construyendo una interfaz web para algunos programas Python que he escrito. Los usuarios pueden enviar trabajos a través de formularios web y luego volver a consultar para ver el progreso del trabajo.

Digamos que tengo dos funciones, a las que se accede a través de formularios separados:

  • med_func : tarda aproximadamente 1 minuto en ejecutarse, los resultados se pasan a render() , lo que produce datos adicionales.
  • long_func : Devuelve un generador. Cada yield toma el orden de 30 minutos, y debe informarse al usuario. Hay tantos rendimientos que podemos considerar este iterador como infinito (terminando solo cuando se revoked ).

Código, implementación actual

Con med_func , med_func resultados de la siguiente manera:

Al enviar el formulario, AsyncResult un AsyncResult en una sesión de Django :

task_result = med_func.apply_async([form], link=render.s()) request.session["task_result"] = task_result

La vista de Django para la página de resultados accede a este AsyncResult . Cuando una tarea se completa, los resultados se guardan en un objeto que se pasa como contexto a una plantilla de Django.

def results(request): """ Serve (possibly incomplete) results of a session''s latest run. """ session = request.session try: # Load most recent task task_result = session["task_result"] except KeyError: # Already cleared, or doesn''t exist if "results" not in session: session["status"] = "No job submitted" else: # Extract data from Asynchronous Tasks session["status"] = task_result.status if task_result.ready(): session["results"] = task_result.get() render_task = task_result.children[0] # Decorate with rendering results session["render_status"] = render_task.status if render_task.ready(): session["results"].render_output = render_task.get() del(request.session["task_result"]) # Don''t need any more return render_to_response(''results.html'', request.session)

Esta solución solo funciona cuando la función termina realmente. No puedo encadenar las subtareas lógicas de long_func , porque hay un número desconocido de yield s (cada iteración del bucle long_func puede no producir un resultado).

Pregunta

¿Hay alguna forma sensata de acceder a los objetos cedidos desde una tarea de apio extremadamente larga, para que puedan mostrarse antes de que se agote el generador?


La respuesta de Pablo es genial. Como alternativa al uso de mark_as_started , puede usar Task método update_state Task . En última instancia, hacen lo mismo, pero el nombre "update_state" es un poco más apropiado para lo que intentas hacer. Opcionalmente, puede definir un estado personalizado que indique que su tarea está en progreso (he llamado a mi estado personalizado "PROGRESO"):

def yielder(): for i in range(2**100): yield i @task def report_progress(): for progress in yielder(): # set current progress on the task report_progress.update_state(state=''PROGRESS'', meta={''progress'': progress}) def view_function(request): task_id = request.session[''task_id''] task = AsyncResult(task_id) progress = task.info[''progress''] # do something with your current progress


Para que Celery sepa cuál es el estado actual de la tarea, establece algunos metadatos en el resultado que tenga. Puede usarlo para almacenar otros tipos de metadatos.

def yielder(): for i in range(2**100): yield i @task def report_progress(): for progress in yielder(): # set current progress on the task report_progress.backend.mark_as_started( report_progress.request.id, progress=progress) def view_function(request): task_id = request.session[''task_id''] task = AsyncResult(task_id) progress = task.info[''progress''] # do something with your current progress

No arrojaría una tonelada de datos allí, pero funciona bien para rastrear el progreso de una tarea de larga duración.


Parte de apio:

def long_func(*args, **kwargs): i = 0 while True: yield i do_something_here(*args, **kwargs) i += 1 @task() def test_yield_task(task_id=None, **kwargs): the_progress = 0 for the_progress in long_func(**kwargs): cache.set(''celery-task-%s'' % task_id, the_progress)

Lado del cliente web, tarea inicial:

r = test_yield_task.apply_async() request.session[''task_id''] = r.task_id

Prueba de último valor cedido:

v = cache.get(''celery-task-%s'' % session.get(''task_id'')) if v: do_someting()

Si no te gusta usar el caché, o si es imposible, puedes usar db, file o cualquier otro lugar en el que el trabajador del apio y el servidor tengan ambos accesos. Con el caché es la solución más simple, pero los trabajadores y el servidor tienen que usar el mismo caché.


Personalmente, me gustaría ver la hora de inicio, la duración, el progreso (número de elementos obtenidos), la hora de finalización (o ETA), el estado y cualquier otra información útil. Sería bueno si se pareciera a una pantalla relacionada, tal vez como ps en Linux. Es, después de todo, un estado de proceso.

Podría incluir algunas opciones para pausar o eliminar la tarea y / o para "abrirla" y mostrar información detallada sobre los niños o los resultados.


Un par de opciones a considerar:

1 - grupos de tareas. Si puede enumerar todas las sub tareas desde el momento de la invocación, puede aplicar el grupo como un todo, que devuelve un objeto TaskSetResult que puede usar para monitorear los resultados del grupo en su totalidad, o de tareas individuales en el grupo. - consulte esto cuando sea necesario cuando necesite verificar el estado.

2 - devoluciones de llamada. Si no puede enumerar todas las sub tareas (¡o incluso si puede!), Puede definir un enlace / devolución de llamada que sea el último paso de la tarea: se llama cuando finaliza el resto de la tarea. El gancho estaría en contra de un URI en su aplicación que ingiera el resultado y lo haga disponible a través de DB o API interna de la aplicación.

Una combinación de estos podría resolver su desafío.