ruby-on-rails multithreading redis ruby-on-rails-4 publish-subscribe

ruby on rails - Redis+ActionController:: Los hilos en vivo no están muriendo



ruby-on-rails multithreading (6)

Antecedentes: hemos creado una función de chat en una de nuestras aplicaciones Rails existentes. Estamos utilizando el nuevo módulo ActionController::Live y ejecutando Puma (con Nginx en producción) y suscribiéndonos a los mensajes a través de Redis. Estamos utilizando el lado del cliente de EventSource para establecer la conexión de forma asincrónica.

Resumen del problema: los hilos nunca se mueren cuando se termina la conexión.

Por ejemplo, si el usuario se va, cierra el navegador o incluso va a una página diferente dentro de la aplicación, se genera un nuevo hilo (como se esperaba), pero el anterior continúa vivo.

El problema, como lo veo ahora, es que cuando ocurre cualquiera de estas situaciones, el servidor no tiene forma de saber si la conexión en el extremo del navegador finaliza, hasta que algo intenta escribir en esta secuencia interrumpida, lo que nunca ocurriría una vez que el navegador se ha alejado de la página original.

Este problema parece estar documentado en github , y preguntas similares se hacen en StackOverflow aquí (la misma pregunta es bastante exacta) y aquí (en cuanto a obtener el número de hilos activos) .

La única solución que he podido encontrar, basada en estas publicaciones, es implementar un tipo de subproceso / conexión de póker. Intentar escribir en una conexión interrumpida genera un IOError que puedo capturar y cerrar la conexión correctamente, lo que permite que el hilo muera. Este es el código de controlador para esa solución:

def events response.headers["Content-Type"] = "text/event-stream" stream_error = false; # used by flusher thread to determine when to stop redis = Redis.new # Subscribe to our events redis.subscribe("message.create", "message.user_list_update") do |on| on.message do |event, data| # when message is received, write to stream response.stream.write("messageType: ''#{event}'', data: #{data}/n/n") end # This is the monitor / connection poker thread # Periodically poke the connection by attempting to write to the stream flusher_thread = Thread.new do while !stream_error $redis.publish "message.create", "flusher_test" sleep 2.seconds end end end rescue IOError logger.info "Stream closed" stream_error = true; ensure logger.info "Events action is quitting redis and closing stream!" redis.quit response.stream.close end

(Nota: el método de events parece bloquearse en la invocación del método de subscribe . Todo lo demás (la transmisión) funciona correctamente, así que asumo que esto es normal).

(Otra nota: el concepto de hilo de enjuague tiene más sentido como un proceso de fondo de larga ejecución, un poco como un colector de hilo de basura. El problema con mi implementación anterior es que se genera un nuevo hilo para cada conexión, lo cual es inútil. intentar implementar este concepto debería hacerlo más como un único proceso, no tanto como lo he esbozado. Actualizaré esta publicación cuando lo vuelva a implementar correctamente como un proceso de fondo único).

La desventaja de esta solución es que solo hemos retrasado o disminuido el problema, no completamente resuelto. Todavía tenemos 2 hilos por usuario, además de otras solicitudes como ajax, lo que parece terrible desde una perspectiva de escalado; parece completamente inalcanzable y poco práctico para un sistema más grande con muchas conexiones simultáneas posibles.

Siento que me estoy perdiendo algo vital; Me resulta un tanto difícil de creer que Rails tiene una característica que está tan obviamente rota sin implementar un corrector de conexión personalizado como lo he hecho.

Pregunta: ¿Cómo permitimos que las conexiones / subprocesos mueran sin implementar algo cursi como un "enlace de conexión" o un recolector de subprocesos de basura?

Como siempre, avíseme si he olvidado algo.

Actualización Solo para agregar un poco de información adicional: Huetsch en github publicó este comentario señalando que SSE se basa en TCP, que normalmente envía un paquete FIN cuando la conexión está cerrada, dejando que el otro extremo (servidor en este caso) sepa que Es seguro cerrar la conexión. Huetsch señala que el navegador no está enviando ese paquete (¿quizás un error en la biblioteca de EventSource ?), O que Rails no lo está EventSource o está haciendo algo con él (definitivamente es un error en Rails, si ese es el caso). La búsqueda continúa ...

Otra actualización usando Wireshark, de hecho puedo ver paquetes FIN que se envían. Es cierto que no tengo mucho conocimiento o experiencia con el nivel de protocolo, sin embargo, por lo que puedo decir, definitivamente detecto que un paquete FIN se envía desde el navegador cuando establezco la conexión SSE usando EventSource desde el navegador, y NO se envía ningún paquete si eliminar esa conexión (lo que significa que no hay SSE). Aunque no estoy muy al tanto de mi conocimiento de TCP, esto parece indicarme que el cliente está terminando la conexión correctamente; quizás esto indica un error en Puma o Rails.

Otra actualización, @JamesBoutcher / boutcheratwest (github), me señaló una discusión en el sitio web de redis con respecto a este tema, específicamente con respecto al hecho de que el método de .(p)subscribe nunca se cierra. El póster de ese sitio señaló lo mismo que hemos descubierto aquí, que el entorno Rails nunca se notifica cuando la conexión del lado del cliente está cerrada y, por lo tanto, no puede ejecutar el método. .(p)unsubscribe . Pregunta sobre un tiempo de espera para el método de .(p)subscribe , que creo que también funcionaría, aunque no estoy seguro de qué método (el póker de conexión que he descrito anteriormente o su sugerencia de tiempo de espera) sería una solución mejor . Idealmente, para la solución de póker de conexión, me gustaría encontrar una manera de determinar si la conexión está cerrada en el otro extremo sin escribir en la transmisión. Como es ahora, como pueden ver, tengo que implementar un código del lado del cliente para manejar mi mensaje de "asentimiento" por separado, lo que creo que es intrincado y tonto como diablos.


Actualmente estoy haciendo una aplicación que gira en torno a ActionController: Live, EventSource y Puma y para aquellos que tienen problemas para cerrar transmisiones y tal, en lugar de rescatar un IOError , en Rails 4.2 necesita rescatar ClientDisconnected . Ejemplo:

def stream #Begin is not required twitter_client = Twitter::Streaming::Client.new(config_params) do |obj| # Do something end rescue ClientDisconnected # Do something when disconnected ensure # Do something else to ensure the stream is closed end

Encontré este útil consejo de esta publicación en el foro (todo el camino en la parte inferior): http://railscasts.com/episodes/401-actioncontroller-live?view=comments


Aquí está la solución con tiempo de espera que saldrá del bloqueo de Redis. (P) suscriba la llamada y elimine la pisada de conexión no utilizada.

class Stream::FixedController < StreamController def events # Rails reserve a db connection from connection pool for # each request, lets put it back into connection pool. ActiveRecord::Base.clear_active_connections! # Last time of any (except heartbeat) activity on stream # it mean last time of any message was send from server to client # or time of setting new connection @last_active = Time.zone.now # Redis (p)subscribe is blocking request so we need do some trick # to prevent it freeze request forever. redis.psubscribe("messages:*", ''heartbeat'') do |on| on.pmessage do |pattern, event, data| # capture heartbeat from Redis pub/sub if event == ''heartbeat'' # calculate idle time (in secounds) for this stream connection idle_time = (Time.zone.now - @last_active).to_i # Now we need to relase connection with Redis.(p)subscribe # chanel to allow go of any Exception (like connection closed) if idle_time > 4.minutes # unsubscribe from Redis because of idle time was to long # that''s all - fix in (almost)one line :) redis.punsubscribe end else # save time of this (last) activity @last_active = Time.zone.now end # write to stream - even heartbeat - it''s sometimes chance to # capture dissconection error before idle_time response.stream.write("event: #{event}/ndata: #{data}/n/n") end end # blicking end (no chance to get below this line without unsubscribe) rescue IOError Logs::Stream.info "Stream closed" rescue ClientDisconnected Logs::Stream.info "ClientDisconnected" rescue ActionController::Live::ClientDisconnected Logs::Stream.info "Live::ClientDisconnected" ensure Logs::Stream.info "Stream ensure close" redis.quit response.stream.close end end

Tienes que usar rojos. (P) anular la suscripción para finalizar esta llamada de bloqueo. Ninguna excepción puede romper esto.

