ruby - node - AMQP crea suscripciones a colas dinĂ¡micamente
rabbitmq message (2)
En el primer vistazo, los bits AMQP parecen estar bien, pero no quiero configurar todas las dependencias. Si proporciona un ejemplo mínimo con solo la parte de AMQP, lo verificaré.
Intento crear una aplicación de chat simple usando AMQP, Websockets y Ruby. Entiendo que este puede no ser el mejor caso de uso para entender AMQP, pero me gustaría entender dónde me estoy equivocando.
El siguiente es mi código amqp-server
require ''rubygems''
require ''amqp''
require ''mongo''
require ''em-websocket''
require ''json''
class MessageParser
# message format => "room:harry_potter, nickname:siddharth, room:members"
def self.parse(message)
parsed_message = JSON.parse(message)
response = {}
if parsed_message[''status''] == ''status''
response[:status] = ''STATUS''
response[:username] = parsed_message[''username'']
response[:roomname] = parsed_message[''roomname'']
elsif parsed_message[''status''] == ''message''
response[:status] = ''MESSAGE''
response[:message] = parsed_message[''message'']
response[:roomname] = parsed_message[''roomname''].split().join(''_'')
end
response
end
end
class MongoManager
def self.establish_connection(database)
@db ||= Mongo::Connection.new(''localhost'', 27017).db(database)
@db.collection(''rooms'')
@db
end
end
@sockets = []
EventMachine.run do
connection = AMQP.connect(:host => ''127.0.0.1'')
channel = AMQP::Channel.new(connection)
puts "Connected to AMQP broker. #{AMQP::VERSION} "
mongo = MongoManager.establish_connection("trackertalk_development")
EventMachine::WebSocket.start(:host => ''127.0.0.1'', :port => 8080) do |ws|
socket_detail = {:socket => ws}
ws.onopen do
@sockets << socket_detail
end
ws.onmessage do |message|
status = MessageParser.parse(message)
exchange = channel.fanout(status[:roomname].split().join(''_''))
if status[:status] == ''STATUS''
queue = channel.queue(status[:username], :durable => true)
unless queue.subscribed?
puts "--------- SUBSCRIBED --------------"
queue.bind(exchange).subscribe do |payload|
puts "PAYLOAD : #{payload}"
ws.send(payload)
end
else
puts "----ALREADY SUBSCRIBED"
end
# only after 0.8.0rc14
#queue = channel.queue(status[:username], :durable => true)
#AMQP::Consumer.new(channel, queue)
elsif status[:status] == ''MESSAGE''
puts "********************* Message- published ******************************"
exchange.publish(status[:message)
end
end
ws.onclose do
@sockets.delete ws
end
end
end
Utilizo el estado para indicar si el mensaje entrante es un mensaje para chat en curso o para un mensaje de estado que me requiere que haga tareas domésticas, como suscribirse a la cola.
El problema que enfrento es que cuando envío un mensaje como socket.send(JSON.stringify({status:''message'', message:''test'', roomname:''Harry Potter''}))
Se exchange.publish'' is called but it still doesn''t get pushed via the
envía a exchange.publish'' is called but it still doesn''t get pushed via the
ws.send` al navegador.
¿Hay algo fundamentalmente erróneo en mi comprensión de EventMachine y AMQP?
Aquí está el pastie para el mismo código http://pastie.org/private/xosgb8tw1w5vuroa4w7a
Mi código parece funcionar como se desea cuando elimino durable => true
de queue = channel.queue(status[:username], :durable => true)
El siguiente es un fragmento de mi vista Rails que identifica el nombre de usuario y el nombre de la habitación y lo envía como parte del mensaje a través de Websockets.
Aunque el código parece funcionar cuando elimino durable => true
, no entiendo por qué eso afecta el mensaje que se está entregando. Ignora la parte de mongo ya que todavía no reproduce ninguna parte.
También me gustaría saber si mi enfoque de AMQP y su uso es correcto
<script>
$(document).ready(function(){
var username = ''<%= @user.email %>'';
var roomname = ''Bazingaa'';
socket = new WebSocket(''ws://127.0.0.1:8080/'');
socket.onopen = function(msg){
console.log(''connected'');
socket.send(JSON.stringify({status:''status'', username:username, roomname:roomname}));
}
socket.onmessage = function(msg){
$(''#chat-log'').append(msg.data);
}
});
</script>
<div class=''block''>
<div class=''content''>
<h2 class=''title''><%= @room.name %></h2>
<div class=''inner''>
<div id="chat-log">
</div>
<div id="chat-console">
<textarea rows="5" cols="40"></textarea>
</div>
</div>
</div>
</div>
<style>
#chat-log{
color:#000;
font-weight:bold;
margin-top:1em;
width:900px;
overflow:auto;
height:300px;
}
#chat-console{
bottom:10px;
}
textarea{
width:100%;
height:60px;
}
</style>
Creo que su problema podría ser la cola que se cuelga en el intermediario entre las invocaciones de ws.onmessage. Cuando el cliente vuelve a conectar la cola y ya existe el enlace, no se llama a ws.send ().
De forma predeterminada, al crear una cola, ésta y cualquier vinculación que tenga, se mantiene hasta que el intermediario se reinicie, o le dice explícitamente al intermediario que la elimine.
Hay dos formas de cambiar esto:
- Agregar el indicador duradero al crear la cola, lo que hará que la cola permanezca aunque el broker reinicie
- Agregar el indicador auto_delete , que hará que el intermediario elimine automáticamente la entidad después de un corto período de tiempo sin tener un consumidor conectado a él
Si tiene control sobre el intermediario que está utilizando el corredor rabbitmq, una forma fácil de introspección de lo que está sucediendo en el intermediario es instalar el complemento de administración , que proporciona una interfaz web para intercambios, enlaces y colas en el intermediario.