javascript - compat - rxjs pipe
¿Cómo funciona el operador RxJs 5 share()? (2)
No es 100% claro para mí cómo funciona el operador RxJs 5 share()
, consulte aquí los últimos documentos . Jsbin para la pregunta here .
Si creo un observable con una serie de 0 a 2, cada valor se separa por un segundo:
var source = Rx.Observable.interval(1000)
.take(5)
.do(function (x) {
console.log(''some side effect'');
});
Y si creo dos suscriptores a este observable:
source.subscribe((n) => console.log("subscriptor 1 = " + n));
source.subscribe((n) => console.log("subscriptor 2 = " + n));
Consigo esto en la consola:
"some side effect ..."
"subscriptor 1 = 0"
"some side effect ..."
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"some side effect ..."
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"some side effect ..."
"subscriptor 2 = 2"
Pensé que cada suscripción se suscribiría al mismo Observable, ¡pero no parece ser el caso! ¡Es como el acto de suscribirse crea un observable completamente separado!
Pero si el operador share()
se agrega a la fuente observable:
var source = Rx.Observable.interval(1000)
.take(3)
.do(function (x) {
console.log(''some side effect ...'');
})
.share();
Entonces conseguimos esto:
"some side effect ..."
"subscriptor 1 = 0"
"subscriptor 2 = 0"
"some side effect ..."
"subscriptor 1 = 1"
"subscriptor 2 = 1"
"some side effect ..."
"subscriptor 1 = 2"
"subscriptor 2 = 2"
Que es lo que esperaría sin el share()
.
¿Qué está pasando aquí, cómo funciona el operador share()
? ¿Cada suscripción crea una nueva cadena observable?
Tenga cuidado de utilizar RxJS v5 mientras que su enlace de documentación parece ser RxJS v4. No recuerdo detalles específicos, pero creo que el operador de share
algunos cambios, en particular cuando se trata de completar y volver a suscribirse, pero no confíe en mi palabra.
De vuelta a su pregunta, como ha demostrado en su estudio, sus expectativas no se corresponden con el diseño de la biblioteca. Los observables ejemplifican perezosamente su flujo de datos, iniciando concretamente el flujo de datos cuando un suscriptor se suscribe. Cuando un segundo suscriptor se suscribe al mismo observable, se inicia otro nuevo flujo de datos como si fuera el primer suscriptor (así que sí, cada suscripción crea una nueva cadena de observables como usted dijo). Esto es lo que se acuñó en la terminología de RxJS como un observable frío y ese es el comportamiento predeterminado para RxJS observable. Si desea un observable que envíe sus datos a los suscriptores que tiene en el momento en que los recibe, esto se considera un observable y una forma de obtener un observable es usar el operador de share
.
Puede encontrar flujos ilustrados de suscripción y datos aquí: Observables en caliente y en frío: ¿hay operadores ''en caliente'' y ''en frío''? (Esto es válido para RxJS v4, pero la mayoría es válido para v5).
compartir hace que el observable "caliente" si se cumplen estas 2 condiciones:
- el número de suscriptores> 0
- Y lo observable no se ha completado.
Escenario 1: número de suscriptores> 0 y observable no se completa antes de una nueva suscripción
var shared = rx.Observable.interval(5000).take(2).share();
var startTime = Date.now();
var log = (x) => (value) => {
console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};
var observer1 = shared.subscribe(log(''observer1'')),
observer2;
setTimeout(()=>{
observer2 = shared.subscribe(log(''observer2''));
}, 3000);
// emission for both observer 1 and observer 2, with the samve value at startTime + 5 seconds
// another emission for both observers at: startTime + 10 seconds
Escenario 2: el número de suscriptores es cero antes de una nueva suscripción. Se convierte en "frío"
var shared = rx.Observable.interval(5000).take(2).share();
var startTime = Date.now();
var log = (x) => (value) => {
console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};
var observer1 = shared.subscribe(log(''observer1'')),
observer2;
setTimeout(()=>{
observer1.unsubscribe();
}, 1000);
setTimeout(()=>{
observer2 = shared.subscribe(log(''observer2'')); // number of subscribers is 0 at this time
}, 3000);
// observer2''s onNext is called at startTime + 8 seconds
// observer2''s onNext is called at startTime + 13 seconds
Escenario 3: cuando observable se completó antes de una nueva suscripción. Se convierte en "frío"
var shared = rx.Observable.interval(5000).take(2).share();
var startTime = Date.now();
var log = (x) => (value) => {
console.log(`onNext for ${x}, Delay: ${Date.now() - startTime} , Value: ${value}`);
};
var observer1 = shared.subscribe(log(''observer1'')),
observer2;
setTimeout(()=>{
observer2 = shared.subscribe(log(''observer2''));
}, 12000);
// 2 emission for observable 1, at startTime + 5 secs, and at startTime + 10secs
// 2 emissions for observable 2,at startTime + 12 + 5 secs, and at startTime + 12 + 10secs