tutorial example ejemplo curso crear cola java java-ee architecture jms ibm-mq

example - jms java ejemplo



¿Es una buena práctica usar JMS Temporary Queue para el uso sincrónico? (7)

Si usamos el mecanismo de solicitud / respuesta JMS utilizando "Temporary Queue", ¿ese código será escalable?

Por el momento, no sabemos si admitiremos 100 solicitudes por segundo o 1000 solicitudes por segundo.

El código a continuación es lo que estoy pensando en implementar. Hace uso de JMS de forma ''síncrona''. Las partes clave son donde se crea el ''Consumidor'' para señalar una ''Cola temporal'' que se creó para esta sesión. Simplemente no puedo entender si el uso de tales Temporary Queues es un diseño escalable.

destination = session.createQueue("queue:///Q1"); producer = session.createProducer(destination); tempDestination = session.createTemporaryQueue(); consumer = session.createConsumer(tempDestination); long uniqueNumber = System.currentTimeMillis() % 1000; TextMessage message = session .createTextMessage("SimpleRequestor: Your lucky number today is " + uniqueNumber); // Set the JMSReplyTo message.setJMSReplyTo(tempDestination); // Start the connection connection.start(); // And, send the request producer.send(message); System.out.println("Sent message:/n" + message); // Now, receive the reply Message receivedMessage = consumer.receive(15000); // in ms or 15 seconds System.out.println("/nReceived message:/n" + receivedMessage);

Actualizar:

Me encontré con otro patrón, mira este blog. La idea es usar colas "regulares" para enviar y recibir. Sin embargo, para las llamadas "Sincrónicas", para obtener la Respuesta deseada (es decir, que coincida con la solicitud), se crea un Consumidor que escucha la Cola de recepción usando un "Selector".

Pasos:

// 1. Create Send and Receive Queue. // 2. Create a msg with a specific ID final String correlationId = UUID.randomUUID().toString(); final TextMessage textMessage = session.createTextMessage( msg ); textMessage.setJMSCorrelationID( correlationId ); // 3. Start a consumer that receives using a ''Selector''. consumer = session.createConsumer( replyQueue, "JMSCorrelationID = ''" + correlationId + "''" );

Entonces, la diferencia en este patrón es que no creamos una nueva cola temporal para cada nueva solicitud. En su lugar, todas las respuestas llegan a una sola cola, pero use un "selector" para asegurarse de que cada cadena de solicitud reciba la única respuesta que le interese.

Creo que la desventaja aquí es que debes usar un ''selector''. Todavía no sé si eso es menos preferido o más preferido que el patrón mencionado anteriormente. ¿Pensamientos?



Crear colas temporales no es gratis. Después de todo, está asignando recursos a los intermediarios. Una vez dicho esto, si tiene un número de clientes (una JVM múltiples, varios hilos concurrentes por JVM, etc.) desconocidos (antes de la mano) potencialmente sin consolidar, es posible que no tenga otra opción. Asignar las colas de los clientes y asignarlas a los clientes se iría rápidamente.

Ciertamente, lo que has esbozado es la solución más simple posible. Y si puede obtener números reales para el volumen de transacciones y escala lo suficiente, bien.

Antes de tratar de evitar las colas temporales, me gustaría más limitar la cantidad de clientes y hacer que los clientes sean de larga duración. Es decir, crear un grupo de clientes en el lado del cliente, y hacer que los clientes del grupo creen la cola temporal, la sesión, la conexión, etc. al inicio, reutilizarlos en solicitudes posteriores y derribarlos al apagarlos. Luego, el problema de sintonía se convierte en uno de los tamaños máx. / Mín. En el grupo, el tiempo de inactividad para podar el grupo, y cuál es el comportamiento (falla frente a bloque) cuando se maximiza el grupo. A menos que esté creando un número arbitrariamente grande de JVM transitorias (en cuyo caso tiene mayores problemas de escalado solo por la sobrecarga de inicio de JVM), eso debería escalar al igual que cualquier otra cosa. Después de todo, en ese momento los recursos que está asignando reflejan el uso real del sistema. Realmente no hay oportunidad de usar menos que eso.

Lo que hay que evitar es crear y destruir un gran número gratuito de colas, sesiones, conexiones, etc. Diseña el lado del servidor para permitir la transmisión desde el principio. Luego agrupe si / cuando lo necesita. Como no, para algo no trivial, necesitarás hacerlo.


Curiosamente, la escalabilidad de esto puede ser realmente lo contrario de lo que las otras respuestas han descrito.

WebSphere MQ guarda y reutiliza objetos de cola dinámicos siempre que sea posible. Entonces, aunque el uso de una cola dinámica no es gratuito, se escala bien porque a medida que se liberan las colas, todo lo que WMQ debe hacer es pasar el manejador al siguiente subproceso que solicita una nueva instancia de cola. En un QMgr ocupado, el número de colas dinámicas permanecerá relativamente estático mientras que las manijas pasan de un hilo a otro. Estrictamente hablando, no es tan rápido como reutilizar una sola cola, pero no está mal.

