node event driven book node.js rest websocket apache-kafka microservices

node.js - book - event driven architecture microservices



¿Cómo manejar las solicitudes HTTP en un Microservice/Event Driven Architecture? (6)

¿Qué pasa con el uso de Promises ? Socket.io también podría ser una solución si quieres en tiempo real.

Echa un vistazo a CQRS también. Este patrón arquitectónico se adapta al modelo impulsado por eventos y la arquitectura de microservicio.

Aun mejor. Lea this .

Fondo:

Estoy creando una aplicación y la arquitectura propuesta es Event / Message Driven en una arquitectura de microservicio.

La forma monolítica de hacer algo es que tengo una User/HTTP request y que ejecuta algunos comandos que tienen una synchronous response directa. Por lo tanto, responder a la misma solicitud de usuario / HTTP es ''sin problemas''.

El problema:

El usuario envía una HTTP request al Servicio de UI (hay varios Servicios de UI) que dispara algunos eventos a una cola (Kafka / RabbitMQ / any). Una N de servicios recoge ese Evento / Mensaje, hace un poco de magia en el camino y luego, en algún momento, el mismo Servicio de UI debería recoger esa respuesta y devolverla al usuario que originó la solicitud HTTP. El procesamiento de la solicitud es ASYNC pero la Solicitud de User/HTTP REQUEST->RESPONSE es SYNC según su interacción típica de HTTP.

Pregunta: ¿Cómo envío una respuesta al mismo Servicio de UI que originó la acción (El servicio que está interactuando con el usuario a través de HTTP) en este mundo impulsado por Agnósticos / Eventos?

Mi investigación hasta ahora he estado mirando a mi alrededor y parece que algunas personas están resolviendo ese problema usando WebSockets.

Pero la capa de complejidad es que debe haber alguna tabla que se (RequestId->Websocket(Client-Server)) que se usa para "descubrir" qué nodo en la puerta de enlace tiene la conexión de websocket para alguna respuesta en particular. Pero incluso si entiendo el problema y la complejidad, no puedo encontrar ningún artículo que me proporcione información sobre cómo resolver este problema en la capa de implementación. Y esta aún no es una opción viable debido a integraciones de terceros como proveedores de pagos (WorldPay) que esperan REQUEST->RESPONSE , especialmente en la validación 3DS.

Así que, de alguna manera, me resisto a pensar que WebSockets es una opción. Pero incluso si los WebSockets están bien para las aplicaciones de Web, la API que se conecta a sistemas externos no es una gran arquitectura.

** ** ** Actualización: ** ** **

Incluso si el sondeo largo es una solución posible para una API de servicio web con un Location header 202 Accepted y un Location header retry-after header , no sería eficaz para un sitio web de alta concurrencia y alta capacidad. Imagine a una gran cantidad de personas que intentan obtener la actualización del estado de la transacción en CADA solicitud que realizan y usted tiene que invalidar el caché de CDN (vaya y juegue con ese problema ahora! Ja).

Pero lo más importante y relevante para mi caso es que tengo API de terceros, como los sistemas de pago, en los que los sistemas 3DS tienen redirecciones automáticas gestionadas por el proveedor de pagos y esperan un REQUEST/RESPONSE flow típico de REQUEST/RESPONSE flow , por lo que este modelo no funcionaría. Yo ni el modelo de sockets funcionaría.

Debido a este caso de uso, la HTTP REQUEST/RESPONSE debe manejarse de la manera típica en la que tengo un cliente tonto que espera que la complejidad de la precesión se maneje en el back-end.

Así que estoy buscando una solución donde externamente tengo una Request->Response típica Request->Response (SYNC) y la complejidad del estado (ASYNCrony del sistema) se maneja internamente

Un ejemplo del sondeo largo, pero este modelo no funcionaría para API de terceros, como el proveedor de pagos en 3DS Redirects que no están bajo mi control.

POST /user Payload {userdata} RETURNs: HTTP/1.1 202 Accepted Content-Type: application/json; charset=utf-8 Date: Mon, 27 Nov 2018 17:25:55 GMT Location: https://mydomain/user/transaction/status/:transaction_id Retry-After: 10 GET https://mydomain/user/transaction/status/:transaction_id


A continuación se muestra un ejemplo muy claro de cómo podría implementar el servicio de UI para que funcione con un flujo de solicitud / respuesta HTTP normal. Utiliza la clase node.js events.EventEmitter para "enrutar" las respuestas al controlador HTTP correcto.

Esquema de la implementación:

  1. Conectar productor / cliente consumidor a Kafka

    1. El productor se utiliza para enviar los datos de solicitud a los microservicios internos.
    2. El consumidor se utiliza para escuchar los datos de los microservicios, lo que significa que la solicitud se ha procesado y asumo que esos elementos de Kafka también contienen los datos que deben devolverse al cliente HTTP.
  2. Crear un despachador de eventos global de la clase EventEmitter

  3. Registre un controlador de solicitud HTTP que
    1. Crea un UUID para la solicitud y lo incluye en la carga útil enviada a Kafka
    2. Registra un detector de eventos con nuestro distribuidor de eventos donde se utiliza el UUID como el nombre del evento que escucha.
  4. Comience a consumir el tema Kafka y recupere el UUID que el controlador de solicitudes HTTP está esperando y emita un evento para él. En el código de ejemplo no estoy incluyendo ninguna carga útil en el evento emitido, pero normalmente querría incluir algunos datos de los datos de Kafka como un argumento para que el controlador HTTP pueda devolverlos al cliente HTTP.

¡Tenga en cuenta que traté de mantener el código lo más pequeño posible, omitiendo el error y el tiempo de espera, etc.!

También tenga en cuenta que kafkaProduceTopic y kafkaConsumTopic son los mismos temas para simplificar las pruebas, no es necesario que otro servicio / función produzca en el tema de consumo de UI Service .

El código asume que los paquetes kafka-node y uuid se han instalado npm y que se puede acceder a Kafka en localhost:9092

const http = require(''http''); const EventEmitter = require(''events''); const kafka = require(''kafka-node''); const uuidv4 = require(''uuid/v4''); const kafkaProduceTopic = "req-res-topic"; const kafkaConsumeTopic = "req-res-topic"; class ResponseEventEmitter extends EventEmitter {} const responseEventEmitter = new ResponseEventEmitter(); var HighLevelProducer = kafka.HighLevelProducer, client = new kafka.Client(), producer = new HighLevelProducer(client); var HighLevelConsumer = kafka.HighLevelConsumer, client = new kafka.Client(), consumer = new HighLevelConsumer( client, [ { topic: kafkaConsumeTopic } ], { groupId: ''my-group'' } ); var s = http.createServer(function (req, res) { // Generate a random UUID to be used as the request id that // that is used to correlated request/response requests. // The internal micro-services need to include this id in // the "final" message that is pushed to Kafka and consumed // by the ui service var id = uuidv4(); // Send the request data to the internal back-end through Kafka // In real code the Kafka message would be a JSON/protobuf/... // message, but it needs to include the UUID generated by this // function payloads = [ { topic: kafkaProduceTopic, messages: id}, ]; producer.send(payloads, function (err, data) { if(err != null) { console.log("Error: ", err); return; } }); responseEventEmitter.once(id, () => { console.log("Got the response event for ", id); res.write("Order " + id + " has been processed/n"); res.end(); }) }); s.timeout = 10000; s.listen(8080); // Listen to the Kafka topic that streams messages // indicating that the request has been processed and // emit an event to the request handler so it can finish. // In this example the consumed Kafka message is simply // the UUID of the request that has been processed (which // is also the event name that the response handler is // listening to). // // In real code the Kafka message would be a JSON/protobuf/... message // which needs to contain the UUID the request handler generated. // This Kafka consumer would then have to deserialize the incoming // message and get the UUID from it. consumer.on(''message'', function (message) { responseEventEmitter.emit(message.value); });


Buena pregunta. Mi respuesta a esto es, introducir flujos síncronos en el sistema.

Estoy usando rabbitMq, así que no sé nada sobre kafka, pero debes buscar el flujo síncrono de kafka.

WebSockets parece un overkiil.

Espero que ayude.


Como esperaba, la gente trata de encajar todo en un concepto, incluso si no encaja allí. Esto no es una crítica, es una observación de mi experiencia y después de leer su pregunta y otras respuestas.

Sí, tienes razón en que la arquitectura de microservicios se basa en patrones de mensajería asíncronos. Sin embargo, cuando hablamos de UI, hay 2 casos posibles en mi mente:

  1. La interfaz de usuario necesita una respuesta inmediata (por ejemplo, operaciones de lectura o los comandos en los que el usuario espera la respuesta de inmediato). Estos no tienen que ser asíncronos . ¿Por qué agregaría una sobrecarga de mensajería y asincronía si la respuesta se requiere en la pantalla de inmediato? No tiene sentido. Se supone que la arquitectura de microservicio resuelve problemas en lugar de crear nuevos agregando una sobrecarga.

  2. La IU se puede reestructurar para tolerar una respuesta demorada (por ejemplo, en lugar de esperar el resultado, la IU solo puede enviar un comando, recibir un acuse de recibo y dejar que el usuario haga otra cosa mientras se prepara la respuesta). En este caso, puede introducir asincronía. El servicio de puerta de enlace (con el que la UI interactúa directamente) puede organizar el procesamiento asíncrono (espera eventos completos, etc.) y, cuando está listo, puede comunicarse con la IU. He visto IU utilizando SignalR en tales casos, y el servicio de puerta de enlace era una API que aceptaba conexiones de socket. Si el navegador no admite sockets, debería recurrir al sondeo idealmente. De todos modos, el punto importante es que esto solo puede funcionar con una contingencia: la IU puede tolerar respuestas retrasadas .

Si los microservicios son realmente relevantes en su situación (caso 2), entonces estructure el flujo de la interfaz de usuario en consecuencia, y no debe haber un desafío en los microservicios en el back-end. En ese caso, su pregunta se reduce a aplicar la arquitectura controlada por eventos al conjunto de servicios (el borde es el microservicio de puerta de enlace que conecta las interacciones de UI y eventos controlados por eventos). Este problema (servicios controlados por eventos) es solucionable y usted lo sabe. Solo debe decidir si puede volver a pensar cómo funciona su interfaz de usuario.


Desafortunadamente, creo que es probable que tengas que usar encuestas largas o sitios web para lograr algo como esto. Debe "enviar" algo al usuario o mantener abierta la solicitud http hasta que algo vuelva.

Para manejar la devolución de los datos al usuario real, puede usar algo como socket.io . Cuando un usuario se conecta, socket.io crea una identificación. Cada vez que un usuario se conecta, asigna el ID de usuario a la identificación que le proporciona socket.io. Una vez que cada solicitud tiene un ID de usuario adjunto, puede emitir el resultado al cliente correcto. El flujo sería algo como esto:

Orden de solicitudes web (POST con datos y userId)

el servicio ui coloca el pedido en la cola (este orden debe tener ID de usuario)

x Número de servicios que se ejecutan en el pedido (pasando UserId a lo largo de cada vez)

ui servicio consume desde el tema. En algún momento, los datos aparecen sobre el tema. Los datos que consume tienen el ID de usuario, el servicio ui busca el mapa para averiguar a qué zócalo emitir.

Cualquiera que sea el código que se esté ejecutando en su interfaz de usuario también tendría que estar impulsado por eventos, por lo que trataría con un impulso de datos sin el contexto de la solicitud original. Podrías usar algo como redux para esto. Esencialmente, el servidor creará acciones de redux en el cliente, ¡funciona bastante bien!

Espero que esto ayude.


Desde una perspectiva más general, al recibir la solicitud, puede registrar un suscriptor en la cola en el contexto de la solicitud actual (es decir, cuando el objeto de la solicitud está dentro del alcance) que recibe un acuse de recibo de los servicios responsables a medida que finalizan sus trabajos (como una máquina de estado). que mantiene el avance del número total de operaciones). Cuando se alcanza el estado de terminación, devuelve la respuesta y elimina el oyente. Creo que esto funcionará en cualquier cola de mensajes de estilo pub / sub. Aquí hay una demostración muy simplificada de lo que estoy sugiriendo.

// a stub for any message queue using the pub sub pattern let Q = { pub: (event, data) => {}, sub: (event, handler) => {} } // typical express request handler let controller = async (req, res) => { // initiate saga let sagaId = uuid() Q.pub("saga:register-user", { username: req.body.username, password: req.body.password, promoCode: req.body.promoCode, sagaId: sagaId }) // wait for user to be added let p1 = new Promise((resolve, reject) => { Q.sub("user-added", ack => { resolve(ack) }) }) // wait for promo code to be applied let p2 = new Promise((resolve, reject) => { Q.sub("promo-applied", ack => { resolve(ack) }) }) // wait for both promises to finish successfully try { var sagaComplete = await Promise.all([p1, p2]) // respond with some transformation of data res.json({success: true, data: sagaComplete}) } catch (e) { logger.error(''saga failed due to reasons'') // rollback asynchronously Q.pub(''rollback:user-added'', {sagaId: sagaId}) Q.pub(''rollback:promo-applied'', {sagaId: sagaId}) // respond with appropriate status res.status(500).json({message: ''could not complete saga. Rolling back side effects''}) } }

Como probablemente pueda ver, este parece ser un patrón general que se puede abstraer en un marco para reducir la duplicación de código y administrar inquietudes transversales. De esto trata esencialmente el patrón de la saga . El cliente solo esperará el tiempo que sea necesario para finalizar las operaciones requeridas (que es lo que sucedería incluso si todo estuviera sincronizado), más la latencia agregada debido a la comunicación entre servicios. Asegúrese de no bloquear el hilo si está utilizando un sistema basado en bucle de eventos como NodeJS o Python Tornado.

El simple uso de un mecanismo de empuje basado en socket web no mejora necesariamente la eficiencia o el rendimiento de su sistema. Sin embargo, se recomienda que envíe mensajes al cliente mediante una conexión de socket porque hace que su arquitectura sea más general (incluso sus clientes se comportan como lo hacen sus servicios), coherente y permite una mejor separación de inquietudes. También le permitirá escalar independientemente el servicio push sin preocuparse por la lógica de negocios. El patrón de la saga se puede ampliar para habilitar las reversiones en caso de fallas parciales o tiempos de espera y hace que su sistema sea más manejable.