tutorial topic queues parallel multiple exchange español python queue rabbitmq delay pika

python - queues - rabbitmq topic



¿Cómo crear una cola retrasada en RabbitMQ? (5)

¿Cuál es la forma más fácil de crear una cola de espera (o estacionamiento) con Python, Pika y RabbitMQ? He visto questions similares, pero ninguna para Python.

Me parece una idea útil al diseñar aplicaciones, ya que nos permite acelerar los mensajes que deben volver a colarse.

Siempre existe la posibilidad de que reciba más mensajes de los que puede manejar, tal vez el servidor HTTP sea lento o la base de datos se encuentre bajo demasiado estrés.

También me pareció muy útil cuando algo salió mal en los escenarios donde hay una tolerancia cero a la pérdida de mensajes, y al volver a poner en cola los mensajes que no pudieron ser manejados puede solucionar eso. También puede causar problemas donde el mensaje se pondrá en cola una y otra vez. Posiblemente causa problemas de rendimiento y registra spam.


El mensaje en la cola de Conejo se puede retrasar de 2 maneras - utilizando TIEMPO DE COLA - usando el Mensaje TTL. Si todos los mensajes en la cola se retrasarán para el tiempo fijo, utilice la cola TTL. Si cada mensaje tiene que retrasarse por un tiempo variado, use el mensaje TTL. Lo he explicado usando el módulo python3 y pika. El argumento pika BasicProperties ''expiration'' en milisegundos tiene que establecerse para retrasar el mensaje en la cola de espera. Después de establecer el tiempo de caducidad, publique el mensaje en una cola demorada ("cola no real donde los consumidores esperan consumir"), una vez que el mensaje en la cola demorada caduque, el mensaje se enrutará a una cola real utilizando el intercambio ''amq.direct''