Mi aplicación simple con información sobre esta solución: https://github.com/piotr-kedziak/redis-subscribe-stream-puma-fix


Aquí hay una solución potencialmente más simple que no usa un latido del corazón. Después de mucha investigación y experimentación, aquí está el código que estoy usando con sinatra + sinatra sse gem (que se debe adaptar fácilmente a Rails 4):

class EventServer < Sinatra::Base include Sinatra::SSE set :connections, [] . . . get ''/channel/:channel'' do . . . sse_stream do |out| settings.connections << out out.callback { puts ''Client disconnected from sse''; settings.connections.delete(out); } redis.subscribe(channel) do |on| on.subscribe do |channel, subscriptions| puts "Subscribed to redis ##{channel}/n" end on.message do |channel, message| puts "Message from redis ##{channel}: #{message}/n" message = JSON.parse(message) . . . if settings.connections.include?(out) out.push(message) else puts ''closing orphaned redis connection'' redis.unsubscribe end end end end end

La conexión de redis bloquea el mensaje y solo acepta (p) suscribe / (p) borra los comandos. Una vez que se da de baja, la conexión redis ya no está bloqueada y puede ser liberada por el objeto del servidor web que fue instanciado por la solicitud sse inicial. Se borra automáticamente cuando recibe un mensaje en redis y la conexión con el navegador ya no existe en el conjunto de recopilación.


En lugar de enviar un latido del corazón a todos los clientes, podría ser más fácil simplemente establecer un perro guardián para cada conexión. [Gracias a @NeilJewers]

class Stream::FixedController < StreamController def events # Rails reserve a db connection from connection pool for # each request, lets put it back into connection pool. ActiveRecord::Base.clear_active_connections! redis = Redis.new watchdog = Doberman::WatchDog.new(:timeout => 20.seconds) watchdog.start # Redis (p)subscribe is blocking request so we need do some trick # to prevent it freeze request forever. redis.psubscribe("messages:*") do |on| on.pmessage do |pattern, event, data| begin # write to stream - even heartbeat - it''s sometimes chance to response.stream.write("event: #{event}/ndata: #{data}/n/n") watchdog.ping rescue Doberman::WatchDog::Timeout => e raise ClientDisconnected if response.stream.closed? watchdog.ping end end end rescue IOError rescue ClientDisconnected ensure response.stream.close redis.quit watchdog.stop end end


Sobre la base de @James Boutcher, utilicé lo siguiente en Puma agrupado con 2 trabajadores, por lo que tengo solo 1 hilo creado para el latido del corazón en config / initializers / redis.rb:

config / puma.rb

on_worker_boot do |index| puts "worker nb #{index.to_s} booting" create_heartbeat if index.to_i==0 end def create_heartbeat puts "creating heartbeat" $redis||=Redis.new heartbeat = Thread.new do ActiveRecord::Base.connection_pool.release_connection begin while true hash={event: "heartbeat",data: "heartbeat"} $redis.publish("heartbeat",hash.to_json) sleep 20.seconds end ensure #no db connection anyway end end end


Una solución que acabo de hacer (tomando prestado mucho de @teeg) que parece funcionar bien (aunque no lo haya probado en caso de falla)

config / initializers / redis.rb

$redis = Redis.new(:host => "xxxx.com", :port => 6379) heartbeat_thread = Thread.new do while true $redis.publish("heartbeat","thump") sleep 30.seconds end end at_exit do # not sure this is needed, but just in case heartbeat_thread.kill $redis.quit end

Y luego en mi controlador:

def events response.headers["Content-Type"] = "text/event-stream" redis = Redis.new(:host => "xxxxxxx.com", :port => 6379) logger.info "New stream starting, connecting to redis" redis.subscribe([''parse.new'',''heartbeat'']) do |on| on.message do |event, data| if event == ''parse.new'' response.stream.write("event: parse/ndata: #{data}/n/n") elsif event == ''heartbeat'' response.stream.write("event: heartbeat/ndata: heartbeat/n/n") end end end rescue IOError logger.info "Stream closed" ensure logger.info "Stopping stream thread" redis.quit response.stream.close end