tutorial nodejs javascript rxjs

javascript - nodejs - RXJS Espere a que todos los observables en una matriz se completen(o se produzcan errores)



rxjs nodejs (3)

Para mí esta sample fue la mejor solución.

const source = Observable.interval(500); const example = source.sample(Observable.interval(2000)); const subscribe = example.subscribe(val => console.log(''sample'', val));

Entonces ... solo cuando se emite el segundo (ejemplo): verá el último valor emitido del primero (fuente).

En mi tarea, espero la validación de formularios y otro evento DOM.

Estoy empujando los observables en una matriz como tal ...

var tasks$ = []; tasks$.push(Observable.timer(1000)); tasks$.push(Observable.timer(3000)); tasks$.push(Observable.timer(10000));

Quiero un observable que se emita cuando todas las tareas $ se han completado. Tenga en cuenta que, en la práctica, las tareas $ no tienen un número conocido de observables.

He intentado Observable.zip(tasks$).subscribe() pero esto parece fallar en el caso de que haya solo 1 tarea, y me está llevando a creer que ZIP requiere un número par de elementos para funcionar de la manera correcta. Yo esperaría.

He intentado Observable.concat(tasks$).subscribe() pero el resultado del operador de concat parece ser una serie de observables ... por ejemplo, básicamente el mismo que la entrada. Ni siquiera se puede llamar a suscribirse en él.

en C # esto sería similar a Task.WhenAll (). en la promesa ES6 sería similar a Promise.all ().

Me he encontrado con una serie de preguntas de SO, pero todas parecen lidiar con esperar en un número conocido de transmisiones (por ejemplo, mapearlas juntas).


Si desea componer un observable que se emita cuando se forkJoin completado todos los observables de origen, puede usar forkJoin :

import { Observable } from ''rxjs/Observable''; import ''rxjs/add/observable/forkJoin''; import ''rxjs/add/operator/first''; var tasks$ = []; tasks$.push(Observable.timer(1000).first()); tasks$.push(Observable.timer(3000).first()); tasks$.push(Observable.timer(10000).first()); Observable.forkJoin(...tasks$).subscribe(results => { console.log(results); });


Usted puede hacer uso de zip .

Combina múltiples observables para crear un observable cuyos valores se calculan a partir de los valores, en orden, de cada uno de sus observables de entrada.

const obsvA = this._service.getObjA(); const obsvB = this._service.getObjB(); // or with array // const obsvArray = [obsvA, obsvB]; const zip = Observable.zip(obsvA, obsvB); // or with array // const zip = Observable.zip(...obsvArray); zip.subscribe( result => console.log(result), // result is an array with the responses [respA, respB] );

Cosas para considerar:

  • No necesita ser un número par de observables.
  • zip visualmente
  • Como se dijo here ,

    El operador zip se suscribirá a todos los observables internos, esperando que cada uno emita un valor. Una vez que esto ocurra, se emitirán todos los valores con el índice correspondiente. Esto continuará hasta que al menos un observable interno se complete.

  • Cuando uno de los observables emite un error (o ambos), la suscripción se cierra (se llama onComplete al completar), y con un método onError , solo se obtiene el primer error.
  • zip.subscribe( result => console.log(result), // result is an array with the responses [respA, respB] error => console.log(error), // will return the error message of the first observable that throws error and then finish it () => console.log (''completed after first error or if first observable finishes) );