tutorial parallel rabbitmq

parallel - rabbitmq python



Mensaje retrasado en RabbitMQ (7)

¿Es posible enviar un mensaje a través de RabbitMQ con algún retraso? Por ejemplo, quiero caducar la sesión del cliente después de 30 minutos y envío un mensaje que se procesará después de 30 minutos.


Supongamos que usted tuviera control sobre el consumidor, podría lograr la demora en el consumidor de esta manera:

Si estamos seguros de que el enésimo mensaje en la cola siempre tiene un retardo menor que el n + 1º mensaje (esto puede ser cierto para muchos casos de uso): el productor envía timeInformation en la tarea que transmite el momento en que se necesita ejecutar este trabajo (currentTime + delay). El consumidor:

1) Lee el tiempo programado de la tarea

2) si currentTime> scheduleTime sigue adelante.

Else delay = scheduleTime - currentTime

dormir por el tiempo indicado por retraso

El consumidor siempre está configurado con un parámetro de concurrencia. Por lo tanto, los otros mensajes solo esperarán en la cola hasta que un consumidor finalice el trabajo. Entonces, esta solución podría funcionar bien aunque parezca incómoda, especialmente para grandes retrasos.


Como no tengo suficiente reputación para agregar comentarios, publicar una nueva respuesta. Esto es solo una adición a lo que ya se ha discutido en http://www.javacodegeeks.com/2012/04/rabbitmq-scheduled-message-delivery.html

Excepto en lugar de configurar ttl en los mensajes, puede configurarlo en el nivel de cola. También puedes evitar crear un nuevo intercambio solo por redirigir los mensajes a una Cola diferente. Aquí está el código Java de ejemplo:

Productor:

import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.HashMap; import java.util.Map; public class DelayedProducer { private final static String QUEUE_NAME = "ParkingQueue"; private final static String DESTINATION_QUEUE_NAME = "DestinationQueue"; public static void main(String[] args) throws Exception{ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-message-ttl", 10000); arguments.put("x-dead-letter-exchange", ""); arguments.put("x-dead-letter-routing-key", DESTINATION_QUEUE_NAME ); channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); for (int i=0; i<5; i++) { String message = "This is a sample message " + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("message "+i+" got published to the queue!"); Thread.sleep(3000); } channel.close(); connection.close(); } }

Consumidor:

import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Consumer { private final static String DESTINATION_QUEUE_NAME = "DestinationQueue"; public static void main(String[] args) throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); boolean autoAck = false; channel.basicConsume(DESTINATION_QUEUE_NAME, autoAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received ''" + message + "''"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }



Eso no es posible actualmente. Debe almacenar sus marcas de tiempo de caducidad en una base de datos o algo similar, y luego tener un programa de ayuda que lea esas marcas de tiempo y ponga en cola un mensaje.

Los mensajes demorados son una característica que se solicita con frecuencia, ya que son útiles en muchas situaciones. Sin embargo, si su necesidad es vencer las sesiones del cliente, creo que la mensajería no es la solución ideal para usted y que otro enfoque podría funcionar mejor.


Gracias a la respuesta de Norman, pude implementarlo en NodeJS.

Todo está bastante claro a partir del código. Espero que le ahorre el tiempo a alguien.

var ch = channel; ch.assertExchange("my_intermediate_exchange", ''fanout'', {durable: false}); ch.assertExchange("my_final_delayed_exchange", ''fanout'', {durable: false}); // setup intermediate queue which will never be listened. // all messages are TTLed so when they are "dead", they come to another exchange ch.assertQueue("my_intermediate_queue", { deadLetterExchange: "my_final_delayed_exchange", messageTtl: 5000, // 5sec }, function (err, q) { ch.bindQueue(q.queue, "my_intermediate_exchange", ''''); }); ch.assertQueue("my_final_delayed_queue", {}, function (err, q) { ch.bindQueue(q.queue, "my_final_delayed_exchange", ''''); ch.consume(q.queue, function (msg) { console.log("delayed - [x] %s", msg.content.toString()); }, {noAck: true}); });


Hay dos enfoques que puedes probar:

Enfoque anterior: establezca el encabezado TTL (tiempo de vida) en cada mensaje / cola (política) y luego introduzca un DLQ para manejarlo. Una vez que el ttl haya expirado, sus mensajes pasarán de DLQ a la cola principal para que su oyente pueda procesarlo.

Último enfoque: recientemente RabbitMQ creó el complemento de mensajes demorados de RabbitMQ , mediante el cual puede lograr lo mismo y este soporte disponible desde RabbitMQ-3.5.8.

Puede declarar un intercambio con el tipo x-mensaje retrasado y luego publicar mensajes con el encabezado personalizado x-delay expresando en milisegundos un tiempo de retraso para el mensaje. El mensaje se enviará a las colas respectivas después de milisegundos de retraso x

byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8"); Map<String, Object> headers = new HashMap<String, Object>(); headers.put("x-delay", 5000); AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers); channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

Más aquí: github.com/rabbitmq/rabbitmq-delayed-message-exchange


Parece que esta publicación del blog describe el uso del intercambio de letras muertas y el mensaje ttl para hacer algo similar.

El código siguiente utiliza CoffeeScript y Node.JS para acceder a Rabbit e implementar algo similar.

amqp = require ''amqp'' events = require ''events'' em = new events.EventEmitter() conn = amqp.createConnection() key = "send.later.#{new Date().getTime()}" conn.on ''ready'', -> conn.queue key, { arguments:{ "x-dead-letter-exchange":"immediate" , "x-message-ttl": 5000 , "x-expires": 6000 } }, -> conn.publish key, {v:1}, {contentType:''application/json''} conn.exchange ''immediate'' conn.queue ''right.now.queue'', { autoDelete: false , durable: true }, (q) -> q.bind(''immediate'', ''right.now.queue'') q.subscribe (msg, headers, deliveryInfo) -> console.log msg console.log headers