python - run - Los informes arrojaron resultados de una larga tarea de apio
django-celery-results (6)
Problema
He segmentado una tarea de larga duración en subtareas lógicas, por lo que puedo informar los resultados de cada subtarea a medida que se completa. Sin embargo, estoy tratando de informar los resultados de una tarea que nunca se completará de manera efectiva (en lugar de eso, proporcionará valores a medida que avanza), y estoy luchando para hacerlo con mi solución existente.
Fondo
Estoy construyendo una interfaz web para algunos programas Python que he escrito. Los usuarios pueden enviar trabajos a través de formularios web y luego volver a consultar para ver el progreso del trabajo.
Digamos que tengo dos funciones, a las que se accede a través de formularios separados:
-
med_func
: tarda aproximadamente 1 minuto en ejecutarse, los resultados se pasan arender()
, lo que produce datos adicionales. -
long_func
: Devuelve un generador. Cadayield
toma el orden de 30 minutos, y debe informarse al usuario. Hay tantos rendimientos que podemos considerar este iterador como infinito (terminando solo cuando se revoked ).
Código, implementación actual
Con med_func
, med_func
resultados de la siguiente manera:
Al enviar el formulario, AsyncResult
un AsyncResult
en una sesión de Django :
task_result = med_func.apply_async([form], link=render.s())
request.session["task_result"] = task_result
La vista de Django para la página de resultados accede a este AsyncResult
. Cuando una tarea se completa, los resultados se guardan en un objeto que se pasa como contexto a una plantilla de Django.
def results(request):
""" Serve (possibly incomplete) results of a session''s latest run. """
session = request.session
try: # Load most recent task
task_result = session["task_result"]
except KeyError: # Already cleared, or doesn''t exist
if "results" not in session:
session["status"] = "No job submitted"
else: # Extract data from Asynchronous Tasks
session["status"] = task_result.status
if task_result.ready():
session["results"] = task_result.get()
render_task = task_result.children[0]
# Decorate with rendering results
session["render_status"] = render_task.status
if render_task.ready():
session["results"].render_output = render_task.get()
del(request.session["task_result"]) # Don''t need any more
return render_to_response(''results.html'', request.session)
Esta solución solo funciona cuando la función termina realmente. No puedo encadenar las subtareas lógicas de long_func
, porque hay un número desconocido de yield
s (cada iteración del bucle long_func
puede no producir un resultado).
Pregunta
¿Hay alguna forma sensata de acceder a los objetos cedidos desde una tarea de apio extremadamente larga, para que puedan mostrarse antes de que se agote el generador?
La respuesta de Pablo es genial. Como alternativa al uso de mark_as_started
, puede usar Task
método update_state
Task
. En última instancia, hacen lo mismo, pero el nombre "update_state" es un poco más apropiado para lo que intentas hacer. Opcionalmente, puede definir un estado personalizado que indique que su tarea está en progreso (he llamado a mi estado personalizado "PROGRESO"):
def yielder():
for i in range(2**100):
yield i
@task
def report_progress():
for progress in yielder():
# set current progress on the task
report_progress.update_state(state=''PROGRESS'', meta={''progress'': progress})
def view_function(request):
task_id = request.session[''task_id'']
task = AsyncResult(task_id)
progress = task.info[''progress'']
# do something with your current progress
Para que Celery sepa cuál es el estado actual de la tarea, establece algunos metadatos en el resultado que tenga. Puede usarlo para almacenar otros tipos de metadatos.
def yielder():
for i in range(2**100):
yield i
@task
def report_progress():
for progress in yielder():
# set current progress on the task
report_progress.backend.mark_as_started(
report_progress.request.id,
progress=progress)
def view_function(request):
task_id = request.session[''task_id'']
task = AsyncResult(task_id)
progress = task.info[''progress'']
# do something with your current progress
No arrojaría una tonelada de datos allí, pero funciona bien para rastrear el progreso de una tarea de larga duración.
Parte de apio:
def long_func(*args, **kwargs):
i = 0
while True:
yield i
do_something_here(*args, **kwargs)
i += 1
@task()
def test_yield_task(task_id=None, **kwargs):
the_progress = 0
for the_progress in long_func(**kwargs):
cache.set(''celery-task-%s'' % task_id, the_progress)
Lado del cliente web, tarea inicial:
r = test_yield_task.apply_async()
request.session[''task_id''] = r.task_id
Prueba de último valor cedido:
v = cache.get(''celery-task-%s'' % session.get(''task_id''))
if v:
do_someting()
Si no te gusta usar el caché, o si es imposible, puedes usar db, file o cualquier otro lugar en el que el trabajador del apio y el servidor tengan ambos accesos. Con el caché es la solución más simple, pero los trabajadores y el servidor tienen que usar el mismo caché.
Personalmente, me gustaría ver la hora de inicio, la duración, el progreso (número de elementos obtenidos), la hora de finalización (o ETA), el estado y cualquier otra información útil. Sería bueno si se pareciera a una pantalla relacionada, tal vez como ps
en Linux. Es, después de todo, un estado de proceso.
Podría incluir algunas opciones para pausar o eliminar la tarea y / o para "abrirla" y mostrar información detallada sobre los niños o los resultados.
Un par de opciones a considerar:
1 - grupos de tareas. Si puede enumerar todas las sub tareas desde el momento de la invocación, puede aplicar el grupo como un todo, que devuelve un objeto TaskSetResult que puede usar para monitorear los resultados del grupo en su totalidad, o de tareas individuales en el grupo. - consulte esto cuando sea necesario cuando necesite verificar el estado.
2 - devoluciones de llamada. Si no puede enumerar todas las sub tareas (¡o incluso si puede!), Puede definir un enlace / devolución de llamada que sea el último paso de la tarea: se llama cuando finaliza el resto de la tarea. El gancho estaría en contra de un URI en su aplicación que ingiera el resultado y lo haga disponible a través de DB o API interna de la aplicación.
Una combinación de estos podría resolver su desafío.
Vea también este gran preso PyCon de uno de los ingenieros de Instagram.
http://blogs.vmware.com/vfabric/2013/04/how-instagram-feeds-work-celery-and-rabbitmq.html
En la marca de video a las 16:00, explica cómo estructuran largas listas de sub-tareas.