javascript - read - Node.js Streams vs. Observables
stream javascript (1)
Tanto los observables como los flujos de node.js le permiten resolver el mismo problema subyacente: procesar de forma asíncrona una secuencia de valores. La principal diferencia entre los dos, creo, está relacionada con el contexto que motivó su aparición. Ese contexto se refleja en la terminología y la API.
En el lado de los
Observables
, tiene una extensión para EcmaScript que presenta el modelo de programación reactiva.
Intenta llenar el vacío entre la generación de valor y la asincronía con los conceptos minimalistas y componibles de
Observer
y
Observable
.
En el lado de node.js y
Streams
, deseaba crear una interfaz para el procesamiento asíncrono y de rendimiento de flujos de red y archivos locales.
La terminología se deriva de ese contexto inicial y obtienes
pipe
,
chunk
,
encoding
,
flush
,
Duplex
,
Buffer
, etc. Al tener un enfoque pragmático que proporciona soporte explícito para casos de uso particulares, pierdes algo de capacidad para componer cosas porque no es tan uniforme.
Por ejemplo, usa
push
en una secuencia
Readable
y
write
en un
Writable
aunque, conceptualmente, está haciendo lo mismo: publicar un valor.
Entonces, en la práctica, si observa los conceptos, y si utiliza la opción
{ objectMode: true }
, puede hacer coincidir
Observable
con la secuencia
Readable
y
Observer
con la secuencia
Writable
.
Incluso puede crear algunos adaptadores simples entre los dos modelos.
var Readable = require(''stream'').Readable;
var Writable = require(''stream'').Writable;
var util = require(''util'');
var Observable = function(subscriber) {
this.subscribe = subscriber;
}
var Subscription = function(unsubscribe) {
this.unsubscribe = unsubscribe;
}
Observable.fromReadable = function(readable) {
return new Observable(function(observer) {
function nop() {};
var nextFn = observer.next ? observer.next.bind(observer) : nop;
var returnFn = observer.return ? observer.return.bind(observer) : nop;
var throwFn = observer.throw ? observer.throw.bind(observer) : nop;
readable.on(''data'', nextFn);
readable.on(''end'', returnFn);
readable.on(''error'', throwFn);
return new Subscription(function() {
readable.removeListener(''data'', nextFn);
readable.removeListener(''end'', returnFn);
readable.removeListener(''error'', throwFn);
});
});
}
var Observer = function(handlers) {
function nop() {};
this.next = handlers.next || nop;
this.return = handlers.return || nop;
this.throw = handlers.throw || nop;
}
Observer.fromWritable = function(writable, shouldEnd, throwFn) {
return new Observer({
next: writable.write.bind(writable),
return: shouldEnd ? writable.end.bind(writable) : function() {},
throw: throwFn
});
}
Es posible que haya notado que cambié algunos nombres y usé los conceptos más simples de
Observer
y
Subscription
, presentados aquí, para evitar la sobrecarga de las responsabilidades hechas por
Observables
en
Generator
.
Básicamente, la
Subscription
permite darse de baja del
Observable
.
De todos modos, con el código anterior puedes tener una
pipe
.
Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout));
En comparación con
process.stdin.pipe(process.stdout)
, lo que tiene es una forma de combinar, filtrar y transformar flujos que también funcionan para cualquier otra secuencia de datos.
Puede lograrlo con transmisiones
Readable
, de
Transform
y de
Writable
pero la API favorece la subclasificación en lugar de encadenar
Readable
electrónicos
Readable
y aplicar funciones.
En el modelo
Observable
, por ejemplo, los valores de transformación corresponden a la aplicación de una función de transformador a la corriente.
No requiere un nuevo subtipo de
Transform
.
Observable.just = function(/*... arguments*/) {
var values = arguments;
return new Observable(function(observer) {
[].forEach.call(values, function(value) {
observer.next(value);
});
observer.return();
return new Subscription(function() {});
});
};
Observable.prototype.transform = function(transformer) {
var source = this;
return new Observable(function(observer) {
return source.subscribe({
next: function(v) {
observer.next(transformer(v));
},
return: observer.return.bind(observer),
throw: observer.throw.bind(observer)
});
});
};
Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify)
.subscribe(Observer.fromWritable(process.stdout))
¿La conclusión?
Es fácil introducir el modelo reactivo y el concepto
Observable
cualquier lugar.
Es más difícil implementar una biblioteca completa en torno a ese concepto.
Todas esas pequeñas funciones necesitan trabajar juntas de manera consistente.
Después de todo, el proyecto
ReactiveX
todavía lo está logrando.
Pero si realmente necesita enviar el contenido del archivo al cliente, lidiar con la codificación y comprimirlo, entonces el soporte está allí, en NodeJS, y funciona bastante bien.
Después de aprender sobre Observables , los encuentro bastante similares a las secuencias de Node.js. Ambos tienen un mecanismo de notificación al consumidor cada vez que llegan nuevos datos, se produce un error o no hay más datos (EOF).
Me encantaría aprender sobre las diferencias conceptuales / funcionales entre los dos. ¡Gracias!