def delay_publish(self, messages, queue, headers=None, expiration=0): """ Connect to RabbitMQ and publish messages to the queue Args: queue (string): queue name messages (list or single item): messages to publish to rabbit queue expiration(int): TTL in milliseconds for message """ delay_queue = "".join([queue, "_delay"]) logging.info(''Publishing To Queue: {queue}''.format(queue=delay_queue)) logging.info(''Connecting to RabbitMQ: {host}''.format( host=self.rabbit_host)) credentials = pika.PlainCredentials( RABBIT_MQ_USER, RABBIT_MQ_PASS) parameters = pika.ConnectionParameters( rabbit_host, RABBIT_MQ_PORT, RABBIT_MQ_VHOST, credentials, heartbeat_interval=0) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(queue=queue, durable=True) channel.queue_bind(exchange=''amq.direct'', queue=queue) delay_channel = connection.channel() delay_channel.queue_declare(queue=delay_queue, durable=True, arguments={ ''x-dead-letter-exchange'': ''amq.direct'', ''x-dead-letter-routing-key'': queue }) properties = pika.BasicProperties( delivery_mode=2, headers=headers, expiration=str(expiration)) if type(messages) not in (list, tuple): messages = [messages] try: for message in messages: try: json_data = json.dumps(message) except Exception as err: logging.error( ''Error Jsonify Payload: {err}, {payload}''.format( err=err, payload=repr(message)), exc_info=True ) if (type(message) is dict) and (''data'' in message): message[''data''] = {} message[''error''] = ''Payload Invalid For JSON'' json_data = json.dumps(message) else: raise try: delay_channel.basic_publish( exchange='''', routing_key=delay_queue, body=json_data, properties=properties) except Exception as err: logging.error( ''Error Publishing Data: {err}, {payload}''.format( err=err, payload=json_data), exc_info=True ) raise except Exception: raise finally: logging.info( ''Done Publishing. Closing Connection to {queue}''.format( queue=delay_queue ) ) connection.close()


Encontré esto extremadamente útil al desarrollar mis aplicaciones. Como le da una alternativa a simplemente volver a poner en cola sus mensajes. Esto puede reducir fácilmente la complejidad de su código, y es una de las muchas funciones ocultas poderosas en RabbitMQ.

Pasos

Primero, debemos configurar dos canales básicos, uno para la cola principal y otro para la cola de espera. En mi ejemplo al final, incluyo un par de indicadores adicionales que no son necesarios, pero que hacen que el código sea más confiable; como confirm delivery , delivery_mode y durable . Puede encontrar más información al respecto en el manual RabbitMQ.

Después de que hemos configurado los canales, agregamos un enlace al canal principal que podemos usar para enviar mensajes desde el canal de demora a nuestra cola principal.

channel.queue_bind(exchange=''amq.direct'', queue=''hello'')

A continuación, debemos configurar nuestro canal de demora para reenviar mensajes a la cola principal una vez que hayan expirado.

delay_channel.queue_declare(queue=''hello_delay'', durable=True, arguments={ ''x-message-ttl'' : 5000, ''x-dead-letter-exchange'' : ''amq.direct'', ''x-dead-letter-routing-key'' : ''hello'' })

  • x-message-ttl (Mensaje - Time To Live)

    Esto normalmente se usa para eliminar automáticamente los mensajes antiguos en la cola después de una duración específica, pero al agregar dos argumentos opcionales podemos cambiar este comportamiento, y en su lugar tener este parámetro determinar en milisegundos cuánto tiempo permanecerán los mensajes en la cola de espera.

  • x-dead-letter-routing-key

    Esta variable nos permite transferir el mensaje a una cola diferente una vez que han expirado, en lugar del comportamiento predeterminado de eliminarlo por completo.

  • x-dead-letter-routing-key

    Esta variable determina qué Exchange utilizó para transferir el mensaje de hello_delay a hello queue.

Publicar en la cola de espera

Cuando terminemos de configurar todos los parámetros básicos de Pika, simplemente envíe un mensaje a la cola de espera utilizando la publicación básica.

delay_channel.basic_publish(exchange='''', routing_key=''hello_delay'', body="test", properties=pika.BasicProperties(delivery_mode=2))

Una vez que haya ejecutado el script, debería ver las siguientes colas creadas en su módulo de administración RabbitMQ.

Ejemplo.

import pika connection = pika.BlockingConnection(pika.ConnectionParameters( ''localhost'')) # Create normal ''Hello World'' type channel. channel = connection.channel() channel.confirm_delivery() channel.queue_declare(queue=''hello'', durable=True) # We need to bind this channel to an exchange, that will be used to transfer # messages from our delay queue. channel.queue_bind(exchange=''amq.direct'', queue=''hello'') # Create our delay channel. delay_channel = connection.channel() delay_channel.confirm_delivery() # This is where we declare the delay, and routing for our delay channel. delay_channel.queue_declare(queue=''hello_delay'', durable=True, arguments={ ''x-message-ttl'' : 5000, # Delay until the message is transferred in milliseconds. ''x-dead-letter-exchange'' : ''amq.direct'', # Exchange used to transfer the message from A to B. ''x-dead-letter-routing-key'' : ''hello'' # Name of the queue we want the message transferred to. }) delay_channel.basic_publish(exchange='''', routing_key=''hello_delay'', body="test", properties=pika.BasicProperties(delivery_mode=2)) print " [x] Sent"


FYI, cómo hacer esto en Spring 3.2.x.

<rabbit:queue name="delayQueue" durable="true" queue-arguments="delayQueueArguments"/> <rabbit:queue-arguments id="delayQueueArguments"> <entry key="x-message-ttl"> <value type="java.lang.Long">10000</value> </entry> <entry key="x-dead-letter-exchange" value="finalDestinationTopic"/> <entry key="x-dead-letter-routing-key" value="finalDestinationQueue"/> </rabbit:queue-arguments> <rabbit:fanout-exchange name="finalDestinationTopic"> <rabbit:bindings> <rabbit:binding queue="finalDestinationQueue"/> </rabbit:bindings> </rabbit:fanout-exchange>


Implementación de NodeJS.

Todo está bastante claro desde el código. Espero que ahorre tiempo a alguien.

var ch = channel; ch.assertExchange("my_intermediate_exchange", ''fanout'', {durable: false}); ch.assertExchange("my_final_delayed_exchange", ''fanout'', {durable: false}); // setup intermediate queue which will never be listened. // all messages are TTLed so when they are "dead", they come to another exchange ch.assertQueue("my_intermediate_queue", { deadLetterExchange: "my_final_delayed_exchange", messageTtl: 5000, // 5sec }, function (err, q) { ch.bindQueue(q.queue, "my_intermediate_exchange", ''''); }); ch.assertQueue("my_final_delayed_queue", {}, function (err, q) { ch.bindQueue(q.queue, "my_final_delayed_exchange", ''''); ch.consume(q.queue, function (msg) { console.log("delayed - [x] %s", msg.content.toString()); }, {noAck: true}); });


Puede usar el complemento oficial de RabbitMQ: x-delay-message .

En primer lugar, descargue y copie el archivo ez en Your_rabbitmq_root_path / plugins

En segundo lugar, habilite el complemento (no es necesario reiniciar el servidor):

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

Finalmente, publique su mensaje con encabezados "x-delay" como:

headers.put("x-delay", 5000);

Darse cuenta:

No garantiza la seguridad de su mensaje, ya que si el mensaje expira justo durante el tiempo de inactividad de su servidor rabbitmq, desafortunadamente el mensaje se pierde. Así que ten cuidado cuando uses este esquema.

Disfrútalo y más información en rabbitmq-delayed-message-exchange