see node net last exchange create and java notifications redis message-queue dataflow

java - node - MQ para procesar, agregar y publicar datos de forma asíncrona



rabbitmq net core (3)

Las descripciones de las tareas de cola suenan parcialmente como sistemas basados ​​en " patrones de integración empresarial " como Apache Camel .

Un mensaje retrasado se puede expresar mediante constantes

from("seda:b").delay(1000).to("mock:result");

o variables, por ejemplo un valor de encabezado de mensaje

from("seda:a").delay().header("MyDelay").to("mock:result");

Algunos antecedentes, antes de llegar a la verdadera pregunta:

Estoy trabajando en una aplicación de fondo que consta de varios módulos diferentes. Cada módulo es, actualmente, una aplicación Java de línea de comandos, que se ejecuta "a pedido" (más detalles más adelante).

Cada módulo es un "paso", parte de un proceso más grande que se puede considerar como un flujo de datos; el primer paso recopila archivos de datos de una fuente externa y los empuja / los carga en algunas tablas de bases de datos SQL; luego los siguientes pasos, basados ​​en diferentes condiciones y eventos (tiempo, presencia de datos en el DB, mensajes y elaboraciones hechas a través de un servicio web / interfaz web), tomar datos de (1 o más) tablas de BD, procesarlos, y escríbalos en diferentes tablas. Los pasos se ejecutan en tres servidores diferentes y leen datos de tres DB diferentes, pero solo escriben en una única base de datos. El objetivo es agregar datos, calcular métricas y estadísticas.

Actualmente, cada módulo se ejecuta periódicamente (desde unos minutos / horas para los primeros módulos, hasta unos pocos días para los últimos en la cadena, que necesitan agregar más datos y, por lo tanto, esperar "más tiempo" para estar disponibles), utilizando un cronjob. Se ejecuta un módulo (actualmente, una aplicación de consola java) y comprueba la base de datos nueva no procesada en una ventana de fecha y hora determinada, y hace su trabajo.

El problema: funciona, pero ... Necesito expandirlo y mantenerlo, y este enfoque comienza a mostrar sus límites.

  1. No me gusta confiar en "encuestas"; es un desperdicio, teniendo en cuenta que la información de los módulos anteriores podría ser suficiente para "contar" otros módulos en la cadena cuando la información que necesitan esté disponible, y que puedan continuar.
  2. Es "lento": los varios días de retraso para los módulos de la cadena están ahí porque tenemos que asegurarnos de que los datos lleguen y sean procesados ​​por los módulos anteriores. Entonces "paramos" estos módulos hasta que estemos seguros de que tenemos todos los datos. Las nuevas incorporaciones requieren el cálculo en tiempo real (no duro, pero "lo antes posible") de algunas métricas. Un muy buen ejemplo es lo que sucede aquí, en SO, ¡con insignias! :) Necesito obtener algo realmente similar.

Para resolver el segundo problema, voy a introducir cálculos "parciales" o "incrementales": mientras tenga un conjunto de información relevante, la procesaré. Luego, cuando llega otra información vinculada, calculo la diferencia y actualizo los datos en consecuencia, pero también debo notificar a otros módulos (dependientes).

Las preguntas)

- 1) ¿Cuál es la mejor manera de hacerlo? - 2) Relacionado: ¿cuál es la mejor manera de "notificar" a otros módulos (ejecutables Java, en mi caso) que hay datos relevantes disponibles?

Puedo ver tres maneras:

  • agregue otras tablas "sin datos" al DB, en las que cada módulo escriba "Hola, he hecho esto y está disponible". Cuando el cronjob inicia otro módulo, lee la (s) tabla (s), decide que puede calcular el subconjunto xxx y lo hace. Y así
  • use Message Queues, como ZeroMQ, (o Apache Camel, como se sugiere @mjn) en lugar de tablas de DB
  • utilice un almacén de clave-valor, como Redis, en lugar de tablas de DB

Editar: Estoy convencido de que un enfoque basado en colas es el camino a seguir, agregué la opción "mesa + sondeo" para completar, pero ahora entiendo que es solo una distracción (obviamente, todos van a responder "sí, use las colas" , el sondeo es malo "- ¡y con razón!). Así que permítanme reformular la pregunta para: ¿Cuáles son las ventajas / desventajas de usar un MQ sobre una tienda de clave-valor con pub / sub como Redis?

  • 3) ¿hay alguna solución que me ayude a deshacerme por completo de los cronjobs?

Editar: en particular, en el caso de que sea, significa: ¿hay algún mecanismo en algún almacén de MQ y / o valor-clave que me permita publicar mensajes con un "tiempo"? ¿Te gusta "entregarlo en 1 día"? Con persistencia y garantía de entrega "casi una vez", obviamente

  • 4) ¿debo compilar esta solución basada en mensajes (¿evento?) Como un servicio centralizado, ejecutándola como un daemon / servicio en uno de los servidores?
  • 5) ¿Debo abandonar esta idea de iniciar los suscriptores bajo demanda y hacer que cada módulo funcione de manera continua como daemon / servicio?
  • 6) cuáles son los pro y los contras (confiabilidad, punto único de falla vs. uso de recursos y complejidad ...)?

Editar: este es el bit que más me importa: me gustaría "hacer cola" para activar "módulos" basados ​​en mensajes en la cola, similar a la Activación de MSMQ. ¿Es una buena idea? ¿Hay algo en el mundo de Java que lo haga, debería implementarlo yo mismo (a través de un MQ o más de Redis), o debería ejecutar cada módulo como un daemon? (incluso si algunos cálculos ocurren típicamente en ráfagas, dos horas de procesamiento seguido de dos días de inactividad?)

NOTA: No puedo usar contenedores pesados ​​/ EJB (No Glassfish o similar)

Editar: Camel también parece demasiado pesado para mí. Estoy buscando algo realmente ligero aquí , tanto en términos de recursos como de complejidad del desarrollo


1> Sugiero usar una cola de mensajes, elija la cola según sus requisitos, pero para la mayoría de los casos cualquiera haría, le sugiero que elija una cola basada en el protocolo JMS (mq activo) o AMQP (conejo mq) y escriba un simple Envuelva sobre él o use los provistos por spring-> spring-jms o spring-amqp

2> Puede escribir en cola a los consumidores para que notifiquen a su sistema que llega un nuevo mensaje, por ejemplo, en rabbit puede implementar la interfaz MessageListener

public class MyListener implements MessageListener { @Override public void onMessage(Message message) { /* Handle the message */ } }

3> Si utiliza consumidores asincrónicos como en <2> puede deshacerse de todos los trabajos de sondeo y cron

4> Depende de sus requisitos -> Si tiene millones de eventos / mensajes pasando por su cola, entonces ejecutar el middleware de cola en un servidor centralizado tiene sentido.

5> Si el consumo de recursos no es un problema, entonces mantener a sus consumidores / suscriptores funcionando todo el tiempo es la manera más fácil de hacerlo. Si estos consumidores se distribuyen, puede organizarlos utilizando un servicio como el cuidador de zoos

6> Escalabilidad -> La mayoría de los sistemas de colas proporcionan una fácil distribución de mensajes, por lo tanto, siempre que sus consumidores sean apátridas, entonces se puede escalar simplemente agregando nuevos consumidores y alguna configuración.


Después de implementarlo, siento que responder mi propia pregunta puede ser bueno para las personas que vendrán a visitar en el futuro.

Al final, fui con Redis. Es realmente rápido y escalable. Y me gusta mucho su flexibilidad: es mucho más flexible que las colas de mensajes. ¿Estoy afirmando que Redis es mejor en MQs que los diversos MQ que hay? Bueno, en mi caso específico, creo que sí. El punto es: si algo no se ofrece listo para usar, puede compilarlo (generalmente, usando MULTI - ¡pero incluso puede usar LUA para una personalización más avanzada!).

Por ejemplo, seguí esta buena respuesta para implementar un pub / sub "persistente", recuperable (es decir, un pub / sub que permite a los clientes morir y volver a conectarse sin perder mensajes).

Esto me ayudó con mi escalabilidad y mis requisitos de "confiabilidad": decidí mantener cada pieza en la tubería independiente (un deamon por ahora), pero agregué un monitor que examina listas / colas en Redis; si algo no se consume (o se consume demasiado lento), el monitor genera un nuevo consumidor. También estoy pensando en ser verdaderamente "elástico" y agregar la capacidad de los consumidores para matarse cuando no hay trabajo por hacer.

Otro ejemplo: ejecución de actividades programadas. Estoy siguiendo este enfoque , que parece bastante popular, por ahora. Pero estoy ansioso por probar las notificaciones del espacio de teclas, para ver si una combinación de teclas y notificaciones que expiran puede ser un enfoque superior.

Finalmente, como una biblioteca para acceder a Redis, mi elección fue para Jedis: es popular, compatible y ofrece una interfaz agradable para implementar pub / sub como oyentes. No es el mejor enfoque (idiomático) con Scala, pero funciona bien.