switchmap mergemap javascript angular rxjs observable

javascript - mergemap - switchmap angular 4



¿Cómo funciona RxJS MergeMap? (2)

.mergeMap() permite aplanar un observable de orden superior en un solo flujo. Por ejemplo:

Rx.Observable.from([1,2,3,4]) .map(i => getFreshApiData()) .subscribe(val => console.log(''regular map result: '' + val)); //vs Rx.Observable.from([1,2,3,4]) .mergeMap(i => getFreshApiData()) .subscribe(val => console.log(''mergeMap result: '' + val)); function getFreshApiData() { return Rx.Observable.of(''retrieved new data'') .delay(1000); }

<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

Vea mi respuesta en esta otra pregunta para obtener una explicación detallada de los operadores .xxxMap() : Rxjs: ¿Cómo puedo extraer varios valores dentro de una matriz y devolverlos al flujo observable de forma sincrónica?

No entiendo el propósito de mergeMap en absoluto. He escuchado dos "explicaciones:

  1. "Es como SelectAll" en LINQ - no.
  2. "Bueno, es una combinación de RxJS merge and map " - no (o no puedo replicar esto).

Considera el siguiente código:

var obs1 = new Rx.Observable.interval(1000); var obs2 = new Rx.Observable.interval(1000); //Just a merge and a map, works fine obs1.merge(obs2).map(x=> x+''a'').subscribe( next => console.log(next) ) //Who know what - seems to do the same thing as a plain map on 1 observable obs1.mergeMap(val => Rx.Observable.of(val + `B`)) .subscribe( next => console.log(next) )

JS Bin

La última pieza llamada "Quién sabe qué" no hace más que un mapa en obs1 : ¿qué sentido tiene?

¿Qué hace realmente mergeMap ? ¿Qué es un ejemplo de un caso de uso válido? (Preferiblemente con algún código)

Artículos que no me ayudaron en absoluto (el código mergeMap de arriba es de uno de estos): 1 , 2


tl; dr; mergeMap es mucho más poderoso que el map . Comprender mergeMap es la condición necesaria para acceder a la potencia total de Rx.

similitudes

  • tanto mergeMap como los map actúan en una sola secuencia (vs. zip , combineLatest )

  • Tanto mergeMap como el map pueden transformar elementos de un flujo (frente a filter , delay )

diferencias

mapa

  • no se puede cambiar el tamaño de la secuencia de origen (supuesto: el map sí no se throw ); para cada elemento de la fuente se emite exactamente un elemento mapped ; map no puede ignorar elementos (como por ejemplo, el filter );

  • en el caso del programador predeterminado, la transformación se realiza de forma síncrona; para ser 100% claros: el flujo de origen puede entregar sus elementos de forma asíncrona, pero cada elemento siguiente se mapped inmediatamente y se vuelve a emitir; map no puede cambiar elementos en el tiempo como, por ejemplo, el delay

  • no hay restricciones en los valores de retorno

  • id : x => x

mergeMap

  • puede cambiar el tamaño de la secuencia de origen; para cada elemento puede haber un número arbitrario (0, 1 o muchos) de nuevos elementos creados / emitidos

  • ofrece un control total sobre la asincronía, tanto cuando se crean / emiten nuevos elementos como la cantidad de elementos del flujo de origen que deben procesarse al mismo tiempo; por ejemplo, supongamos que la secuencia de origen emitió 10 elementos, pero maxConcurrency se establece en 2, luego los dos primeros elementos se procesarán de inmediato y el resto 8 se almacenará en búfer; una vez que se haya procesado uno de los elementos complete d, se procesará el siguiente elemento de la secuencia de origen y así sucesivamente - es un poco complicado, pero eche un vistazo al siguiente ejemplo

  • todos los demás operadores se pueden implementar con mergeMap y el constructor Observable

  • Puede ser utilizado para operaciones asíncronas recursivas.

  • los valores de retorno deben ser del tipo Observable (o Rx debe saber cómo crear observables a partir de ellos, por ejemplo, promesa, matriz)

  • id : x => Rx.Observable.of(x)

analogía de matrices

let array = [1,2,3] fn map mergeMap x => x*x [1,4,9] error /*expects array as return value*/ x => [x,x*x] [[1,1],[2,4],[3,9]] [1,1,2,4,3,9]

La analogía no muestra la imagen completa y básicamente corresponde a .mergeMap con maxConcurrency establecido en 1. En tal caso, los elementos se ordenarán como se maxConcurrency anteriormente, pero en general, no tiene que ser así. La única garantía que tenemos es que la emisión de nuevos elementos se ordenará por su posición en el flujo subyacente. Por ejemplo: [3,1,2,4,9,1] y [2,3,1,1,9,4] son válidos, pero [1,1,4,2,3,9] no lo es ( ya que 4 se emitió después de 2 en el flujo subyacente).

Un par de ejemplos usando mergeMap :

// implement .map with .mergeMap Rx.Observable.prototype.mapWithMergeMap = function(mapFn) { return this.mergeMap(x => Rx.Observable.of(mapFn(x))); } Rx.Observable.range(1, 3) .mapWithMergeMap(x => x * x) .subscribe(x => console.log(''mapWithMergeMap'', x)) // implement .filter with .mergeMap Rx.Observable.prototype.filterWithMergeMap = function(filterFn) { return this.mergeMap(x => filterFn(x) ? Rx.Observable.of(x) : Rx.Observable.empty()); // return no element } Rx.Observable.range(1, 3) .filterWithMergeMap(x => x === 3) .subscribe(x => console.log(''filterWithMergeMap'', x)) // implement .delay with .mergeMap Rx.Observable.prototype.delayWithMergeMap = function(delayMs) { return this.mergeMap(x => Rx.Observable.create(obs => { // setTimeout is naive - one should use scheduler instead const token = setTimeout(() => { obs.next(x); obs.complete(); }, delayMs) return () => clearTimeout(token); })) } Rx.Observable.range(1, 3) .delayWithMergeMap(500) .take(2) .subscribe(x => console.log(''delayWithMergeMap'', x)) // recursive count const count = (from, to, interval) => { if (from > to) return Rx.Observable.empty(); return Rx.Observable.timer(interval) .mergeMap(() => count(from + 1, to, interval) .startWith(from)) } count(1, 3, 1000).subscribe(x => console.log(''count'', x)) // just an example of bit different implementation with no returns const countMoreRxWay = (from, to, interval) => Rx.Observable.if( () => from > to, Rx.Observable.empty(), Rx.Observable.timer(interval) .mergeMap(() => countMoreRxWay(from + 1, to, interval) .startWith(from))) const maxConcurrencyExample = () => Rx.Observable.range(1,7) .do(x => console.log(''emitted'', x)) .mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2) .do(x => console.log(''processed'', x)) .subscribe() setTimeout(maxConcurrencyExample, 3100)

<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>