multithreading spring akka event-driven-design project-reactor

multithreading - ¿Qué tipo de "EventBus" usar en Spring? Incorporado, Reactor, Akka?



event-driven-design project-reactor (3)

Vamos a comenzar una nueva aplicación de Spring 4 en unas pocas semanas. Y nos gustaría usar alguna arquitectura orientada a eventos. Este año leí aquí y allá sobre "Reactor" y mientras lo buscaba en la web, me topé con "Akka".

Entonces por ahora tenemos 3 opciones:

No pude encontrar una comparación real de esos.

Por ahora solo necesitamos algo como:

  • X registra para escuchar el Event E
  • Y registra para escuchar el Event E
  • Z envía un Event E

Y luego X e Y recibirán y manejarán el evento.

Lo más probable es que usemos esto de forma asíncrona, pero seguramente también habrá algunos escenarios sincrónicos. Y lo más probable es que enviemos siempre una clase como evento. (Las muestras de Reactor hacen principalmente uso de patrones de cadenas y cadenas, pero también admiten objetos).

Por lo que yo entiendo, ApplicationEvent funciona sincrónico por defecto y Reactor funciona de manera asincrónica. Y Reactor también permite usar el método await() para que sea un poco sincrónico. Akka proporciona más o menos lo mismo que Reactor , pero también admite Remoting.

Acerca del método await() Reactor: ¿puede esperar a que se completen varios hilos? ¿O tal vez incluso un conjunto parcial de esos hilos? Si tomamos el ejemplo de arriba:

  • X registra para escuchar el Event E
  • Y registra para escuchar el Event E
  • Z envía un Event E

¿Es posible hacerlo sincrónico diciendo: Espere a que X e Y completen? ¿Y es posible hacerlo esperar solo para X , pero no para Y ?

Tal vez también hay algunas alternativas? ¿Qué tal, por ejemplo, JMS?

¡Muchas preguntas, pero espero que puedas dar algunas respuestas!

¡Gracias!

EDITAR: Ejemplos de casos de uso

  1. Cuando se dispara un evento específico, me gustaría crear 10000 correos electrónicos. Cada correo electrónico debe ser generado con contenido específico del usuario. Así que crearía muchos subprocesos (max = núcleos de CPU del sistema) que crean los correos y no bloquean el hilo de la persona que llama, porque esto puede demorar algunos minutos.

  2. Cuando se desencadena un evento específico, me gustaría recopilar información de una cantidad desconocida de servicios. Cada búsqueda lleva unos 100 ms. Aquí podría imaginarme usando Reactor''s await , porque necesito esa información para continuar mi trabajo en el hilo principal.

  3. Cuando se dispara un evento específico, me gustaría realizar algunas operaciones según la configuración de la aplicación. Por lo tanto, la aplicación debe ser capaz de (des) registrar dinámicamente / gestores de eventos. Ellos harán sus propias cosas con el Evento y no me importa. Así que crearía un hilo para cada uno de esos manejadores y simplemente continuaría haciendo mi trabajo en el hilo principal.

  4. Desacoplamiento simple: básicamente conozco todos los receptores, pero simplemente no quiero llamar a todos los receptores en mi código. Esto debería hacerse mayormente sincrónicamente.

Parece que necesito un ThreadPool o un RingBuffer. ¿Los frameworks tienen RingBuffers dinámicos, que crecen en tamaño si es necesario?


Defina cuidadosamente lo que quiere del marco. Si un marco tiene más características de las que necesita, no siempre es bueno. Más características significa más errores, más código para aprender y menos rendimiento.

Algunas características que te preocupan son:

  • la naturaleza de los actores (hilos u objetos ligeros)
  • capacidad de trabajar en un clúster de máquina (Akka)
  • colas de mensajes persistentes (JMS)
  • características específicas como señales (eventos sin información), transiciones (objetos para combinar mensajes de diferentes puertos en eventos complejos, ver Redes de Petri), etc.

Tenga cuidado con las funciones síncronas como "espera": bloquea todo el hilo y es peligroso cuando los actores se ejecutan en un grupo de subprocesos (falta de hilo).

Más marcos para mirar:

Fork-Join Pool - en algunos casos, permite await sin pasar hambre

Sistemas de flujo de trabajo científico

Dataflow framework para Java : señales, transiciones

ADD-ON : dos tipos de actores.

En general, el sistema de trabajo paralelo se puede representar como un gráfico, donde los nodos activos se envían mensajes entre sí. En Java, como en la mayoría de los otros lenguajes principales, los nodos activos (actores) pueden implementarse como hilos o tareas (ejecutables o invocables) ejecutados por un grupo de subprocesos. Normalmente, parte de los actores son hilos y parte son tareas. Ambos enfoques tienen sus ventajas y desventajas, por lo que es vital elegir la implementación más adecuada para cada actor en el sistema. En resumen, los hilos pueden bloquear (y esperar eventos) pero consumen mucha memoria para sus pilas. Las tareas no pueden bloquear, pero usan stacks compartidos (de subprocesos en un grupo).

Si una tarea llama a una operación de bloqueo, excluye un hilo agrupado del servicio. Si se bloquean muchas tareas, pueden excluir todos los hilos, lo que causa un punto muerto: las tareas que pueden desbloquear tareas bloqueadas no se pueden ejecutar. Este tipo de punto muerto se llama inanición de hilos . Si, en un intento de evitar la falta de hilo, configure el grupo de subprocesos como ilimitado, simplemente convertimos las tareas en subprocesos, perdiendo las ventajas de las tareas.

Para eliminar llamadas a operaciones de bloqueo en tareas, la tarea se debe dividir en dos (o más): las primeras llamadas de tarea bloquean la operación y las salidas, y el resto se formatea como una tarea asíncrona iniciada cuando finaliza la operación de bloqueo. Por supuesto, la operación de bloqueo debe tener una interfaz asincrónica alternativa. Por lo tanto, por ejemplo, en lugar de leer el socket de forma síncrona, se deben usar las bibliotecas NIO o NIO2.

Desafortunadamente, la biblioteca estándar de Java carece de contrapartes asíncronas para instalaciones de sincronización populares como colas y semáforos. Afortunadamente, son fáciles de implementar desde cero (consulte el marco de Dataflow para Java para ver ejemplos).

Entonces, hacer cálculos puramente con tareas sin bloqueo es posible, pero aumenta el tamaño del código. El asesoramiento evidente es utilizar subprocesos siempre que sea posible y tareas solo para cálculos masivos simples.


No estoy seguro de poder responder adecuadamente a tu pregunta en este pequeño espacio. ¡Pero lo intentaré! :)

El sistema ApplicationEvent de Spring y Reactor son muy distintos en lo que respecta a la funcionalidad. ApplicationEvent enrutamiento ApplicationEvent se basa en el tipo manejado por ApplicationListener . Algo más complicado que eso y usted tendrá que implementar la lógica usted mismo (eso no es necesariamente algo malo, sin embargo). Reactor, sin embargo, proporciona una capa de enrutamiento completa que también es muy ligera y completamente extensible. Cualquier similitud en la función entre los dos extremos tiene en cuenta su capacidad para suscribirse y publicar eventos, lo cual es realmente una característica de cualquier sistema basado en eventos. Además, no olvide el nuevo módulo de spring-messaging primavera con Spring 4. Es un subconjunto de las herramientas disponibles en Spring Integration y también proporciona abstracciones para crear una arquitectura basada en eventos.

Reactor lo ayudará a resolver un par de problemas clave que de otra manera tendría que gestionar usted mismo:

Selector de coincidencia : Reactor hace la coincidencia de Selector , que abarca un rango de coincidencias, desde una .equals(Object other) simple de .equals(Object other) a una coincidencia de plantillas de URI más compleja que permite la extracción de marcador de posición. También puede ampliar los selectores incorporados con su propia lógica personalizada para que pueda usar objetos enriquecidos como claves de notificación (como objetos de dominio, por ejemplo).

API Stream y Promise : Mencionaste la Promise API ya con referencia al método .await() , que realmente está destinado a código existente que espera un comportamiento de bloqueo. Al escribir un código nuevo con Reactor, no se puede enfatizar lo suficiente como para usar composiciones y devoluciones de llamada para utilizar de manera efectiva los recursos del sistema al no bloquear los hilos. Bloquear al que llama casi nunca es una buena idea en una arquitectura que depende de un pequeño número de subprocesos para ejecutar un gran volumen de tareas. Los futuros simplemente no son escalables en la nube, por lo que las aplicaciones modernas aprovechan las soluciones alternativas.

Su aplicación podría estar estructurada con Streams o Promises, ya sea una, aunque, sinceramente, creo que encontrará la Stream más flexible. El beneficio clave es la capacidad de compilación de la API, que le permite vincular acciones juntas en una cadena de dependencia sin bloqueo. Como un ejemplo completamente improvisado basado en su caso de uso de correo electrónico, usted describe:

@Autowired Environment env; @Autowired SmtpClient client; // Using a ThreadPoolDispatcher Deferred<DomainObject, Stream<DomainObject>> input = Streams.defer(env, THREAD_POOL); input.compose() .map(new Function<DomainObject, EmailTemplate>() { public EmailTemplate apply(DomainObject in) { // generate the email return new EmailTemplate(in); } }) .consume(new Consumer<EmailTemplate>() { public void accept(EmailTemplate email) { // send the email client.send(email); } }); // Publish input into Deferred DomainObject obj = reader.readNext(); if(null != obj) { input.accept(obj); }

Reactor también proporciona el Boundary que es básicamente un CountDownLatch para bloquear a consumidores arbitrarios (para que no tenga que construir una Promise si todo lo que quiere hacer es bloquear para una finalización del Consumer ). Puede usar un Reactor procesar en ese caso y usar los métodos on() y notify() para activar la verificación del estado del servicio.

Sin embargo, para algunas cosas, parece que lo que quieres es un Future devuelto por un ExecutorService , ¿no? ¿Por qué no simplemente mantener las cosas simples? Reactor solo será de beneficio real en situaciones en las que el rendimiento de su rendimiento y la eficacia de sobrecarga sean importantes. Si está bloqueando el hilo de llamada, entonces es probable que esté borrando las ganancias de eficacia que el Reactor le dará de todos modos, por lo que podría estar mejor en ese caso utilizando un conjunto de herramientas más tradicional.

Lo bueno de la apertura de Reactor es que no hay nada que impida que los dos interactúen. Puede mezclar libremente Futures con Consumers sin estática. En ese caso, solo tenga en cuenta que solo será tan rápido como su componente más lento.


Vamos a ignorar SpringEvent ApplicationEvent ya que en realidad no está diseñado para lo que preguntas (se trata más sobre la administración del ciclo de vida de bean).

Lo que necesitas averiguar es si quieres hacerlo

  1. el modo orientado a objetos (es decir, actores, consumidores dinámicos, registrados sobre la marcha) O
  2. el modo de servicio (consumidores estáticos, registrados al inicio).

Usando su ejemplo de X e Y son ellos:

  1. instancias efímeras (1) o son
  2. objetos de servicio / singletons de larga duración (2)?

Si necesita registrar consumidores sobre la marcha, Akka es una buena opción (no estoy seguro sobre el reactor ya que nunca lo he usado). Si no desea consumir en objetos efímeros, puede usar JMS o AMQP.

También debe comprender que este tipo de bibliotecas intentan resolver dos problemas:

  1. Concurrencia (es decir, hacer cosas en paralelo en la misma máquina)
  2. Distribución (es decir, hacer cosas en paralelo en múltiples máquinas)

Reactor y Akka se centran principalmente en el n. ° 1. Akka acaba de agregar el soporte de clúster y la abstracción de actor hace que sea más fácil hacer # 2. Message Queues (JMS, AMQP) se centran en el n. ° 2.

Para mi propio trabajo, realizo la ruta de servicio y utilizo Guava EventBus y RabbitMQ fuertemente modificados. Utilizo anotaciones similares al Guava Eventbus pero también tengo anotaciones para los objetos enviados en el bus. Sin embargo, puedes usar el EventBus de Guava en modo Async como POC y luego crear el tuyo como lo hice yo.

Puede pensar que necesita tener consumidores dinámicos (1), pero la mayoría de los problemas se pueden resolver con un simple pub / sub. También administrar consumidores dinámicos puede ser complicado (por lo tanto, Akka es una buena opción porque el modelo de actor tiene todo tipo de administración para esto)