python celery

python - Recuperar la lista de tareas en una cola en Apio



celery queue (9)

Creo que la única forma de obtener las tareas que están esperando es mantener una lista de las tareas que comenzó y dejar que la tarea se elimine de la lista cuando se inicia.

Con rabbitmqctl y list_queues puede obtener una visión general de cuántas tareas están esperando, pero no las tareas en sí: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

Si lo que desea incluye la tarea que se está procesando, pero aún no se ha finalizado, puede mantener una lista de sus tareas y verificar sus estados:

from tasks import add result = add.delay(4, 4) result.ready() # True if finished

O bien, permite que Aplery almacene los resultados con CELERY_RESULT_BACKEND y verifique cuáles de sus tareas no están allí.

¿Cómo puedo recuperar una lista de tareas en una cola que aún no se han procesado?


El módulo de inspección de apio parece estar al tanto de las tareas desde la perspectiva de los trabajadores. Si desea ver los mensajes que están en la cola (que aún deben ser retirados por los trabajadores), sugiero usar pyrabbit , que puede interactuar con la api http de rabbitmq para recuperar todo tipo de información de la cola.

Puede encontrar un ejemplo aquí: Recuperar la longitud de la cola con Aplery (RabbitMQ, Django)


Llegué a la conclusión de que la mejor manera de obtener la cantidad de trabajos en una cola es usar rabbitmqctl como se ha sugerido varias veces aquí. Para permitir que cualquier usuario elegido ejecute el comando con sudo , seguí las instrucciones here (me salté la edición de la parte de perfil ya que no me importa escribir sudo antes del comando).

También agarré el grep de jamesc, cut fragmento y lo envolví en llamadas de subproceso.

from subprocess import Popen, PIPE p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE) p2 = Popen(["grep", "-e", "^celery/s"], stdin=p1.stdout, stdout=PIPE) p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE) p1.stdout.close() p2.stdout.close() print("number of jobs on queue: %i" % int(p3.communicate()[0]))


Para recuperar tareas desde el backend, use esto

from amqplib import client_0_8 as amqp conn = amqp.Connection(host="localhost:5672 ", userid="guest", password="guest", virtual_host="/", insist=False) chan = conn.channel() name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)


Por lo que sé, Celery no da API para examinar tareas que están esperando en la cola. Esto es específico del corredor. Si utiliza Redis como intermediario para un ejemplo, entonces el examen de las tareas que están esperando en la cola de celery (predeterminado) es tan simple como:

  1. conectarse a la base de datos del corredor
  2. enumere los artículos en la lista de celery (comando LRANGE para ver un ejemplo)

Tenga en cuenta que estas son tareas QUE ESPERA que sean recogidas por los trabajadores disponibles. Es posible que su clúster tenga algunas tareas en ejecución; estas no aparecerán en esta lista, ya que ya se han seleccionado.


Si no usa tareas prioritarias, esto es bastante simple si usa Redis. Para obtener la tarea cuenta:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

Pero las tareas priorizadas usan una clave diferente en redis , por lo que la imagen completa es un poco más complicada. La imagen completa es que necesita consultar redis para cada prioridad de la tarea. En python (y del proyecto Flower), esto se ve así:

PRIORITY_SEP = ''/x06/x16'' DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9] def make_queue_name_for_pri(queue, pri): """Make a queue name for redis Celery uses PRIORITY_SEP to separate different priorities of tasks into different queues in Redis. Each queue-priority combination becomes a key in redis with names like: - batch1/x06/x163 <-- P3 queue named batch1 There''s more information about this in Github, but it doesn''t look like it will change any time soon: - https://github.com/celery/kombu/issues/422 In that ticket the code below, from the Flower project, is referenced: - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135 :param queue: The name of the queue to make a name for. :param pri: The priority to make a name with. :return: A name for the queue-priority pair. """ if pri not in DEFAULT_PRIORITY_STEPS: raise ValueError(''Priority not in priority steps'') return ''{0}{1}{2}''.format(*((queue, PRIORITY_SEP, pri) if pri else (queue, '''', ''''))) def get_queue_length(queue_name=''celery''): """Get the number of tasks in a celery queue. :param queue_name: The name of the queue you want to inspect. :return: the number of items in the queue. """ priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in DEFAULT_PRIORITY_STEPS] r = redis.StrictRedis( host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DATABASES[''CELERY''], ) return sum([r.llen(x) for x in priority_names])

Si desea obtener una tarea real, puede usar algo como:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1

A partir de ahí, deberá deserializar la lista devuelta. En mi caso, pude lograr esto con algo como:

r = redis.StrictRedis( host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DATABASES[''CELERY''], ) l = r.lrange(''celery'', 0, -1) pickle.loads(base64.decodestring(json.loads(l[0])[''body'']))

Solo ten en cuenta que la deserialización puede tomar un momento, y tendrás que ajustar los comandos anteriores para trabajar con varias prioridades.


Una solución de copiar y pegar para Redis con serialización json:

def get_celery_queue_items(queue_name): import base64 import json # Get a configured instance of celery: from yourproject.celery import app as celery_app with celery_app.pool.acquire(block=True) as conn: tasks = conn.default_channel.client.lrange(queue_name, 0, -1) decoded_tasks = [] for task in tasks: j = json.loads(task) body = json.loads(base64.b64decode(j[''body''])) decoded_tasks.append(body) return decoded_tasks

Funciona con Django. Solo no olvides cambiar tu yourproject.celery .


si está usando rabbitMQ, use esto en la terminal:

sudo rabbitmqctl list_queues

imprimirá la lista de colas con el número de tareas pendientes. por ejemplo:

Listing queues ... 0b27d8c59fba4974893ec22d478a7093 0 0e0a2da9828a48bc86fe993b210d984f 0 [email protected] 0 11926b79e30a4f0a9d95df61b6f402f7 0 15c036ad25884b82839495fb29bd6395 1 [email protected] 0 celery 166 celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa 0 celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6 0

el número en la columna de la derecha es el número de tareas en la cola. en la parte superior, la cola de apio tiene 166 tareas pendientes.


EDITAR: Vea otras respuestas para obtener una lista de tareas en la cola.

Debería mirar aquí: Guía de apio - Inspección de trabajadores

Básicamente esto:

>>> from celery.task.control import inspect # Inspect all nodes. >>> i = inspect() # Show the items that have an ETA or are scheduled for later processing >>> i.scheduled() # Show tasks that are currently active. >>> i.active() # Show tasks that have been claimed by workers >>> i.reserved()

Dependiendo de lo que quieras