reactiva que programacion ejemplo javascript observable rxjs

javascript - programacion - ¿Cómo puedo hacer que una secuencia observable RxJS espere a que se complete otra antes de emitir?



que es un observable angular (8)

Digamos que tengo un observable, como tal:

var one = someObservable.take(1); one.subscribe(function(){ /* do something */ });

Entonces, tengo un segundo observable:

var two = someOtherObservable.take(1);

Ahora, quiero suscribirme a two , pero quiero asegurarme de que one haya completado antes de que se despida a los two suscriptores. ¿Qué tipo de método de almacenamiento en búfer puedo usar en two para hacer que el segundo espere a que se complete el primero?

Supongo que estoy buscando hacer una pausa de two hasta que one esté completa.


skipUntil () con last ()

skipUntil: ignora los elementos emitidos hasta que otro observable haya emitido

último: emitir el último valor de una secuencia

Tenga en cuenta que el observable pasado a skipUntil solo tiene que emitir algo para cancelar el salto, razón por la cual necesitamos poner last ().

main$.skipUntil(sequence2$.pipe(last()))

Oficial: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil


Aquí hay otra posibilidad aprovechando el selector de resultados de switchMap.

var one$ = someObservable.take(1); var two$ = someOtherObservable.take(1); two$.switchMap( /** Wait for first Observable */ () => one$, /** Only return the value we''re actually interested in */ (value2, value1) => value2 ) .subscribe((value2) => { /* do something */ });


Aquí hay otro, pero me siento más directo e intuitivo (o al menos natural si estás acostumbrado a las promesas), enfoque. Básicamente, creas un Observable usando Observable.create() para envolver one y two como un solo Observable. Esto es muy similar a cómo Promise.all() funcionar Promise.all() .

var first = someObservable.take(1); var second = Observable.create((observer) => { return first.subscribe( function onNext(value) { /* do something with value like: */ // observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { someOtherObservable.take(1).subscribe( function onNext(value) { observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { observer.complete(); } ); } ); });

Entonces, ¿qué está pasando aquí? Primero, creamos un nuevo Observable. La función pasada a Observable.create() , llamada apropiadamente onSubscription , se pasa al observador (construido a partir de los parámetros que usted pasa a subscribe() ), que es similar a resolve y reject combinados en un solo objeto al crear una nueva Promesa. Así es como hacemos que la magia funcione.

En onSubscription , nos suscribimos al primer Observable (en el ejemplo anterior, esto se llamó one ). La forma en que manejamos el next y el error depende de usted, pero el valor predeterminado proporcionado en mi muestra debería ser apropiado en términos generales. Sin embargo, cuando recibimos el evento complete , lo que significa que one ya está hecho, podemos suscribirnos al siguiente Observable; disparando así el segundo observable después de que se complete el primero.

El observador de ejemplo proporcionado para el segundo observable es bastante simple. Básicamente, el second ahora actúa como lo que usted esperaría que two actuaran como en el OP. Más específicamente, second emitirá el primer y solo el primer valor emitido por someOtherObservable (debido a take(1) ) y luego se completará, asumiendo que no hay ningún error.

Ejemplo

Aquí hay un ejemplo completo de trabajo que puede copiar / pegar si quiere ver mi ejemplo trabajando en la vida real:

var someObservable = Observable.from([1, 2, 3, 4, 5]); var someOtherObservable = Observable.from([6, 7, 8, 9]); var first = someObservable.take(1); var second = Observable.create((observer) => { return first.subscribe( function onNext(value) { /* do something with value like: */ observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { someOtherObservable.take(1).subscribe( function onNext(value) { observer.next(value); }, function onError(error) { observer.error(error); }, function onComplete() { observer.complete(); } ); } ); }).subscribe( function onNext(value) { console.log(value); }, function onError(error) { console.error(error); }, function onComplete() { console.log("Done!"); } );

Si observa la consola, se imprimirá el ejemplo anterior:

1

6

¡Hecho!


Aquí hay una forma reutilizable de hacerlo (es mecanografiado pero puedes adaptarlo a js):

export function waitFor<T>(signal: Observable<any>) { return (source: Observable<T>) => new Observable<T>(observer => signal.pipe(first()) .subscribe(_ => source.subscribe(observer) ) ); }

y puedes usarlo como cualquier operador:

var two = someOtherObservable.pipe(waitFor(one), take(1));

Es básicamente un operador que difiere la suscripción en la fuente observable hasta que la señal observable emite el primer evento.


Puede utilizar el resultado emitido de Observable anterior gracias al operador mergeMap (o su alias flatMap ) de esta manera:

const one = Observable.of(''https://api.github.com/users''); const two = (c) => ajax(c);//ajax from Rxjs/dom library one.mergeMap(two).subscribe(c => console.log(c))


Si desea asegurarse de que el orden de ejecución se mantiene, puede usar flatMap como el siguiente ejemplo

const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i)); const second = Rx.Observable.of(11).delay(500).do(i => console.log(i)); const third = Rx.Observable.of(111).do(i => console.log(i)); first .flatMap(() => second) .flatMap(() => third) .subscribe(()=> console.log(''finished''));

El resultado sería:

"1" "11" "111" "finished"


Si el segundo observable está caliente , hay otra forma de hacer una pausa / reanudar :

var pauser = new Rx.Subject(); var source1 = Rx.Observable.interval(1000).take(1); /* create source and pause */ var source2 = Rx.Observable.interval(1000).pausable(pauser); source1.doOnCompleted(function () { /* resume paused source2 */ pauser.onNext(true); }).subscribe(function(){ // do something }); source2.subscribe(function(){ // start to recieve data });

También puede usar la versión de búfer pausableBuffered para mantener los datos durante la pausa.


Un par de maneras en las que puedo pensar

//Method one var one = someObservable.take(1); var two = someOtherObservable.take(1); one.concat(two).subscribe(function() {/*do something */}); //Method two, if they need to be separate for some reason var one = someObservable.take(1); var two = someOtherObservable.take(1).publish(); two.subscribe(function(){/*do something */}); one.subscribe(function(){/*do something */}, null, two.connect.bind(two));