enablewebsocketmessagebroker baeldung spring spring-mvc spring-websocket

baeldung - Spring Websocket en un cluster tomcat



websocket angular 6 spring boot (3)

La escala horizontal de WebSockets es en realidad muy diferente a la escala horizontal de las aplicaciones basadas en HTTP sin estado / sin estado.

Aplicación HTTP de Stateless escalable horizontalmente : simplemente active algunas instancias de aplicaciones en diferentes máquinas y coloque un equilibrador de carga frente a ellas. Existen soluciones de equilibradores de carga bastante diferentes, como HAProxy, Nginx, etc. Si se encuentra en un entorno de nube como AWS, también podría haber gestionado soluciones como Elastic Load Balancer.

Aplicación HTTP de estado de escalado horizontal : sería genial si pudiéramos tener todas las aplicaciones sin estado cada vez, pero desafortunadamente eso no siempre es posible. Por lo tanto, cuando se trata de aplicaciones HTTP con estado, debe preocuparse por la sesión HTTP, que es básicamente un almacenamiento local para cada cliente diferente donde el servidor web puede almacenar datos que se mantienen a través de diferentes solicitudes HTTP (como cuando se trata de un Carro). Bueno, en este caso, al escalar horizontalmente, debe tener en cuenta que, como dije, es un almacenamiento LOCAL , por lo que el Servidor A no podrá manejar una sesión HTTP que esté en el ServidorB. En otras palabras, si por alguna razón el Servidor B que está siendo atendido por el Cliente1 comienza a ser atendido por el Servidor B, su sesión HTTP se perderá (¡y su carrito de compras se habrá ido!). Las razones pueden ser un fallo de nodo o incluso un despliegue. Para solucionar este problema, no puede mantener las sesiones HTTP solo localmente, es decir, debe almacenarlas en otro componente externo. Hay varios componentes que podrían manejar esto, como cualquier base de datos relacional, pero eso sería en realidad una sobrecarga. Algunas bases de datos NoSQL pueden manejar este comportamiento clave-valor muy bien, como Redis. Ahora, con la sesión HTTP almacenada en Redis, si un cliente comienza a recibir servicio de otro servidor, recuperará la sesión HTTP del cliente de Redis y la cargará en su memoria, por lo que todo seguirá funcionando y el usuario no perderá su cuenta. Sesión HTTP más. Puede usar Spring Session para almacenar fácilmente la sesión HTTP en Redis.

Aplicación WebSocket de escala horizontal : cuando se establece una conexión WebSocket, el servidor debe mantener la conexión abierta con el cliente para que puedan intercambiar datos en ambas direcciones. Cuando un cliente escucha un destino como "/topic/public.messages", decimos que el cliente está suscrito a este destino. En Spring, cuando utiliza el enfoque simpleBroker , las suscripciones se guardan en la memoria , ¿qué sucede, por ejemplo, si el Servidor A sirve a Client1 y desea enviar un mensaje usando WebSocket al Cliente2 que atiende a ServerB? ¡Usted ya sabe la respuesta! El mensaje no se enviará a Client2 porque Server1 ni siquiera conoce la suscripción de Client2. Por lo tanto, para resolver este problema, nuevamente debe externalizar las suscripciones de WebSockets. Como está utilizando STOMP como subprotocolo, necesita un componente externo que pueda actuar como un agente STOMP externo. Hay muchas herramientas capaces de hacer esto, pero sugeriría RabbitMQ. Ahora, debe cambiar su configuración de Spring para que no mantenga las suscripciones en la memoria . En su lugar, delegará las suscripciones a un agente STOMP externo. Puede lograrlo fácilmente con algunas configuraciones básicas como enableStompBrokerRelay . Lo importante a tener en cuenta es que la sesión HTTP es diferente a la sesión de WebSocket . El uso de Spring Session para almacenar la sesión HTTP en Redis no tiene absolutamente nada que ver con la escala horizontal de WebSockets .

He codificado una aplicación de chat web completa con Spring Boot (y mucho más) que utiliza RabbitMQ como un corredor de STOMP externo completo y es pública en GitHub, así que clone, ejecute la aplicación en su máquina y vea los detalles del código.

Cuando se trata de una pérdida de conexión de WebSocket, no hay mucho que Spring pueda hacer. En realidad, el lado del cliente debe solicitar la reconexión mediante la implementación de una función de devolución de llamada de reconexión, por ejemplo (ese es el flujo de reconocimiento de WebSocket, el cliente debe iniciar el intercambio, no el servidor). Hay algunas bibliotecas del lado del cliente que pueden manejar esto de forma transparente para usted. Ese no es el caso SockJS. En la aplicación de chat también implementé esta función de reconexión.

