spring-boot hornetq

Spring Boot incrustado HornetQ clúster no reenviando mensajes



spring-boot (1)

Intento crear un clúster estático de dos aplicaciones Spring Boot con servidores HornetQ incrustados. Una aplicación / servidor manejará eventos externos y generará mensajes para ser enviados a una cola de mensajes. La otra aplicación / servidor escuchará en la cola de mensajes y procesará los mensajes entrantes. Debido a que el enlace entre las dos aplicaciones no es confiable, cada una usará solo clientes locales / inVM para producir / consumir mensajes en sus respectivos servidores, y confiando en la funcionalidad de clustering para reenviar los mensajes a la cola en el otro servidor en el clúster.

Estoy usando HornetQConfigurationCustomizer para personalizar el servidor HornetQ incrustado, porque de manera predeterminada solo viene con una InVMConnectorFactory .

He creado un par de aplicaciones de muestra que ilustran esta configuración, a lo largo de este ejemplo "ServerSend", se refiere al servidor que producirá mensajes, y "ServerReceive" hace referencia al servidor que consumirá mensajes.

pom.xml para ambas aplicaciones contiene:

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-hornetq</artifactId> </dependency> <dependency> <groupId>org.hornetq</groupId> <artifactId>hornetq-jms-server</artifactId> </dependency>

DemoHornetqServerSendApplication:

@SpringBootApplication @EnableScheduling public class DemoHornetqServerSendApplication { @Autowired private JmsTemplate jmsTemplate; private @Value("${spring.hornetq.embedded.queues}") String testQueue; public static void main(String[] args) { SpringApplication.run(DemoHornetqServerSendApplication.class, args); } @Scheduled(fixedRate = 5000) private void sendMessage() { String message = "Timestamp from Server: " + System.currentTimeMillis(); System.out.println("Sending message: " + message); jmsTemplate.convertAndSend(testQueue, message); } @Bean public HornetQConfigurationCustomizer hornetCustomizer() { return new HornetQConfigurationCustomizer() { @Override public void customize(Configuration configuration) { String serverSendConnectorName = "server-send-connector"; String serverReceiveConnectorName = "server-receive-connector"; Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations(); Map<String, Object> params = new HashMap<String, Object>(); params.put(TransportConstants.HOST_PROP_NAME, "localhost"); params.put(TransportConstants.PORT_PROP_NAME, "5445"); TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params); connectorConf.put(serverSendConnectorName, tc); Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations(); tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params); acceptors.add(tc); params = new HashMap<String, Object>(); params.put(TransportConstants.HOST_PROP_NAME, "localhost"); params.put(TransportConstants.PORT_PROP_NAME, "5446"); tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params); connectorConf.put(serverReceiveConnectorName, tc); List<String> staticConnectors = new ArrayList<String>(); staticConnectors.add(serverReceiveConnectorName); ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration( "my-cluster", // name "jms", // address serverSendConnectorName, // connector name 500, // retry interval true, // duplicate detection true, // forward when no consumers 1, // max hops 1000000, // confirmation window size staticConnectors, true // allow direct connections only ); configuration.getClusterConfigurations().add(conf); AddressSettings setting = new AddressSettings(); setting.setRedistributionDelay(0); configuration.getAddressesSettings().put("#", setting); } }; } }

application.properties (ServerSend):

spring.hornetq.mode=embedded spring.hornetq.embedded.enabled=true spring.hornetq.embedded.queues=jms.testqueue spring.hornetq.embedded.cluster-password=password

DemoHornetqServerReceiveApplication:

@SpringBootApplication @EnableJms public class DemoHornetqServerReceiveApplication { @Autowired private JmsTemplate jmsTemplate; private @Value("${spring.hornetq.embedded.queues}") String testQueue; public static void main(String[] args) { SpringApplication.run(DemoHornetqServerReceiveApplication.class, args); } @JmsListener(destination="${spring.hornetq.embedded.queues}") public void receiveMessage(String message) { System.out.println("Received message: " + message); } @Bean public HornetQConfigurationCustomizer hornetCustomizer() { return new HornetQConfigurationCustomizer() { @Override public void customize(Configuration configuration) { String serverSendConnectorName = "server-send-connector"; String serverReceiveConnectorName = "server-receive-connector"; Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations(); Map<String, Object> params = new HashMap<String, Object>(); params.put(TransportConstants.HOST_PROP_NAME, "localhost"); params.put(TransportConstants.PORT_PROP_NAME, "5446"); TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params); connectorConf.put(serverReceiveConnectorName, tc); Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations(); tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params); acceptors.add(tc); params = new HashMap<String, Object>(); params.put(TransportConstants.HOST_PROP_NAME, "localhost"); params.put(TransportConstants.PORT_PROP_NAME, "5445"); tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params); connectorConf.put(serverSendConnectorName, tc); List<String> staticConnectors = new ArrayList<String>(); staticConnectors.add(serverSendConnectorName); ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration( "my-cluster", // name "jms", // address serverReceiveConnectorName, // connector name 500, // retry interval true, // duplicate detection true, // forward when no consumers 1, // max hops 1000000, // confirmation window size staticConnectors, true // allow direct connections only ); configuration.getClusterConfigurations().add(conf); AddressSettings setting = new AddressSettings(); setting.setRedistributionDelay(0); configuration.getAddressesSettings().put("#", setting); } }; } }

application.properties (ServerReceive):

spring.hornetq.mode=embedded spring.hornetq.embedded.enabled=true spring.hornetq.embedded.queues=jms.testqueue spring.hornetq.embedded.cluster-password=password

Después de iniciar ambas aplicaciones, el resultado del registro muestra esto:

ServerSend:

2015-04-09 11: 11: 58.471 INFO 7536 --- [principal] org.hornetq.core.server: HQ221000: el servidor en vivo está comenzando con la configuración Configuración de HornetQ (clustered = true, backup = false, sharedStore = true, journalDirectory = C: / Users **** / AppData / Local / Temp / hornetq-data / journal, bindingsDirectory = data / bindings, largeMessagesDirectory = data / largemessages, pagingDirectory = data / paging)
2015-04-09 11: 11: 58.501 INFORMACIÓN 7536 --- [principal] org.hornetq.core.server: HQ221045: libaio no está disponible, cambiando la configuración a NIO
2015-04-09 11: 11: 58.595 INFORMACIÓN 7536 --- [principal] org.hornetq.core.server: HQ221043: Adición de soporte de protocolo CORE
2015-04-09 11: 11: 58.720 INFORMACIÓN 7536 --- [principal] org.hornetq.core.server: HQ221003: intentando implementar la cola jms.queue.jms.testqueue
2015-04-09 11: 11: 59.568 INFO 7536 --- [principal] org.hornetq.core.server: HQ221020: Iniciado Netty Acceptor versión 4.0.13.Final localhost: 5445
2015-04-09 11: 11: 59.593 INFORMACIÓN 7536 --- [principal] org.hornetq.core.server: HQ221007: El servidor ya está activo
2015-04-09 11: 11: 59.593 INFO 7536 --- [principal] org.hornetq.core.server: HQ221001: HornetQ Server versión 2.4.5.FINAL (Wild Hornet, 124) [c139929d-d90f-11e4-ba2e -e58abf5d6944]

ServerReceive:

2015-04-09 11: 12: 04.401 INFORMACIÓN 4528 --- [principal] org.hornetq.core.server: HQ221000: el servidor en vivo está comenzando con la configuración HornetQ Configuración (clustered = true, backup = false, sharedStore = true, journalDirectory = C: / Users **** / AppData / Local / Temp / hornetq-data / journal, bindingsDirectory = data / bindings, largeMessagesDirectory = data / largemessages, pagingDirectory = data / paging)
2015-04-09 11: 12: 04.410 INFO 4528 --- [principal] org.hornetq.core.server: HQ221045: libaio no está disponible, cambiando la configuración a NIO
2015-04-09 11: 12: 04.520 INFO 4528 --- [principal] org.hornetq.core.server: HQ221043: agregando compatibilidad con el protocolo CORE
2015-04-09 11: 12: 04.629 INFO 4528 --- [principal] org.hornetq.core.server: HQ221003: intentando implementar cola jms.queue.jms.testqueue
2015-04-09 11: 12: 05.545 INFO 4528 --- [principal] org.hornetq.core.server: HQ221020: Iniciado Netty Acceptor versión 4.0.13.Final localhost: 5446
2015-04-09 11: 12: 05.578 INFO 4528 --- [principal] org.hornetq.core.server: HQ221007: El servidor ya está activo
2015-04-09 11: 12: 05.578 INFO 4528 --- [principal] org.hornetq.core.server: HQ221001: HornetQ Server versión 2.4.5.FINAL (Wild Hornet, 124) [c139929d-d90f-11e4-ba2e -e58abf5d6944]

Veo clustered=true en ambas salidas, y esto mostraría false si HornetQConfigurationCustomizer la configuración del clúster de HornetQConfigurationCustomizer , por lo que debe tener algún efecto.

Ahora, ServerSend muestra esto en la salida de la consola:

Enviando mensaje: Marca de tiempo del servidor: 1428574324910
Enviando mensaje: Marca de tiempo del servidor: 1428574329899
Enviando mensaje: Marca de tiempo del servidor: 1428574334904

Sin embargo, ServerReceive no muestra nada.

Parece que los mensajes no se reenvían de ServerSend a ServerReceive.

Hice algunas pruebas más, al crear otras dos aplicaciones Spring Boot (ClientSend y ClientReceive), que no tienen incrustado un servidor HornetQ y en su lugar se conectan a un servidor "nativo".

pom.xml para ambas aplicaciones de cliente contiene:

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-hornetq</artifactId> </dependency>

DemoHornetqClientSendApplication:

@SpringBootApplication @EnableScheduling public class DemoHornetqClientSendApplication { @Autowired private JmsTemplate jmsTemplate; private @Value("${queue}") String testQueue; public static void main(String[] args) { SpringApplication.run(DemoHornetqClientSendApplication.class, args); } @Scheduled(fixedRate = 5000) private void sendMessage() { String message = "Timestamp from Client: " + System.currentTimeMillis(); System.out.println("Sending message: " + message); jmsTemplate.convertAndSend(testQueue, message); } }

application.properties (ClientSend):

spring.hornetq.mode=native spring.hornetq.host=localhost spring.hornetq.port=5446 queue=jms.testqueue

DemoHornetqClientReceiveApplication:

@SpringBootApplication @EnableJms public class DemoHornetqClientReceiveApplication { @Autowired private JmsTemplate jmsTemplate; private @Value("${queue}") String testQueue; public static void main(String[] args) { SpringApplication.run(DemoHornetqClientReceiveApplication.class, args); } @JmsListener(destination="${queue}") public void receiveMessage(String message) { System.out.println("Received message: " + message); } }

application.properties (ClientReceive):

spring.hornetq.mode=native spring.hornetq.host=localhost spring.hornetq.port=5445 queue=jms.testqueue

Ahora la consola muestra esto:

ServerReveive:

Mensaje recibido: marca de tiempo del cliente: 1428574966630
Mensaje recibido: marca de tiempo del cliente: 1428574971600
Mensaje recibido: marca de tiempo del cliente: 1428574976595

ClientReceive:

Mensaje recibido: Marca de hora del servidor: 1428574969436
Mensaje recibido: marca de tiempo del servidor: 1428574974438
Mensaje recibido: marca de tiempo del servidor: 1428574979446

Si ServerSend se ejecuta por un tiempo y luego se inicia ClientReceive, también recibe todos los mensajes en cola hasta ese punto, por lo que esto muestra que los mensajes no desaparecen en ningún lugar ni se consumen en otro lugar.

Para mayor completitud, también he señalado ClientSend a ServerSend y ClientReceive a ServerReceive, para ver si hay algún problema con la agrupación y los clientes InVM, pero nuevamente no hubo ningún outout que indique que se recibió ningún mensaje en ClientReceive o ServerReceive.

Por lo tanto, parece que la entrega de mensajes a / desde cada uno de los intermediarios integrados a los clientes externos directamente conectados funciona bien, pero no se reenvían mensajes entre intermediarios en el clúster.

Entonces, después de todo esto, la gran pregunta es: ¿qué pasa con la configuración de que los mensajes no se reenvían dentro del clúster?


http://docs.jboss.org/hornetq/2.2.5.Final/user-manual/en/html/architecture.html#d0e595

"HornetQ core está diseñado como un conjunto de POJO simples, por lo que si tiene una aplicación que requiere la funcionalidad de mensajería internamente, pero no desea exponer eso como servidor HornetQ, puede crear instancias directas e incrustar servidores HornetQ en su propia aplicación".

Si está incrustándolo, no lo está exponiendo como servidor. Cada uno de sus contenedores tiene una instancia separada. Es el equivalente a iniciar 2 copias de avispón y darles el mismo nombre de cola. Uno escribe en esa cola en la primera instancia y el otro escucha la cola en la segunda instancia.

Si desea desacoplar sus aplicaciones de esta manera, necesita tener un solo lugar que actúe como servidor. Probablemente, quieres clúster. Esto no es específico de Hornet, por cierto. Encontrará este patrón a menudo.