tutorial exchange example español code python message-queue rabbitmq amqp

python - exchange - ¿Cómo configurar la detección de tiempo de espera en un servidor RabbitMQ?



rabbitmq tutorial español (3)

Estoy probando RabbitMQ con this enlace python.

Una cosa que noté es que si mato a un consumidor de manera impura (emulando un programa bloqueado), el servidor pensará que este consumidor todavía está allí por mucho tiempo. El resultado de esto es que todos los demás mensajes serán ignorados.

Por ejemplo, si mata a un consumidor 1 vez y se vuelve a conectar, se ignorarán 1/2 mensajes. Si matas a otro consumidor, los 2/3 mensajes serán ignorados. Si matas a un tercero, los mensajes 3/4 se ignorarán y así sucesivamente.

He intentado activar los agradecimientos, pero no parece estar ayudando. La única solución que he encontrado es detener manualmente el servidor y restablecerlo.

¿Hay alguna manera mejor?

Cómo recrear este escenario

  • Ejecutar rabbitmq.

  • Desarchivar esta biblioteca .

  • Descargue el consumidor y editor here . Ejecuta amqp_consumer.py dos veces. Ejecute amqp_publisher.py, suministrando algunos datos y observe que funciona como se esperaba. Se reciben mensajes de estilo round robin.

  • Elimine uno de los procesos de consumo con kill -9 o el administrador de tareas.

  • Ahora, cuando publiques un mensaje, se perderá el 50% de los mensajes.


Por favor, proporcione algunos detalles más con respecto a los componentes que ha declarado. Normalmente (e independientemente de la implementación del cliente) una cola con las propiedades

  • exclusivo y
  • borrado automático

debe eliminarse tan pronto como se interrumpa la conexión entre el cliente declarante y el agente. Sin embargo, esto no te ayudará con las colas compartidas. Por favor, detalle un poco qué es exactamente lo que estás tratando de modelar.


RabbitMQ no tiene un tiempo de espera en los acuses de recibo por parte del cliente de que se ha procesado un mensaje: consulte esta publicación (todo el tema puede ser de interés). Algunos puntos destacados del post:

El modelo de ack de AMQP para suscripciones y "extracción" son idénticos. En ambos casos, el mensaje se mantiene en el servidor, pero no está disponible para otros consumidores hasta que se haya reconocido (y se elimine), se haya eliminado (con basic.reject; aunque RabbitMQ no lo implemente) o el canal / la conexión está cerrada (en cuyo momento el mensaje queda disponible para otros consumidores).

y (mi énfasis)

No hay tiempo de espera en espera de acks. Por lo general, eso no es un problema, ya que los casos comunes de falta de reconocimiento (falla de la red o del cliente) provocarán la caída de la conexión (y, por lo tanto, desencadenarán el comportamiento descrito anteriormente). Aún así, un tiempo de espera podría ser útil para, digamos, tratar con consumidores vivos pero que no responden . Eso ha surgido en discusión antes. ¿Hay algún caso de uso específico en mente que requiera tal funcionalidad?

El problema podría estar ocurriendo debido a que en un modelo de extracción de clientes, es más difícil para el servidor detectar una conexión rota (a diferencia de un consumidor vivo pero que no responde), especialmente porque el servidor parece feliz de esperar por un acuse de recibo.

Actualización: En Linux, puede adjuntar controladores de señal para SIGTERM y / o SIGKILL y / o SIGINT y, con suerte, cerrar la conexión de forma ordenada desde el cliente. En Windows, creo que cerrar desde el Administrador de tareas invoca la API de TerminateProcess Win32, sobre lo que MSDN dice:

Si TerminateProcess finaliza un proceso, todos los subprocesos del proceso finalizan de inmediato sin posibilidad de ejecutar código adicional. Esto significa que el subproceso no ejecuta código en bloques de manejador de terminación. Además, no se notifica a ninguna DLL adjunta que el proceso se está separando.

Esto significa que puede ser difícil atrapar la terminación y cerrar de forma ordenada.

Podría valer la pena seguir en la lista de RabbitMQ con su propio caso de uso para un tiempo de espera de acuse de recibo.


No veo amqp_consumer.py o amqp_producer.py en el amqp_consumer.py amqp_producer.py , por lo que reproducir la falla es complicado.

RabbitMQ finaliza las conexiones, liberando sus mensajes no reconocidos para su reenvío a otros clientes, siempre que el sistema operativo indique que un socket se ha cerrado. Sus síntomas son muy extraños, ya que incluso un kill -9 debería hacer que el socket TCP se limpie correctamente.

Algunas personas han notado problemas con los sockets que sobreviven más tiempo del que deberían cuando se ejecutan con un firewall o dispositivo NAT entre los clientes de AMQP y el servidor. ¿Podría ser un problema aquí, o está ejecutando todo en localhost? Además, ¿en qué sistema operativo está ejecutando los diversos componentes del sistema?

ETA: De su comentario a continuación, supongo que mientras ejecuta el servidor en Linux, puede ejecutar los clientes en Windows. Si este es el caso, podría ser que el controlador de Windows TCP no esté cerrando los sockets correctamente, lo que es diferente del comportamiento de kill-9 en Unix. (En Unix, el kernel cerrará correctamente las conexiones TCP en cualquier proceso muerto).

Si ese es el caso, entonces la mala noticia es que RabbitMQ solo puede liberar recursos cuando el socket está cerrado, por lo que si el sistema operativo del cliente no lo hace, no hay nada que pueda hacer. Esto es lo mismo que casi todos los demás servicios basados ​​en TCP que existen.

Sin embargo, la buena noticia es que AMQP admite una opción de "latido del corazón" para estos casos, en los que el tejido de red no es confiable. Podrías intentar habilitar los latidos del corazón. Cuando están habilitados, si el servidor no recibe ningún tráfico dentro de un intervalo configurable, decide que la conexión debe estar muerta.

La mala noticia , sin embargo, es que no creo que py-amqplib sea compatible con los latidos del corazón en este momento. Vale la pena intentarlo, sin embargo!