tutorial rabbitmq celery amqp pika celeryd

tutorial - celery rabbitmq



¿Cómo puedo recuperar mensajes AMQP no reconocidos de otros canales que no sean los de mi conexión? (3)

Los mensajes sin acuse de recibo son aquellos que han sido entregados a través de la red a un consumidor pero que aún no han sido rechazados o rechazados, pero que el consumidor aún no ha cerrado el canal o la conexión sobre los que originalmente los recibió. Por lo tanto, el agente no puede determinar si el consumidor simplemente está tardando mucho tiempo en procesar esos mensajes o si se olvidó de ellos. Por lo tanto, los deja en un estado no reconocido hasta que el consumidor muere o son rechazados o rechazados.

Debido a que esos mensajes aún podrían ser procesados ​​válidamente en el futuro por el consumidor aún vivo que los consumió originalmente, no puede (que yo sepa) insertar otro consumidor en la mezcla y tratar de tomar decisiones externas sobre ellos. Debe corregir a sus consumidores para que tomen decisiones sobre cada mensaje a medida que se procesan en lugar de dejar los mensajes antiguos sin acuse de recibo.

Parece que mientras más tiempo mantengo mi servidor rabbitmq funcionando, más problemas tengo con los mensajes no reconocidos. Me encantaría ponerlos nuevamente. De hecho, parece que hay un comando amqp para hacer esto, pero solo se aplica al canal que usa su conexión. Construí un pequeño script de pika para al menos probarlo, pero me falta algo o no se puede hacer de esta manera (¿qué tal con rabbitmqctl?)

import pika credentials = pika.PlainCredentials(''***'', ''***'') parameters = pika.ConnectionParameters(host=''localhost'',port=5672,/ credentials=credentials, virtual_host=''***'') def handle_delivery(body): """Called when we receive a message from RabbitMQ""" print body def on_connected(connection): """Called when we are fully connected to RabbitMQ""" connection.channel(on_channel_open) def on_channel_open(new_channel): """Called when our channel has opened""" global channel channel = new_channel channel.basic_recover(callback=handle_delivery,requeue=True) try: connection = pika.SelectConnection(parameters=parameters,/ on_open_callback=on_connected) # Loop so we can communicate with RabbitMQ connection.ioloop.start() except KeyboardInterrupt: # Gracefully close the connection connection.close() # Loop until we''re fully closed, will stop on its own connection.ioloop.start()


Si los mensajes no están disponibles, solo hay dos formas de volverlos a poner en la cola:

  1. basic.nack

    Este comando hará que el mensaje se vuelva a colocar en la cola y se vuelva a entregar.

  2. Desconectar del broker

    Esta acción forzará que todos los mensajes no resueltos de este canal se vuelvan a poner en la cola.

NOTA : basic.recover intentará volver a publicar los mensajes que no haya recibido en el mismo canal (para el mismo consumidor), que a veces es el comportamiento deseado.

Especificación de RabbitMQ para basic.recover y basic.nack

La verdadera pregunta es: ¿por qué no se reconocen los mensajes?

Posibles escenarios para causar mensajes no resueltos:

  1. El consumidor busca demasiados mensajes, luego no los procesa y los ataca lo suficientemente rápido.

    Solución: obtenga los pocos mensajes que corresponda.

  2. Biblioteca de cliente Buggy (tengo este problema actualmente con pika 0.9.13 . Si la cola tiene muchos mensajes, una cierta cantidad de mensajes quedarán bloqueados, incluso horas después.

    Solución: Tengo que reiniciar el consumidor varias veces hasta que todos los mensajes no procesados ​​desaparezcan de la cola.


Todos los mensajes no reconocidos pasarán al estado listo una vez que se detengan todos los trabajadores / consumidores.

Asegúrese de detener a todos los trabajadores confirmando con un grep en ps aux output, y deténgalos / elimínelos si los encuentra.

Si está administrando trabajadores que usan el supervisor, que se muestra como el trabajador está parado, es posible que desee comprobar si hay zombis. El supervisor informa que el trabajador debe detenerse, pero de todos modos encontrará que los procesos zombies se ejecutan cuando se copian en ps aux output. Matar a los procesos zombies llevará los mensajes a un estado listo.