remove nodejs node example javascript node.js asynchronous reactive-programming rxjs

javascript - nodejs - rxjs package



ProgramaciĆ³n reactiva: RxJS vs EventEmitter en Node.js (3)

Recientemente comencé a buscar en las RxJS y RxJava (de Netflix) que trabajan sobre el concepto de Programación reactiva.

Node.js funciona sobre la base de bucles de eventos, que le proporciona todo el arsenal para la programación asincrónica y las bibliotecas de nodos subsecuentes como "clúster" lo ayudan a obtener el mejor rendimiento de su máquina multinúcleo. Y Node.js también le proporciona la funcionalidad EventEmitter donde puede suscribirse a eventos y actuar de forma asincrónica.

Por otro lado, si entiendo correctamente RxJS (y la Programación reactiva en general) funciona según el principio de las secuencias de eventos, suscribirse a las secuencias de eventos, transformando los datos de la secuencia de eventos de forma asíncrona.

Entonces, la pregunta es qué significa usar paquetes Rx en Node.js. Cuán diferente es el bucle de eventos del Nodo, el emisor del evento y las suscripciones a las transmisiones y suscripciones del Rx.


Mi respuesta ~ hace dos años era incorrecta y no puedo editarla significativamente ni eliminarla. Los observables no son como EventEmitters. En algunos casos, pueden actuar como EventEmitters, es decir, cuando son multidifundidos utilizando temas RxJS, pero generalmente no actúan como EventEmitters.

En resumen, un sujeto RxJS es como un EventEmitter, pero un RxJS Observable es una interfaz más genérica. Los observables son más similares a las funciones con cero argumentos.

Considera lo siguiente:

function foo() { console.log(''Hello''); return 42; } var x = foo.call(); // same as foo() console.log(x); var y = foo.call(); // same as foo() console.log(y);

Por supuesto, todos esperamos ver como salida:

"Hello" 42 "Hello" 42

Puedes escribir el mismo comportamiento arriba, pero con Observables:

var foo = Rx.Observable.create(function (observer) { console.log(''Hello''); observer.next(42); }); foo.subscribe(function (x) { console.log(x); }); foo.subscribe(function (y) { console.log(y); });

Y el resultado es el mismo:

"Hello" 42 "Hello" 42

Esto se debe a que tanto las funciones como los Observables son cálculos perezosos. Si no llama a la función, el console.log(''Hello'') no sucederá. También con Observables, si no "llama" ( subscribe ), console.log(''Hello'') no sucederá. Además, "llamar" o "suscribirse" es una operación independiente: dos llamadas a funciones desencadenan dos efectos secundarios por separado, y dos suscripciones a Observable desencadenan dos efectos secundarios distintos. A diferencia de EventEmitters que comparten los efectos secundarios y tienen una ejecución ansiosa independientemente de la existencia de suscriptores, los Observables no tienen ejecución compartida y son flojos.

Hasta el momento, no hay diferencia entre el comportamiento de una función y un Observable. Esta pregunta de se hubiera redactado mejor como "RxJS Observables vs functions?".

Algunas personas afirman que los Observables son asincrónicos. Eso no es verdad. Si rodea una llamada de función con registros, como esta:

console.log(''before''); console.log(foo.call()); console.log(''after'');

Obviamente verá la salida:

"before" "Hello" 42 "after"

Y este es el mismo comportamiento con Observables:

console.log(''before''); foo.subscribe(function (x) { console.log(x); }); console.log(''after'');

Y el resultado:

"before" "Hello" 42 "after"

Lo que demuestra que la suscripción de foo fue completamente sincrónica, al igual que una función.

Entonces, ¿cuál es realmente la diferencia entre un Observable y una función?

Los observables pueden "devolver" valores múltiples a lo largo del tiempo , algo que las funciones no pueden. No puedes hacer esto:

function foo() { console.log(''Hello''); return 42; return 100; // dead code. will never happen }

Las funciones solo pueden devolver un valor. Observables, sin embargo, puede hacer esto:

var foo = Rx.Observable.create(function (observer) { console.log(''Hello''); observer.next(42); observer.next(100); // "return" another value observer.next(200); }); console.log(''before''); foo.subscribe(function (x) { console.log(x); }); console.log(''after'');

Con salida síncrona:

"before" "Hello" 42 100 200 "after"

Pero también puede "devolver" valores de forma asíncrona:

var foo = Rx.Observable.create(function (observer) { console.log(''Hello''); observer.next(42); observer.next(100); observer.next(200); setTimeout(function () { observer.next(300); }, 1000); });

Con salida:

"before" "Hello" 42 100 200 "after" 300

Para concluir,

  • func.call() significa " dame un valor inmediatamente (sincrónicamente) "
  • obsv.subscribe() significa " dame valores. Tal vez muchos de ellos, tal vez sincrónicamente, quizás asincrónicamente "

Así es como los Observables son una generalización de funciones (que no tienen argumentos).


Tiene razón en que las transmisiones Rx y EventEmitter son muy similares, ambas son implementaciones del patrón Observer.

La diferencia es que Rx contiene funciones para transformar y combinar secuencias de eventos. Imaginemos, por ejemplo, que queremos retrasar cada "evento de respuesta" en 2 segundos. Con EventEmitter, tendría que suscribirse a eso y hacer un tiempo de espera manualmente:

eventEmitter.on(''response'', function(res) { setTimeout(function() { /* handle res */ }, 2000); });

Con Rx, puede crear una nueva secuencia de eventos que simplemente es la secuencia original aplicada a la función de retardo:

responseStream.delay(2000).subscribe(function(res) { /* handle res */ });

delay() es una función simple, y una de las muchas otras disponibles . Hay tantas maneras diferentes de modificar flujos, que es posible programar una gran cantidad de lógica de aplicación simplemente transformando flujos con todas las funciones posibles, en lugar de confiar en la lógica de bajo nivel como setTimeout .

También vea esta exploración visual e interactiva de esas funciones.


¿Cuándo se apega un oyente a Emitter?

Con los emisores de eventos, los oyentes son notificados cada vez que ocurre un evento que les interesa. Cuando se agrega un nuevo oyente después de que ha ocurrido el evento, él no sabrá sobre el evento pasado. Además, el nuevo oyente no conocerá la historia de los sucesos anteriores. Por supuesto, podríamos programar manualmente nuestro emisor y nuestro oyente para manejar esta lógica personalizada.

Con flujos reactivos, el suscriptor recibe la secuencia de eventos que sucedió desde el principio. Entonces, el momento en que se suscribe no es estricto. Ahora puede realizar diversas operaciones en la transmisión para obtener la subsecuencia de eventos que le interesan.

La ventaja de esto sale a la luz:

  • cuando necesitamos procesar los eventos que sucedieron en el tiempo
  • orden en que sucedieron
  • patrones en los que sucedieron los eventos (digamos que después de cada evento de compra en acciones de Google, un evento de venta en acciones de Microsoft sucede en 5 minutos)

Flujos de orden superior:

Una secuencia de orden superior es una "secuencia de flujos": una secuencia cuyos valores de eventos son en sí mismos flujos.

Con los emisores de eventos, una forma de hacerlo es tener el mismo oyente conectado a múltiples emisores de eventos. Se vuelve complejo cuando necesitamos correlacionar el evento sucedido en diferentes emisores.

Con las corrientes reactivas, es muy fácil. Un ejemplo de mostjs (que es una biblioteca de programación reactiva, como RxJS pero más eficiente)

const firstClick = most.fromEvent(''click'', document).take(1); const mousemovesAfterFirstClick = firstClick.map(() => most.fromEvent(''mousemove'', document) .takeUntil(most.of().delay(5000)))

En el ejemplo anterior, correlacionamos los eventos de clic con los eventos de movimiento del mouse. Los patrones de deducción entre eventos se vuelven más fáciles de lograr cuando los eventos están disponibles como una transmisión.

Habiendo dicho eso, con EventEmitter podríamos lograr todo esto mediante la ingeniería de nuestros emisores y oyentes. Necesita más ingeniería porque no está previsto en primer lugar para tales escenarios. Mientras que las corrientes reactivas lo hacen con tanta fluidez porque está destinado a resolver tales problemas.