tutorial shared_task results autodiscover_tasks app task rabbitmq celery celery-task

shared_task - celery worker



Eliminando todas las tareas pendientes en apio/conejomq (7)

¿Cómo puedo eliminar todas las tareas pendientes sin conocer el task_id para cada tarea?


Para apio 2.xy 3.x:

Cuando se utiliza worker con parámetro -Q para definir colas, por ejemplo

celery worker -Q queue1,queue2,queue3

entonces la celery purge no funcionará, porque no puede pasarle los parámetros de la cola. Solo eliminará la cola predeterminada. La solución es comenzar a trabajar con --purge parámetro de --purge como este:

celery worker -Q queue1,queue2,queue3 --purge

Sin embargo, esto ejecutará al trabajador.

Otra opción es usar el subcomando amqp de apio

celery amqp queue.delete queue1 celery amqp queue.delete queue2 celery amqp queue.delete queue3


1. Para purgar adecuadamente la cola de tareas de espera, debe detener a todos los trabajadores ( http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are-still-messages-left-in-the-queue ):

$ sudo rabbitmqctl stop

o (en caso de que el Supervisor administre RabbitMQ / Message Broker):

$ sudo supervisorctl stop all

2. ... y luego purgar las tareas de una cola específica:

$ cd <source_dir> $ celery amqp queue.purge <queue name>

3. Inicie RabbitMQ:

$ sudo rabbitmqctl start

o (en caso de que RabbitMQ sea administrado por el Supervisor):

$ sudo supervisorctl start all


De los docs :

$ celery -A proj purge

o

from proj.celery import app app.control.purge()

(EDIT: actualizado con el método actual)


Descubrí que la celery purge no funciona para mi configuración de apio más compleja. Utilizo varias colas con nombre para diferentes propósitos:

$ sudo rabbitmqctl list_queues -p celery name messages consumers Listing queues ... # Output sorted, whitespaced for readability celery 0 2 [email protected] 0 1 [email protected] 0 1 apns 0 1 [email protected] 0 1 analytics 1 1 [email protected] 0 1 bcast.361093f1-de68-46c5-adff-d49ea8f164c0 0 1 bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1 0 1 celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54 0 1 celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866 0 1 celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99 0 1 celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e 0 1

La primera columna es el nombre de la cola, la segunda es la cantidad de mensajes que esperan en la cola y la tercera es el número de oyentes para esa cola. Las colas son:

  • apio - Cola para tareas de apio estándar e idempotentes
  • apns: cola para las tareas del servicio de notificación push de Apple, no tan idempotente
  • análisis: cola para análisis nocturnos de larga ejecución
  • * .pidbox - Cola para los comandos de los trabajadores, como apagar y restablecer, uno por trabajador (2 trabajadores de apio, un trabajador de apns, un trabajador de análisis)
  • bcast. * - Colas de difusión, para enviar mensajes a todos los trabajadores que escuchan una cola (en lugar de solo los primeros en tomarla)
  • apioev. * - Colas de eventos de apio, para reportar análisis de tareas

La tarea de análisis es una tarea de fuerza bruta que funciona muy bien en pequeños conjuntos de datos, pero ahora lleva más de 24 horas procesarla. Ocasionalmente, algo saldrá mal y se quedará atascado esperando en la base de datos. Necesita ser reescrito, pero hasta entonces, cuando se atasque, mate la tarea, vacíe la cola y vuelva a intentarlo. Detecto "estancamiento" al observar el recuento de mensajes para la cola de análisis, que debería ser 0 (análisis finalizados) o 1 (esperando a que termine la analítica de la noche anterior). 2 o más es malo, y recibo un correo electrónico.

celery purge ofrece borrar tareas de una de las colas de transmisión, y no veo una opción para elegir una cola con nombre diferente.

Aquí está mi proceso:

$ sudo /etc/init.d/celeryd stop # Wait for analytics task to be last one, Ctrl-C $ ps -ef | grep analytics # Get the PID of the worker, not the root PID reported by celery $ sudo kill <PID> $ sudo /etc/init.d/celeryd stop # Confim dead $ python manage.py celery amqp queue.purge analytics $ sudo rabbitmqctl list_queues -p celery name messages consumers # Confirm messages is 0 $ sudo /etc/init.d/celeryd start



En apio 3+

http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks

CLI

Purgar llamado cola

celery -A proj amqp queue.purge <queue name>

Depurar cola configurada

celery -A proj purge

He purgado los mensajes, pero aún quedan mensajes en la cola? Respuesta: Las tareas son reconocidas (eliminadas de la cola) tan pronto como se ejecutan. Una vez que el trabajador ha recibido una tarea, tardará un tiempo hasta que se ejecute realmente, especialmente si hay muchas tareas que ya están esperando su ejecución. El trabajador retiene los mensajes no reconocidos hasta que cierra la conexión con el intermediario (servidor AMQP). Cuando se cierra esa conexión (por ejemplo, porque se detuvo al trabajador) el intermediario reenvía las tareas al siguiente trabajador disponible (o al mismo trabajador cuando se ha reiniciado), por lo que para purgar adecuadamente la cola de tareas de espera se tiene que detener a todos los trabajadores, y luego purgar las tareas usando apio.control.purge ().

Por lo tanto, para purgar toda la cola, los trabajadores deben detenerse.


Para apio 3.0+:

$ celery purge

Para purgar una cola específica:

$ celery -Q queue_name purge