tutorial example asincrono django message-queue celery django-celery

example - django asincrono



¿Cómo saber si una tarea ya se ha puesto en cola en django-celery? (3)

Aquí está mi configuración:

  • django 1.3
  • apio 2.2.6
  • django-apio 2.2.4
  • djkombu 0.9.2

En mi archivo settings.py tengo

BROKER_BACKEND = "djkombu.transport.DatabaseTransport"

es decir, solo estoy usando la base de datos para poner en cola las tareas.

Ahora en mi problema: tengo una tarea iniciada por el usuario que podría tardar unos minutos en completarse. Quiero que la tarea se ejecute solo una vez por usuario, y almacenaré en caché los resultados de la tarea en un archivo temporal, por lo que si el usuario vuelve a iniciar la tarea, simplemente devolveré el archivo en caché. Tengo un código que se ve así en mi función de vista:

task_id = "long-task-%d" % user_id result = tasks.some_long_task.AsyncResult(task_id) if result.state == celery.states.PENDING: # The next line makes a duplicate task if the user rapidly refreshes the page tasks.some_long_task.apply_async(task_id=task_id) return HttpResponse("Task started...") elif result.state == celery.states.STARTED: return HttpResponse("Task is still running, please wait...") elif result.state == celery.states.SUCCESS: if cached_file_still_exists(): return get_cached_file() else: result.forget() tasks.some_long_task.apply_async(task_id=task_id) return HttpResponse("Task started...")

Este código casi funciona. Pero tengo un problema cuando el usuario vuelve a cargar la página rápidamente. Hay una demora de 1 a 3 segundos entre el momento en que la tarea se pone en cola y cuando la tarea finalmente se retira de la cola y se entrega a un trabajador. Durante este tiempo, el estado de la tarea permanece PENDIENTE, lo que hace que la lógica de la vista inicie una tarea duplicada.

Lo que necesito es alguna forma de saber si la tarea ya se ha enviado a la cola, por lo que no termino de enviarla dos veces. ¿Hay una forma estándar de hacer esto en el apio?


No creo (como han sugerido Tomek y otros) que usar la base de datos es la forma de hacer este bloqueo. django tiene un marco de caché incorporado, que debería ser suficiente para lograr este bloqueo, y debe ser más rápido. Ver:

http://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#cookbook-task-serial

Django puede configurarse para usar memcached como su backend de caché, y esto puede distribuirse en múltiples máquinas ... esto me parece mejor. ¿Pensamientos?


Puedes hacer un poco de trampa almacenando el resultado manualmente en la base de datos. Déjame explicarte cómo esto ayudará.

Por ejemplo, si utiliza RDBMS (tabla con columnas - id de tarea, estado, resultado):

Ver parte:

  1. Utilice la gestión de transacciones.
  2. Use SELECT FOR UPDATE para obtener la fila donde task_id == "long-task-% d"% user_id. SELECCIONAR PARA ACTUALIZAR bloqueará otras solicitudes hasta que ésta COMPRUEBE o ROLLBACKs.
  3. Si no existe, establezca el estado en PENDIENTE e inicie ''some_long_task'', finalice la solicitud.
  4. Si el estado es PENDIENTE, informe al usuario.
  5. Si el estado es SUCCESS - establezca el estado en PENDING, inicie la tarea, devuelva el archivo señalado por la columna ''resultado''. Baso esto en el supuesto de que desea volver a ejecutar la tarea para obtener el resultado. COMETER
  6. Si el estado es ERROR: establezca el estado en PENDIENTE, inicie la tarea e informe al usuario. COMETER

Parte de la tarea:

  1. Preparar el archivo, envolver en try, catch block.
  2. En caso de éxito: ACTUALICE la fila correcta con el estado = SUCCESS, resultado.
  3. En caso de error: ACTUALICE la fila correcta con el estado = ERROR.

Resolví esto con Redis. Solo establezca una clave en redis para cada tarea y luego elimine la clave de redis en el método after_return de la tarea. Redis es ligero y rápido.