tutorial example ejemplos code java multithreading rabbitmq messaging channel

java - example - rabbitmq tutorial



RabbitMQ por ejemplo: mĂșltiples hilos, canales y colas (3)

Acabo de leer los documentos API de Java de RabbitMQ , y me pareció muy informativo y directo. El ejemplo de cómo configurar un Channel simple para publicar / consumir es muy fácil de seguir y comprender. Pero es un ejemplo muy simple / básico, y me dejó una pregunta importante: ¿cómo puedo configurar 1+ Channels para publicar / consumir ay desde múltiples colas?

Digamos que tengo un servidor RabbitMQ con 3 colas: logging , security_events y customer_orders . Por lo tanto, necesitaríamos un solo Channel para poder publicar / consumir en las 3 colas, o más probablemente, tener 3 Channels separados, cada uno dedicado a una única cola.

Además de esto, las mejores prácticas de RabbitMQ dictan que configuremos 1 Channel por hilo de consumidor. Para este ejemplo, digamos que security_events está bien con solo 1 hilo de consumidor, pero tanto el logging como el customer_order necesitan 5 hilos para manejar el volumen. Entonces, si lo entiendo correctamente, ¿eso significa que necesitamos:

  • 1 Channel y 1 hilo de consumidor para publicar / consumir hacia y desde security_events ; y
  • 5 Channels y 5 hilos de consumo para publicar / consumir desde y hacia el logging ; y
  • ¿5 Channels y 5 hilos de consumo para publicar / consumir hacia y desde customer_orders ?

Si mi entendimiento está equivocado aquí, por favor comience por corregirme. De cualquier forma, ¿podría algún veterano de RabbitMQ cansado de la batalla ayudarme a "conectar los puntos" con un ejemplo de código decente para configurar editores / consumidores que cumplan con mis requisitos aquí? ¡Gracias por adelantado!


¿Cómo puedo configurar 1+ canales para publicar / consumir ay desde múltiples colas?

Puede implementar el uso de hilos y canales. Todo lo que necesita es una forma de categorizar cosas, es decir, todos los elementos de la cola del inicio de sesión, todos los elementos de la cola de security_events, etc. La catagorización se puede lograr usando una clave de enrutamiento.

es decir: cada vez que agrega un elemento a la cola, especifica la clave de enrutamiento. Se adjuntará como un elemento de propiedad. De esta forma puede obtener los valores de un evento en particular, por ejemplo, el registro .

El siguiente ejemplo de código explica cómo hacerlo en el lado del cliente.

P.ej:

La clave de enrutamiento se usa para identificar el tipo de canal y recuperar los tipos.

Por ejemplo, si necesita obtener todos los canales sobre el tipo de inicio de sesión, debe especificar la clave de enrutamiento como inicio de sesión o alguna otra palabra clave para identificarlo.

Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); string routingKey="login"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

Puedes mirar here para más detalles sobre la Categorización.

Subprocesos Parte

Una vez que la parte de publicación termina, puede ejecutar la parte de la secuencia.

En esta parte puede obtener los datos publicados sobre la base de la categoría. es decir; clave de enrutamiento que en su caso es el registro, security_events y customer_orders, etc.

mira en el ejemplo para saber cómo recuperar los datos en hilos.

P.ej :

ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //**The threads part is as follows** channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); // This part will biend the queue with the severity (login for eg:) for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, routingKey); } boolean autoAck = false; channel.basicConsume(queueName, autoAck, "myConsumerTag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String routingKey = envelope.getRoutingKey(); String contentType = properties.contentType; long deliveryTag = envelope.getDeliveryTag(); // (process the message components here ...) channel.basicAck(deliveryTag, false); } });

Ahora se crea un hilo que procesa los datos en la cola del tipo de inicio de sesión (clave de enrutamiento). De esta forma puedes crear múltiples hilos. Cada porción tiene un propósito diferente.

mira aquí para más detalles sobre la parte de los hilos ...


¿Por qué implementar todo por ti mismo?

Intenta usar algún tipo de marco de integración. Digamos que Camel ya tiene un montón de conectores en varios sistemas, Rabbit MQ incluido Camel Rabbit MQ .

Solo tienes que definir tus rutas. Por ejemplo:

Desea consumir los mensajes de la cola de registro de 5 consumidores simultáneos en un archivo.

from("rabbitmq://localhost/Logging ?concurrentConsumers=5") .to("file://yourLoggingFile")

Hay muchas options para configurar el consumidor de archivos. Como puede ver, puede definir cuántos consumidores deben generarse simplemente colocando concurrentConsumers=5 en su URI. Si lo desea, puede crear su propio productor o consumidor implementando la interfaz del procesador.

Es un marco muy versátil y poderoso que puede hacer un montón de trabajo simplemente utilizando los componentes proporcionados. La web del proyecto contiene muchos ejemplos y documentación.


Creo que tienes varios problemas con la comprensión inicial. Francamente, estoy un poco sorprendido de ver lo siguiente: both need 5 threads to handle the volume . ¿Cómo se identificó que necesita ese número exacto? ¿Tiene alguna garantía 5 hilos será suficiente?

RabbitMQ está afinado y probado en el tiempo, por lo que se trata de un diseño adecuado y un procesamiento de mensajes eficiente.

Intentemos revisar el problema y encontrar una solución adecuada. Por cierto, la cola de mensajes en sí misma no brindará ninguna garantía de que tenga una solución realmente buena. Tienes que entender lo que estás haciendo y también hacer algunas pruebas adicionales.

Como definitivamente sabes, hay muchos diseños posibles:

Utilizaré el diseño B como la forma más sencilla de ilustrar el problema de 1 productor N consumidores. Dado que estás tan preocupado por el rendimiento. Por cierto, como es de esperar, RabbitMQ se comporta bastante bien ( source ). Preste atención a prefetchCount , lo abordaré más adelante:

Por lo tanto, es probable que la lógica de procesamiento de mensajes sea el lugar correcto para asegurarse de que tendrá suficiente rendimiento. Naturalmente, puede abarcar un nuevo hilo cada vez que necesite procesar un mensaje, pero eventualmente tal enfoque matará su sistema. Básicamente, obtendrá más subprocesos con mayor latencia (puede consultar la ley de Amdahl si lo desea).

(ver la ley de Amdahl ilustrada )

Consejo # 1: tenga cuidado con los hilos, use ThreadPools ( details )

Un grupo de subprocesos se puede describir como una colección de objetos Ejecutables (cola de trabajos) y una conexión de subprocesos en ejecución. Estos subprocesos se ejecutan constantemente y están verificando la consulta de trabajo para un nuevo trabajo. Si hay un nuevo trabajo por hacer, ejecutan este Runnable. La clase Thread proporciona un método, por ejemplo, ejecutar (ejecutable r) para agregar un nuevo objeto Runnable a la cola de trabajo.

public class Main { private static final int NTHREDS = 10; public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(NTHREDS); for (int i = 0; i < 500; i++) { Runnable worker = new MyRunnable(10000000L + i); executor.execute(worker); } // This will make the executor accept no new threads // and finish all existing threads in the queue executor.shutdown(); // Wait until all threads are finish executor.awaitTermination(); System.out.println("Finished all threads"); } }

Consejo # 2: tenga cuidado con la sobrecarga de procesamiento de mensajes

Yo diría que esta es una técnica de optimización obvia. Es probable que envíe mensajes pequeños y fáciles de procesar. Todo el enfoque consiste en que los mensajes más pequeños se configuren y procesen continuamente. Grandes mensajes eventualmente jugarán un mal chiste, así que es mejor evitar eso.

Entonces, es mejor enviar pequeños pedazos de información, pero ¿qué hay del procesamiento? Hay una sobrecarga cada vez que envía un trabajo. El procesamiento por lotes puede ser muy útil en caso de alta velocidad de mensajes entrantes.

Por ejemplo, supongamos que tenemos una lógica de procesamiento de mensajes simple y no queremos tener transparencias específicas cada vez que se procesa un mensaje. Para optimizar ese CompositeRunnable can be introduced muy simple CompositeRunnable can be introduced :

class CompositeRunnable implements Runnable { protected Queue<Runnable> queue = new LinkedList<>(); public void add(Runnable a) { queue.add(a); } @Override public void run() { for(Runnable r: queue) { r.run(); } } }

O haz lo mismo de una manera ligeramente diferente, recopilando mensajes para procesar:

class CompositeMessageWorker<T> implements Runnable { protected Queue<T> queue = new LinkedList<>(); public void add(T message) { queue.add(message); } @Override public void run() { for(T message: queue) { // process a message } } }

De esta forma, puedes procesar mensajes de manera más efectiva.

Consejo # 3: optimizar el procesamiento de mensajes

A pesar de que sabe que puede procesar mensajes en paralelo ( Tip #1 ) y reducir los gastos generales de procesamiento ( Tip #2 ), debe hacerlo todo rápidamente. Los pasos de procesamiento redundantes, los bucles pesados, etc. pueden afectar mucho el rendimiento. Por favor, vea un interesante caso de estudio:

Mejora del rendimiento de Message Queue multiplicado por diez eligiendo el analizador XML correcto

Consejo # 4: Conexión y gestión de canales

  • Iniciar un nuevo canal en una conexión existente implica un viaje de ida y vuelta de red: iniciar una nueva conexión requiere varios.
  • Cada conexión utiliza un descriptor de archivo en el servidor. Los canales no.
  • Publicar un mensaje grande en un canal bloqueará una conexión mientras se apaga. Aparte de eso, la multiplexación es bastante transparente.
  • Las conexiones que se publican pueden bloquearse si el servidor está sobrecargado; es una buena idea separar las conexiones de publicación y de consumo.
  • Esté preparado para manejar ráfagas de mensajes

( source )

Tenga en cuenta que todos los consejos funcionan perfectamente juntos. No dude en avisarme si necesita detalles adicionales.

Ejemplo de consumidor completo ( source )

Tenga en cuenta lo siguiente:

  • channel.basicQos (prefetch) - Como viste antes, prefetchCount puede ser muy útil:

    Este comando permite al consumidor elegir una ventana de captación previa que especifica la cantidad de mensajes no reconocidos que está preparado para recibir. Al establecer el recuento de captación previa en un valor distinto de cero, el intermediario no enviará ningún mensaje al consumidor que infrinja ese límite. Para mover la ventana hacia adelante, el consumidor debe acusar recibo de un mensaje (o un grupo de mensajes).

  • ExecutorService threadExecutor : puede especificar el servicio del ejecutor configurado correctamente.

Ejemplo:

static class Worker extends DefaultConsumer { String name; Channel channel; String queue; int processed; ExecutorService executorService; public Worker(int prefetch, ExecutorService threadExecutor, , Channel c, String q) throws Exception { super(c); channel = c; queue = q; channel.basicQos(prefetch); channel.basicConsume(queue, false, this); executorService = threadExecutor; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { Runnable task = new VariableLengthTask(this, envelope.getDeliveryTag(), channel); executorService.submit(task); } }

También puede verificar lo siguiente: