side read nodejs node example javascript node.js reactive-programming rxjs bacon.js

javascript - read - Node.js Streams vs. Observables



stream javascript (1)

Tanto los observables como los flujos de node.js le permiten resolver el mismo problema subyacente: procesar de forma asíncrona una secuencia de valores. La principal diferencia entre los dos, creo, está relacionada con el contexto que motivó su aparición. Ese contexto se refleja en la terminología y la API.

En el lado de los Observables , tiene una extensión para EcmaScript que presenta el modelo de programación reactiva. Intenta llenar el vacío entre la generación de valor y la asincronía con los conceptos minimalistas y componibles de Observer y Observable .

En el lado de node.js y Streams , deseaba crear una interfaz para el procesamiento asíncrono y de rendimiento de flujos de red y archivos locales. La terminología se deriva de ese contexto inicial y obtienes pipe , chunk , encoding , flush , Duplex , Buffer , etc. Al tener un enfoque pragmático que proporciona soporte explícito para casos de uso particulares, pierdes algo de capacidad para componer cosas porque no es tan uniforme. Por ejemplo, usa push en una secuencia Readable y write en un Writable aunque, conceptualmente, está haciendo lo mismo: publicar un valor.

Entonces, en la práctica, si observa los conceptos, y si utiliza la opción { objectMode: true } , puede hacer coincidir Observable con la secuencia Readable y Observer con la secuencia Writable . Incluso puede crear algunos adaptadores simples entre los dos modelos.

var Readable = require(''stream'').Readable; var Writable = require(''stream'').Writable; var util = require(''util''); var Observable = function(subscriber) { this.subscribe = subscriber; } var Subscription = function(unsubscribe) { this.unsubscribe = unsubscribe; } Observable.fromReadable = function(readable) { return new Observable(function(observer) { function nop() {}; var nextFn = observer.next ? observer.next.bind(observer) : nop; var returnFn = observer.return ? observer.return.bind(observer) : nop; var throwFn = observer.throw ? observer.throw.bind(observer) : nop; readable.on(''data'', nextFn); readable.on(''end'', returnFn); readable.on(''error'', throwFn); return new Subscription(function() { readable.removeListener(''data'', nextFn); readable.removeListener(''end'', returnFn); readable.removeListener(''error'', throwFn); }); }); } var Observer = function(handlers) { function nop() {}; this.next = handlers.next || nop; this.return = handlers.return || nop; this.throw = handlers.throw || nop; } Observer.fromWritable = function(writable, shouldEnd, throwFn) { return new Observer({ next: writable.write.bind(writable), return: shouldEnd ? writable.end.bind(writable) : function() {}, throw: throwFn }); }

Es posible que haya notado que cambié algunos nombres y usé los conceptos más simples de Observer y Subscription , presentados aquí, para evitar la sobrecarga de las responsabilidades hechas por Observables en Generator . Básicamente, la Subscription permite darse de baja del Observable . De todos modos, con el código anterior puedes tener una pipe .

Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));

En comparación con process.stdin.pipe(process.stdout) , lo que tiene es una forma de combinar, filtrar y transformar flujos que también funcionan para cualquier otra secuencia de datos. Puede lograrlo con transmisiones Readable , de Transform y de Writable pero la API favorece la subclasificación en lugar de encadenar Readable electrónicos Readable y aplicar funciones. En el modelo Observable , por ejemplo, los valores de transformación corresponden a la aplicación de una función de transformador a la corriente. No requiere un nuevo subtipo de Transform .

Observable.just = function(/*... arguments*/) { var values = arguments; return new Observable(function(observer) { [].forEach.call(values, function(value) { observer.next(value); }); observer.return(); return new Subscription(function() {}); }); }; Observable.prototype.transform = function(transformer) { var source = this; return new Observable(function(observer) { return source.subscribe({ next: function(v) { observer.next(transformer(v)); }, return: observer.return.bind(observer), throw: observer.throw.bind(observer) }); }); }; Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify) .subscribe(Observer.fromWritable(process.stdout))

¿La conclusión? Es fácil introducir el modelo reactivo y el concepto Observable cualquier lugar. Es más difícil implementar una biblioteca completa en torno a ese concepto. Todas esas pequeñas funciones necesitan trabajar juntas de manera consistente. Después de todo, el proyecto ReactiveX todavía lo está logrando. Pero si realmente necesita enviar el contenido del archivo al cliente, lidiar con la codificación y comprimirlo, entonces el soporte está allí, en NodeJS, y funciona bastante bien.

Después de aprender sobre Observables , los encuentro bastante similares a las secuencias de Node.js. Ambos tienen un mecanismo de notificación al consumidor cada vez que llegan nuevos datos, se produce un error o no hay más datos (EOF).

Me encantaría aprender sobre las diferencias conceptuales / funcionales entre los dos. ¡Gracias!