javascript - mergemap - switchmap angular 4
¿Cómo funciona RxJS MergeMap? (2)
.mergeMap()
permite aplanar un observable de orden superior en un solo flujo. Por ejemplo:
Rx.Observable.from([1,2,3,4])
.map(i => getFreshApiData())
.subscribe(val => console.log(''regular map result: '' + val));
//vs
Rx.Observable.from([1,2,3,4])
.mergeMap(i => getFreshApiData())
.subscribe(val => console.log(''mergeMap result: '' + val));
function getFreshApiData() {
return Rx.Observable.of(''retrieved new data'')
.delay(1000);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>
Vea mi respuesta en esta otra pregunta para obtener una explicación detallada de los operadores .xxxMap()
: Rxjs: ¿Cómo puedo extraer varios valores dentro de una matriz y devolverlos al flujo observable de forma sincrónica?
No entiendo el propósito de mergeMap
en absoluto. He escuchado dos "explicaciones:
- "Es como SelectAll" en LINQ - no.
- "Bueno, es una combinación de RxJS
merge
andmap
" - no (o no puedo replicar esto).
Considera el siguiente código:
var obs1 = new Rx.Observable.interval(1000);
var obs2 = new Rx.Observable.interval(1000);
//Just a merge and a map, works fine
obs1.merge(obs2).map(x=> x+''a'').subscribe(
next => console.log(next)
)
//Who know what - seems to do the same thing as a plain map on 1 observable
obs1.mergeMap(val => Rx.Observable.of(val + `B`))
.subscribe(
next => console.log(next)
)
La última pieza llamada "Quién sabe qué" no hace más que un mapa en obs1
: ¿qué sentido tiene?
¿Qué hace realmente mergeMap
? ¿Qué es un ejemplo de un caso de uso válido? (Preferiblemente con algún código)
Artículos que no me ayudaron en absoluto (el código mergeMap de arriba es de uno de estos): 1 , 2
tl; dr; mergeMap
es mucho más poderoso que el map
. Comprender mergeMap
es la condición necesaria para acceder a la potencia total de Rx.
similitudes
tanto
mergeMap
como losmap
actúan en una sola secuencia (vs.zip
,combineLatest
)Tanto
mergeMap
como elmap
pueden transformar elementos de un flujo (frente afilter
,delay
)
diferencias
mapa
no se puede cambiar el tamaño de la secuencia de origen (supuesto: el
map
sí no sethrow
); para cada elemento de la fuente se emite exactamente un elementomapped
;map
no puede ignorar elementos (como por ejemplo, elfilter
);en el caso del programador predeterminado, la transformación se realiza de forma síncrona; para ser 100% claros: el flujo de origen puede entregar sus elementos de forma asíncrona, pero cada elemento siguiente se
mapped
inmediatamente y se vuelve a emitir;map
no puede cambiar elementos en el tiempo como, por ejemplo, eldelay
no hay restricciones en los valores de retorno
id
:x => x
mergeMap
puede cambiar el tamaño de la secuencia de origen; para cada elemento puede haber un número arbitrario (0, 1 o muchos) de nuevos elementos creados / emitidos
ofrece un control total sobre la asincronía, tanto cuando se crean / emiten nuevos elementos como la cantidad de elementos del flujo de origen que deben procesarse al mismo tiempo; por ejemplo, supongamos que la secuencia de origen emitió 10 elementos, pero
maxConcurrency
se establece en 2, luego los dos primeros elementos se procesarán de inmediato y el resto 8 se almacenará en búfer; una vez que se haya procesado uno de los elementoscomplete
d, se procesará el siguiente elemento de la secuencia de origen y así sucesivamente - es un poco complicado, pero eche un vistazo al siguiente ejemplotodos los demás operadores se pueden implementar con
mergeMap
y el constructorObservable
Puede ser utilizado para operaciones asíncronas recursivas.
los valores de retorno deben ser del tipo Observable (o Rx debe saber cómo crear observables a partir de ellos, por ejemplo, promesa, matriz)
id
:x => Rx.Observable.of(x)
analogía de matrices
let array = [1,2,3]
fn map mergeMap
x => x*x [1,4,9] error /*expects array as return value*/
x => [x,x*x] [[1,1],[2,4],[3,9]] [1,1,2,4,3,9]
La analogía no muestra la imagen completa y básicamente corresponde a .mergeMap
con maxConcurrency
establecido en 1. En tal caso, los elementos se ordenarán como se maxConcurrency
anteriormente, pero en general, no tiene que ser así. La única garantía que tenemos es que la emisión de nuevos elementos se ordenará por su posición en el flujo subyacente. Por ejemplo: [3,1,2,4,9,1]
y [2,3,1,1,9,4]
son válidos, pero [1,1,4,2,3,9]
no lo es ( ya que 4
se emitió después de 2
en el flujo subyacente).
Un par de ejemplos usando mergeMap
:
// implement .map with .mergeMap
Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
}
Rx.Observable.range(1, 3)
.mapWithMergeMap(x => x * x)
.subscribe(x => console.log(''mapWithMergeMap'', x))
// implement .filter with .mergeMap
Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
return this.mergeMap(x =>
filterFn(x) ?
Rx.Observable.of(x) :
Rx.Observable.empty()); // return no element
}
Rx.Observable.range(1, 3)
.filterWithMergeMap(x => x === 3)
.subscribe(x => console.log(''filterWithMergeMap'', x))
// implement .delay with .mergeMap
Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
return this.mergeMap(x =>
Rx.Observable.create(obs => {
// setTimeout is naive - one should use scheduler instead
const token = setTimeout(() => {
obs.next(x);
obs.complete();
}, delayMs)
return () => clearTimeout(token);
}))
}
Rx.Observable.range(1, 3)
.delayWithMergeMap(500)
.take(2)
.subscribe(x => console.log(''delayWithMergeMap'', x))
// recursive count
const count = (from, to, interval) => {
if (from > to) return Rx.Observable.empty();
return Rx.Observable.timer(interval)
.mergeMap(() =>
count(from + 1, to, interval)
.startWith(from))
}
count(1, 3, 1000).subscribe(x => console.log(''count'', x))
// just an example of bit different implementation with no returns
const countMoreRxWay = (from, to, interval) =>
Rx.Observable.if(
() => from > to,
Rx.Observable.empty(),
Rx.Observable.timer(interval)
.mergeMap(() => countMoreRxWay(from + 1, to, interval)
.startWith(from)))
const maxConcurrencyExample = () =>
Rx.Observable.range(1,7)
.do(x => console.log(''emitted'', x))
.mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
.do(x => console.log(''processed'', x))
.subscribe()
setTimeout(maxConcurrencyExample, 3100)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>