En nuestra aplicación actual, usamos Spring Websockets sobre STOMP. Estamos buscando escalar horizontalmente. ¿Existen prácticas recomendadas sobre cómo debemos manejar el tráfico de websocket en múltiples instancias de Tomcat y cómo podemos mantener la información de la sesión en múltiples nodos?


Mantenga la información de la sesión en varios nodos:

Supongamos que tenemos 2 servidores host, respaldados con un equilibrador de carga.

Los websockets son una conexión de socket desde el navegador a un servidor específico host.eg host1

Ahora, si el host1 se desactiva, la conexión de socket del equilibrador de carga - el host 1 se interrumpirá. ¿Cómo Spring volverá a abrir la misma conexión websocket del equilibrador de carga al host 2? El navegador no debe abrir una nueva conexión websocket


Su requerimiento se puede dividir en 2 sub tareas:

  1. Mantenga la información de la sesión en varios nodos: puede probar el agrupamiento de Spring Sessions respaldado por Redis (consulte: HttpSession con Redis ). Esto es muy simple y ya tiene soporte para Spring Websockets (ver: Spring Session y WebSockets ).

  2. Manejar el tráfico de websockets en múltiples instancias de Tomcat: hay varias formas de hacerlo.

    • La primera forma: utilizar un agente con todas las funciones (por ejemplo: ActiveMQ) y probar una nueva función. Soporta múltiples servidores WebSocket (desde: 4.2.0 RC1)
    • La segunda forma: utilizar un agente de funciones completas e implementar un UserSessionRegistry distribuido (p. Ej., Mediante Redis: D). La implementación predeterminada DefaultUserSessionRegistry utilizando un almacenamiento en memoria.

Actualizado: escribí una implementación simple usando Redis, pruébala si estás interesado

Para configurar un broker con todas las funciones (relé de broker), puede probar:

public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer { ... @Autowired private RedisConnectionFactory redisConnectionFactory; @Override public void configureMessageBroker(MessageBrokerRegistry config) { config.enableStompBrokerRelay("/topic", "/queue") .setRelayHost("localhost") // broker host .setRelayPort(61613) // broker port ; config.setApplicationDestinationPrefixes("/app"); } @Bean public UserSessionRegistry userSessionRegistry() { return new RedisUserSessionRegistry(redisConnectionFactory); } ... }

y

import java.util.Set; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.BoundHashOperations; import org.springframework.data.redis.core.BoundSetOperations; import org.springframework.data.redis.core.RedisOperations; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.messaging.simp.user.UserSessionRegistry; import org.springframework.util.Assert; /** * An implementation of {@link UserSessionRegistry} backed by Redis. * @author thanh */ public class RedisUserSessionRegistry implements UserSessionRegistry { /** * The prefix for each key of the Redis Set representing a user''s sessions. The suffix is the unique user id. */ static final String BOUNDED_HASH_KEY_PREFIX = "spring:websockets:users:"; private final RedisOperations<String, String> sessionRedisOperations; @SuppressWarnings("unchecked") public RedisUserSessionRegistry(RedisConnectionFactory redisConnectionFactory) { this(createDefaultTemplate(redisConnectionFactory)); } public RedisUserSessionRegistry(RedisOperations<String, String> sessionRedisOperations) { Assert.notNull(sessionRedisOperations, "sessionRedisOperations cannot be null"); this.sessionRedisOperations = sessionRedisOperations; } @Override public Set<String> getSessionIds(String user) { Set<String> entries = getSessionBoundHashOperations(user).members(); return (entries != null) ? entries : Collections.<String>emptySet(); } @Override public void registerSessionId(String user, String sessionId) { getSessionBoundHashOperations(user).add(sessionId); } @Override public void unregisterSessionId(String user, String sessionId) { getSessionBoundHashOperations(user).remove(sessionId); } /** * Gets the {@link BoundHashOperations} to operate on a username */ private BoundSetOperations<String, String> getSessionBoundHashOperations(String username) { String key = getKey(username); return this.sessionRedisOperations.boundSetOps(key); } /** * Gets the Hash key for this user by prefixing it appropriately. */ static String getKey(String username) { return BOUNDED_HASH_KEY_PREFIX + username; } @SuppressWarnings("rawtypes") private static RedisTemplate createDefaultTemplate(RedisConnectionFactory connectionFactory) { Assert.notNull(connectionFactory, "connectionFactory cannot be null"); StringRedisTemplate template = new StringRedisTemplate(connectionFactory); template.setKeySerializer(new StringRedisSerializer()); template.setValueSerializer(new StringRedisSerializer()); template.afterPropertiesSet(); return template; } }