start queues manager how exchange create commands check and rabbitmq amqp

queues - RabbitMQ: mensaje persistente con intercambio de tema



rabbitmqctl rabbitmqadmin (2)

Soy muy nuevo en RabbitMQ.

He configurado un intercambio de ''temas''. Los consumidores pueden comenzar después del editor. Me gustaría que los consumidores puedan recibir mensajes que se han enviado antes de que estuvieran activos y que aún no se hayan consumido.

El intercambio está configurado con los siguientes parámetros:

exchange_type => ''topic'' durable => 1 auto_delete => 0 passive => 0

Los mensajes se publican con este parámetro:

delivery_mode => 2

Los consumidores usan get () para recuperar los mensajes del intercambio.

Lamentablemente, cualquier mensaje publicado antes de que se haya terminado el cliente se pierde. He usado diferentes combinaciones.

Supongo que mi problema es que el intercambio no contiene mensajes. Tal vez necesito tener una cola entre el editor y la cola. Pero esto no parece funcionar con un intercambio de ''temas'' donde los mensajes son enrutados por una clave.

Alguna idea de cómo debería proceder. Uso el enlace de Perl Net :: RabbitMQ (no debería importar) y RabbitMQ 2.2.0.


Como lo menciona Brian, un intercambio no almacena mensajes y es principalmente responsable de enrutar mensajes a otro / a intercambio / s o cola / s. Si el intercambio no está vinculado a una cola, todos los mensajes enviados a ese intercambio se perderán

No debería necesitar declarar colas de clientes fijas en el script del publicador, ya que esto podría no ser escalable. Las colas pueden ser creadas dinámicamente por sus editores y enrutadas internamente mediante el enlace de intercambio a intercambio.

RabbitMQ admite enlaces de intercambio a intercambio que permitirán flexibilidad de topología, desacoplamiento y otros beneficios. Puede leer más aquí en RabbitMQ Exchange to Exchange Bindings [AMPQ]

RabbitMQ Exchange a enlace de Exchange

Ejemplo de código de Python para crear enlace de intercambio a intercambio con persistencia si no hay un consumidor presente utilizando la cola.

#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host=''localhost'')) channel = connection.channel() #Declares the entry exchange to be used by all producers to send messages. Could be external producers as well channel.exchange_declare(exchange=''data_gateway'', exchange_type=''fanout'', durable=True, auto_delete=False) #Declares the processing exchange to be used.Routes messages to various queues. For internal use only channel.exchange_declare(exchange=''data_distributor'', exchange_type=''topic'', durable=True, auto_delete=False) #Binds the external/producer facing exchange to the internal exchange channel.exchange_bind(destination=''data_distributor'',source=''data_gateway'') ##Create Durable Queues binded to the data_distributor exchange channel.queue_declare(queue=''trade_db'',durable=True) channel.queue_declare(queue=''trade_stream_service'',durable=True) channel.queue_declare(queue=''ticker_db'',durable=True) channel.queue_declare(queue=''ticker_stream_service'',durable=True) channel.queue_declare(queue=''orderbook_db'',durable=True) channel.queue_declare(queue=''orderbook_stream_service'',durable=True) #Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present channel.queue_bind(queue=''orderbook_db'',exchange=''data_distributor'',routing_key=''*.*.orderbook'') channel.queue_bind(queue=''orderbook_stream_service'',exchange=''data_distributor'',routing_key=''*.*.orderbook'') channel.queue_bind(queue=''ticker_db'',exchange=''data_distributor'',routing_key=''*.*.ticker'') channel.queue_bind(queue=''ticker_stream_service'',exchange=''data_distributor'',routing_key=''*.*.ticker'') channel.queue_bind(queue=''trade_db'',exchange=''data_distributor'',routing_key=''*.*.trade'') channel.queue_bind(queue=''trade_stream_service'',exchange=''data_distributor'',routing_key=''*.*.trade'')


Necesita una cola duradera para almacenar mensajes si no hay consumidores conectados disponibles para procesar los mensajes en el momento en que se publican.

Un intercambio no almacena mensajes, pero una cola puede. La parte confusa es que los intercambios se pueden marcar como "duraderos", pero lo único que realmente significa es que el intercambio en sí seguirá estando allí si reinicia su intermediario, pero eso no significa que los mensajes enviados a ese intercambio se conservarán automáticamente.

Dado que, aquí hay dos opciones:

  1. Realice un paso administrativo antes de iniciar sus editores para crear la (s) cola (s) usted mismo. Puede usar la interfaz de usuario web o las herramientas de línea de comando para hacer esto. Asegúrese de crearlo como una cola duradera para que almacene los mensajes que se enrutan a él, incluso si no hay consumidores activos.
  2. Suponiendo que sus consumidores estén codificados para declarar (y, por lo tanto, crear) automáticamente sus intercambios y colas al inicio (y que los declaran como duraderos), simplemente ejecute a todos sus consumidores al menos una vez antes de comenzar a publicar. Eso asegurará que todas sus colas se creen correctamente. Luego puede cerrar a los consumidores hasta que realmente los necesiten porque las colas almacenarán de forma persistente cualquier mensaje futuro enviado a ellos.

Yo iría por el # 1. Puede que no haya muchos pasos para realizar y siempre puede crear los pasos necesarios para que puedan repetirse. Además, si todos los consumidores van a sacar de la misma cola (en lugar de tener una cola dedicada cada uno), en realidad se trata de un gasto administrativo mínimo.

Las colas son algo que debe administrarse y controlarse adecuadamente. De lo contrario, podría terminar con consumidores deshonestos que declaren colas duraderas, utilizándolas durante unos minutos, pero nunca más. Poco después tendrá una cola de crecimiento permanente sin nada que reduzca su tamaño, y un inminente apocalipsis de intermediario.