tutorial rabbit node how examples assertqueue and node.js messaging rabbitmq amqp node-amqp

node.js - node - rabbitmq javascript api



RabbitMQ/AMQP: cola Ășnica, mĂșltiples consumidores para el mismo mensaje? (10)

Estoy comenzando a usar RabbitMQ y AMQP en general.

  • Tengo una cola de mensajes
  • Tengo múltiples consumidores, y me gustaría hacer cosas diferentes con el mismo mensaje .

La mayoría de la documentación de RabbitMQ parece estar enfocada en round-robin, es decir, donde un solo mensaje es consumido por un solo consumidor, y la carga se distribuye entre cada consumidor. Este es de hecho el comportamiento que presencio.

Un ejemplo: el productor tiene una sola cola y envía mensajes cada 2 segundos:

var amqp = require(''amqp''); var connection = amqp.createConnection({ host: "localhost", port: 5672 }); var count = 1; connection.on(''ready'', function () { var sendMessage = function(connection, queue_name, payload) { var encoded_payload = JSON.stringify(payload); connection.publish(queue_name, encoded_payload); } setInterval( function() { var test_message = ''TEST ''+count sendMessage(connection, "my_queue_name", test_message) count += 1; }, 2000) })

Y aquí hay un consumidor:

var amqp = require(''amqp''); var connection = amqp.createConnection({ host: "localhost", port: 5672 }); connection.on(''ready'', function () { connection.queue("my_queue_name", function(queue){ queue.bind(''#''); queue.subscribe(function (message) { var encoded_payload = unescape(message.data) var payload = JSON.parse(encoded_payload) console.log(''Recieved a message:'') console.log(payload) }) }) })

Si empiezo el consumidor dos veces, puedo ver que cada consumidor está consumiendo mensajes alternativos en el comportamiento de todos contra todos. Por ejemplo, veré los mensajes 1, 3, 5 en un terminal, 2, 4, 6 en el otro .

Mi pregunta es:

  • ¿Puedo hacer que cada consumidor reciba los mismos mensajes? Es decir, ¿ambos consumidores reciben los mensajes 1, 2, 3, 4, 5, 6? ¿Cómo se llama esto en AMQP / RabbitMQ? ¿Cómo se configura normalmente?

  • ¿Esto se hace comúnmente? ¿Debo tener la ruta de intercambio del mensaje en dos colas separadas, con un solo consumidor, en su lugar?


Creo que deberías verificar el envío de tus mensajes usando el intercambiador de abanicos . De esa forma, recibirás el mismo mensaje para diferentes consumidores, bajo la mesa RabbitMQ está creando diferentes colas para cada uno de estos nuevos consumidores / suscriptores.

Este es el enlace para ver el ejemplo del tutorial en javascript https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html


Cuando evalúo tu caso es:

  • Tengo una cola de mensajes (su fuente para recibir mensajes, digamos q111)

  • Tengo múltiples consumidores, y me gustaría hacer cosas diferentes con el mismo mensaje.

Su problema aquí es cuando 3 mensajes son recibidos por esta cola, el mensaje 1 es consumido por un consumidor A, otros consumidores B y C consumen mensajes 2 y 3. Donde usted necesita una configuración donde rabbitmq transfiere las mismas copias de todos estos tres mensajes (1,2,3) a los tres consumidores conectados (A, B, C) simultáneamente.

Si bien se pueden hacer muchas configuraciones para lograr esto, una forma simple es usar el siguiente concepto de dos pasos:

  • Utilice un rabbitmq-shovel dinámico para recoger mensajes de la cola deseada (q111) y publíquelos en un intercambio de fanout (intercambio creado y dedicado exclusivamente para este fin).
  • Ahora reconfigure a sus consumidores A, B y C (que estaban escuchando la cola (q111)) para escuchar de este intercambio de Fanout directamente usando una cola exclusiva y anónima para cada consumidor.

Nota: Al utilizar este concepto, no consuma directamente de la cola de origen (q111), ya que los mensajes ya consumidos no se cargarán en su intercambio de Fanout.

Si cree que esto no satisface su requisito exacto ... siéntase libre de publicar sus sugerencias :-)



Las últimas dos respuestas son casi correctas. Tengo toneladas de aplicaciones que generan mensajes que deben terminar en diferentes consumidores, por lo que el proceso es muy simple.

Si desea que varios consumidores hagan el mismo mensaje, realice el siguiente procedimiento.

Cree varias colas, una para cada aplicación que recibirá el mensaje, en cada una de las propiedades de la cola, "enlazar" una etiqueta de enrutamiento con el intercambio amq.direct. Cambia tu aplicación de publicación para enviarla a amq.direct y utiliza la etiqueta de enrutamiento (no una cola). AMQP luego copiará el mensaje en cada cola con el mismo enlace. Funciona un encanto :)

Ejemplo: Digamos que tengo una cadena JSON que genero, la publico en el intercambio "amq.direct" usando la etiqueta de enrutamiento "pedido de nueva venta", tengo una cola para mi aplicación order_printer que imprime orden, tengo una hacer cola para mi sistema de facturación que enviará una copia del pedido y facturará al cliente y tengo un sistema de archivo web donde archivé pedidos por razones históricas / de cumplimiento y tengo una interfaz web de cliente donde se siguen las órdenes a medida que llega información sobre una orden.

Así que mis colas son: order_printer, order_billing, order_archive y order_tracking. Todas tienen la etiqueta de enlace "new-sales-order" unida a ellas, las 4 obtendrán los datos JSON.

Esta es una forma ideal de enviar datos sin que la aplicación de publicación sepa o no se preocupe por las aplicaciones que reciben.


Para obtener el comportamiento que desea, simplemente haga que cada consumidor consuma desde su propia cola. Tendrá que usar un tipo de intercambio no directo (tema, encabezado, fanout) para enviar el mensaje a todas las colas a la vez.


RabbitMQ / AMQP: cola única, múltiples consumidores para el mismo mensaje y actualización de página.

rabbit.on(''ready'', function () { }); sockjs_chat.on(''connection'', function (conn) { conn.on(''data'', function (message) { try { var obj = JSON.parse(message.replace(//r/g, '''').replace(//n/g, '''')); if (obj.header == "register") { // Connect to RabbitMQ try { conn.exchange = rabbit.exchange(exchange, { type: ''topic'', autoDelete: false, durable: false, exclusive: false, confirm: true }); conn.q = rabbit.queue(''my-queue-''+obj.agentID, { durable: false, autoDelete: false, exclusive: false }, function () { conn.channel = ''my-queue-''+obj.agentID; conn.q.bind(conn.exchange, conn.channel); conn.q.subscribe(function (message) { console.log("[MSG] ---> " + JSON.stringify(message)); conn.write(JSON.stringify(message) + "/n"); }).addCallback(function(ok) { ctag[conn.channel] = ok.consumerTag; }); }); } catch (err) { console.log("Could not create connection to RabbitMQ. /nStack trace -->" + err.stack); } } else if (obj.header == "typing") { var reply = { type: ''chatMsg'', msg: utils.escp(obj.msga), visitorNick: obj.channel, customField1: '''', time: utils.getDateTime(), channel: obj.channel }; conn.exchange.publish(''my-queue-''+obj.agentID, reply); } } catch (err) { console.log("ERROR ----> " + err.stack); } }); // When the visitor closes or reloads a page we need to unbind from RabbitMQ? conn.on(''close'', function () { try { // Close the socket conn.close(); // Close RabbitMQ conn.q.unsubscribe(ctag[conn.channel]); } catch (er) { console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack); } }); });


Sí, cada consumidor puede recibir los mismos mensajes. eche un vistazo a http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html rabbitmq.com/tutorials/tutorial-five-python.html

para diferentes formas de enrutar mensajes. Sé que son para Python y Java, pero es bueno entender los principios, decidir lo que estás haciendo y luego encontrar cómo hacerlo en JS. Parece que quieres hacer un fanout simple ( tutorial 3 ), que envía los mensajes a todas las colas conectadas al intercambio.

La diferencia con lo que estás haciendo y lo que quieres hacer es básicamente que vas a configurar e intercambiar o escribir fanout. Los excahnges de Fanout envían todos los mensajes a todas las colas conectadas. Cada cola tendrá un consumidor que tendrá acceso a todos los mensajes por separado.

Sí, esto se hace comúnmente, es una de las características de AMPQ.



Solo lea el tutorial de rabbitmq . Usted publica un mensaje para intercambiar, no para hacer cola; luego se enruta a colas apropiadas. En su caso, debe unir una cola separada para cada consumidor. De esta forma, pueden consumir mensajes de manera completamente independiente.


¿Puedo hacer que cada consumidor reciba los mismos mensajes? Es decir, ¿ambos consumidores reciben los mensajes 1, 2, 3, 4, 5, 6? ¿Cómo se llama esto en AMQP / RabbitMQ? ¿Cómo se configura normalmente?

No, no si los consumidores están en la misma fila. De la guía de conceptos AMQP de RabbitMQ:

es importante comprender que, en AMQP 0-9-1, los mensajes se equilibran de carga entre los consumidores.

Esto parece implicar que el comportamiento round-robin dentro de una cola es un hecho , y no configurable. Es decir, se requieren colas separadas para que muchos usuarios comprendan el mismo ID de mensaje.

¿Esto se hace comúnmente? ¿Debo tener la ruta de intercambio del mensaje en dos colas separadas, con un solo consumidor, en su lugar?

No, no es así, una sola cola / múltiples consumidores, cada uno de los cuales maneja el mismo ID de mensaje no es posible. Tener la ruta de intercambio en dos colas separadas es mucho mejor.

Como no requiero un enrutamiento demasiado complejo, un intercambio de fanouts manejará esto muy bien. No me enfoqué demasiado en los intercambios anteriores, ya que node-amqp tiene el concepto de un "intercambio predeterminado" que le permite publicar mensajes en una conexión directamente, sin embargo, la mayoría de los mensajes AMQP se publican en un intercambio específico.

Aquí está mi intercambio de fanouts, tanto de envío como de recepción:

var amqp = require(''amqp''); var connection = amqp.createConnection({ host: "localhost", port: 5672 }); var count = 1; connection.on(''ready'', function () { connection.exchange("my_exchange", options={type:''fanout''}, function(exchange) { var sendMessage = function(exchange, payload) { console.log(''about to publish'') var encoded_payload = JSON.stringify(payload); exchange.publish('''', encoded_payload, {}) } // Recieve messages connection.queue("my_queue_name", function(queue){ console.log(''Created queue'') queue.bind(exchange, ''''); queue.subscribe(function (message) { console.log(''subscribed to queue'') var encoded_payload = unescape(message.data) var payload = JSON.parse(encoded_payload) console.log(''Recieved a message:'') console.log(payload) }) }) setInterval( function() { var test_message = ''TEST ''+count sendMessage(exchange, test_message) count += 1; }, 2000) }) })