started node manager downloads ruby websocket amqp

ruby - node - AMQP crea suscripciones a colas dinĂ¡micamente



rabbitmq message (2)

En el primer vistazo, los bits AMQP parecen estar bien, pero no quiero configurar todas las dependencias. Si proporciona un ejemplo mínimo con solo la parte de AMQP, lo verificaré.

Intento crear una aplicación de chat simple usando AMQP, Websockets y Ruby. Entiendo que este puede no ser el mejor caso de uso para entender AMQP, pero me gustaría entender dónde me estoy equivocando.

El siguiente es mi código amqp-server

require ''rubygems'' require ''amqp'' require ''mongo'' require ''em-websocket'' require ''json'' class MessageParser # message format => "room:harry_potter, nickname:siddharth, room:members" def self.parse(message) parsed_message = JSON.parse(message) response = {} if parsed_message[''status''] == ''status'' response[:status] = ''STATUS'' response[:username] = parsed_message[''username''] response[:roomname] = parsed_message[''roomname''] elsif parsed_message[''status''] == ''message'' response[:status] = ''MESSAGE'' response[:message] = parsed_message[''message''] response[:roomname] = parsed_message[''roomname''].split().join(''_'') end response end end class MongoManager def self.establish_connection(database) @db ||= Mongo::Connection.new(''localhost'', 27017).db(database) @db.collection(''rooms'') @db end end @sockets = [] EventMachine.run do connection = AMQP.connect(:host => ''127.0.0.1'') channel = AMQP::Channel.new(connection) puts "Connected to AMQP broker. #{AMQP::VERSION} " mongo = MongoManager.establish_connection("trackertalk_development") EventMachine::WebSocket.start(:host => ''127.0.0.1'', :port => 8080) do |ws| socket_detail = {:socket => ws} ws.onopen do @sockets << socket_detail end ws.onmessage do |message| status = MessageParser.parse(message) exchange = channel.fanout(status[:roomname].split().join(''_'')) if status[:status] == ''STATUS'' queue = channel.queue(status[:username], :durable => true) unless queue.subscribed? puts "--------- SUBSCRIBED --------------" queue.bind(exchange).subscribe do |payload| puts "PAYLOAD : #{payload}" ws.send(payload) end else puts "----ALREADY SUBSCRIBED" end # only after 0.8.0rc14 #queue = channel.queue(status[:username], :durable => true) #AMQP::Consumer.new(channel, queue) elsif status[:status] == ''MESSAGE'' puts "********************* Message- published ******************************" exchange.publish(status[:message) end end ws.onclose do @sockets.delete ws end end end

Utilizo el estado para indicar si el mensaje entrante es un mensaje para chat en curso o para un mensaje de estado que me requiere que haga tareas domésticas, como suscribirse a la cola.

El problema que enfrento es que cuando envío un mensaje como socket.send(JSON.stringify({status:''message'', message:''test'', roomname:''Harry Potter''}))

Se exchange.publish'' is called but it still doesn''t get pushed via the envía a exchange.publish'' is called but it still doesn''t get pushed via the ws.send` al navegador.

¿Hay algo fundamentalmente erróneo en mi comprensión de EventMachine y AMQP?

Aquí está el pastie para el mismo código http://pastie.org/private/xosgb8tw1w5vuroa4w7a

Mi código parece funcionar como se desea cuando elimino durable => true de queue = channel.queue(status[:username], :durable => true)

El siguiente es un fragmento de mi vista Rails que identifica el nombre de usuario y el nombre de la habitación y lo envía como parte del mensaje a través de Websockets.

Aunque el código parece funcionar cuando elimino durable => true , no entiendo por qué eso afecta el mensaje que se está entregando. Ignora la parte de mongo ya que todavía no reproduce ninguna parte.

También me gustaría saber si mi enfoque de AMQP y su uso es correcto

<script> $(document).ready(function(){ var username = ''<%= @user.email %>''; var roomname = ''Bazingaa''; socket = new WebSocket(''ws://127.0.0.1:8080/''); socket.onopen = function(msg){ console.log(''connected''); socket.send(JSON.stringify({status:''status'', username:username, roomname:roomname})); } socket.onmessage = function(msg){ $(''#chat-log'').append(msg.data); } }); </script> <div class=''block''> <div class=''content''> <h2 class=''title''><%= @room.name %></h2> <div class=''inner''> <div id="chat-log"> </div> <div id="chat-console"> <textarea rows="5" cols="40"></textarea> </div> </div> </div> </div> <style> #chat-log{ color:#000; font-weight:bold; margin-top:1em; width:900px; overflow:auto; height:300px; } #chat-console{ bottom:10px; } textarea{ width:100%; height:60px; } </style>


Creo que su problema podría ser la cola que se cuelga en el intermediario entre las invocaciones de ws.onmessage. Cuando el cliente vuelve a conectar la cola y ya existe el enlace, no se llama a ws.send ().

De forma predeterminada, al crear una cola, ésta y cualquier vinculación que tenga, se mantiene hasta que el intermediario se reinicie, o le dice explícitamente al intermediario que la elimine.

Hay dos formas de cambiar esto:

  • Agregar el indicador duradero al crear la cola, lo que hará que la cola permanezca aunque el broker reinicie
  • Agregar el indicador auto_delete , que hará que el intermediario elimine automáticamente la entidad después de un corto período de tiempo sin tener un consumidor conectado a él

Si tiene control sobre el intermediario que está utilizando el corredor rabbitmq, una forma fácil de introspección de lo que está sucediendo en el intermediario es instalar el complemento de administración , que proporciona una interfaz web para intercambios, enlaces y colas en el intermediario.