Por otro lado, aunque la indexación en CORRELID es rápida, el rendimiento es inverso al número de mensajes en el índice. También hace una diferencia si la profundidad de la cola comienza a construir. Cuando la aplicación cumple un GET con WAIT en una cola vacía, no hay demora. Pero en una cola profunda, el QMgr tiene que buscar en el índice de mensajes existentes para determinar que el mensaje de respuesta no está entre ellos. En su ejemplo, esa es la diferencia entre buscar un índice vacío versus un índice grande 1,000 veces por segundo.

El resultado es que 1000 colas dinámicas con un mensaje cada una pueden ser realmente más rápidas que una sola cola con 1000 subprocesos por CORRELID , dependiendo de las características de la aplicación y de la carga. Recomendaría probar esto a escala antes de comprometerme con un diseño en particular.


He estado enfrentando el mismo problema y decidí poner en común conexiones dentro de un frijol sin estado. Una conexión de cliente tiene un tempQueue y establece dentro del objeto JMSMessageExchanger (que contiene connectionFactory, Queue y tempQueue), que se vincula a una instancia de bean. Lo he probado en entornos JSE / EE. Pero no estoy seguro del comportamiento de la agrupación Glassfish JMS. ¿Cerrará realmente las conexiones JMS, obtenidas "a mano" después de que finalice el método Bean? ¿Estoy haciendo algo terriblemente incorrecto?

También he desactivado la transacción en el bean de cliente (TransactionAttributeType.NOT_SUPPORTED) para enviar mensajes de solicitud inmediatamente a la cola de solicitudes.

package net.sf.selibs.utils.amq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TemporaryQueue; import lombok.Getter; import lombok.Setter; import net.sf.selibs.utils.misc.UHelper; public class JMSMessageExchanger { @Setter @Getter protected long timeout = 60 * 1000; public JMSMessageExchanger(ConnectionFactory cf) { this.cf = cf; } public JMSMessageExchanger(ConnectionFactory cf, Queue queue) { this.cf = cf; this.queue = queue; } //work protected ConnectionFactory cf; protected Queue queue; protected TemporaryQueue tempQueue; protected Connection connection; protected Session session; protected MessageProducer producer; protected MessageConsumer consumer; //status protected boolean started = false; protected int mid = 0; public Message makeRequest(RequestProducer producer) throws Exception { try { if (!this.started) { this.init(); this.tempQueue = this.session.createTemporaryQueue(); this.consumer = this.session.createConsumer(tempQueue); } //send request Message requestM = producer.produce(this.session); mid++; requestM.setJMSCorrelationID(String.valueOf(mid)); requestM.setJMSReplyTo(this.tempQueue); this.producer.send(this.queue, requestM); //get response while (true) { Message responseM = this.consumer.receive(this.timeout); if (responseM == null) { return null; } int midResp = Integer.parseInt(responseM.getJMSCorrelationID()); if (mid == midResp) { return responseM; } else { //just get other message } } } catch (Exception ex) { this.close(); throw ex; } } public void makeResponse(ResponseProducer producer) throws Exception { try { if (!this.started) { this.init(); } Message response = producer.produce(this.session); response.setJMSCorrelationID(producer.getRequest().getJMSCorrelationID()); this.producer.send(producer.getRequest().getJMSReplyTo(), response); } catch (Exception ex) { this.close(); throw ex; } } protected void init() throws Exception { this.connection = cf.createConnection(); this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE); this.producer = this.session.createProducer(null); this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); this.connection.start(); this.started = true; } public void close() { UHelper.close(producer); UHelper.close(consumer); UHelper.close(session); UHelper.close(connection); this.started = false; } }

La misma clase se usa en el cliente (bean sin estado) y el servidor (@MessageDriven). RequestProducer y ResponseProducer son interfaces:

package net.sf.selibs.utils.amq; import javax.jms.Message; import javax.jms.Session; public interface RequestProducer { Message produce(Session session) throws Exception; } package net.sf.selibs.utils.amq; import javax.jms.Message; public interface ResponseProducer extends RequestProducer{ void setRequest(Message request); Message getRequest(); }

También he leído el artículo de AMQ sobre la implementación de solicitud-respuesta sobre AMQ: http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html


Tal vez sea demasiado tarde, pero esta semana pasé algunas horas para que funcione la solicitud / respuesta de sincronización dentro de JMS. ¿Qué hay de extender el QueueRequester con tiempo de espera? Lo hice y al menos las pruebas en una sola máquina (corredor en ejecución, solicitante y respuesta) mostraron que esta solución supera a las discutidas. Por otro lado, depende de la utilización de QueueConnection, lo que significa que puede verse obligado a abrir varias conexiones.


Usar el selector en la ID de correlación en una cola compartida se escalará muy bien con múltiples consumidores.

1000 solicitudes / s sin embargo serán mucho. Es posible que desee dividir la carga entre instancias diferentes si el rendimiento resulta ser un problema.

Es posible que desee detallar las solicitudes frente a los números de los clientes. Si el número de clientes es <10 y se mantendrá bastante estático, y los números de solicitud son muy altos, la solución más resistente y rápida podría ser tener colas de respuesta estáticas para cada cliente.


Usar la cola temporal costará crear dependToProductores cada vez. En lugar de utilizar un productor en caché para una respuesta a la cola estática, el método createProducer será más costoso e impactará en el rendimiento en un entorno de invocación altamente concurrente.