javascript - ejemplo - Separe los valores observables por tiempo específico en RxJS
rxjs en 30 dias (8)
Basándose en las soluciones zip de farincz y user3587412, esta es la forma en que funciona en RxJS v6
const { zip, from, timer } = require("rxjs")
const { map } = require("rxjs/operators")
const input = [1, 2, 3, 4, 5]
const delay = 2000
zip(
from(input),
timer(0, delay)
).pipe(
map(([ delayedInput, _timer ]) => delayedInput) // throw away timer index
).subscribe(
console.log
)
¿Cuál sería la forma más idiomática de obtener los valores de un Observable por un período de tiempo específico? Por ejemplo, digamos que tengo un Observable creado a partir de una gran matriz y quiero generar un valor cada 2 segundos. ¿Es una combinación de interval
y selectMany
la mejor manera?
Creo que el uso de zip produce un código mejor y más legible, aún usando solo 3 observables.
var items = [''A'', ''B'', ''C''];
Rx.Observable.zip(
Rx.Observable.fromArray(items),
Rx.Observable.timer(2000, 2000),
function(item, i) { return item;}
)
De acuerdo que zip es un enfoque limpio. Aquí hay una función reutilizable para generar un flujo de intervalo para una matriz:
function yieldByInterval(items, time) {
return Rx.Observable.from(items).zip(
Rx.Observable.interval(time),
function(item, index) { return item; }
);
}
// test
yieldByInterval([''A'', ''B'', ''C''], 2000)
.subscribe(console.log.bind(console));
Esto se basa en la respuesta de farincz , pero es un poco más corto utilizando .zip
como método de instancia.
Además, utilicé Rx.Observable.from()
porque Rx.Observable.fromArray()
está en deprecated .
Para RxJS 5:
Rx.Observable.from([1, 2, 3, 4, 5])
.zip(Rx.Observable.timer(0, 2000), x => x)
.subscribe(x => console.log(x));
Para RxJS v6 obtener el siguiente con un retraso de 2 segundos.
Ejemplo 1. concatMap:
import {of} from ''rxjs'';
import {concatMap, delay} from ''rxjs/operators'';
of(1, 2, 3, 4, 5)
.pipe(
concatMap(x => of(x)
.pipe(
delay(2000))
)
)
.subscribe({
next(value) {
console.log(value);
}
});
Ejemplo 2. mapa + concatAll:
import {of} from ''rxjs'';
import {concatAll, delay, map} from ''rxjs/operators'';
of(1, 2, 3, 4, 5)
.pipe(
map(x => of(x)
.pipe(
delay(2000))
),
concatAll()
)
.subscribe({
next(value) {
console.log(value);
}
});
Para su ejemplo específico, la idea es asignar cada valor de la matriz a un observable que produzca su resultado después de un retraso, y luego concatenar el flujo de observables resultante:
var delayedStream = Rx.Observable
.fromArray([1, 2, 3, 4, 5])
.map(function (value) { return Rx.Observable.return(value).delay(2000); })
.concatAll();
Otros ejemplos pueden hacer uso de timer
o interval
. Solo depende
Por ejemplo, si su matriz es realmente grande, entonces lo anterior causará una buena cantidad de presión en la memoria (porque está creando N
observables para una N
realmente grande). Aquí hay una alternativa que usa el interval
para caminar perezosamente por la matriz:
var delayedStream = Rx.Observable
.interval(2000)
.take(reallyBigArray.length) // end the observable after it pulses N times
.map(function (i) { return reallyBigArray[i]; });
Este proporcionará el siguiente valor de la matriz cada 2 segundos hasta que haya iterado en toda la matriz.
Si bien la respuesta de Brandon comprende la esencia de la idea, aquí hay una versión que produce el primer elemento inmediatamente y luego pone tiempo entre los siguientes elementos.
var delay = Rx.Observable.empty().delay(2000);
var items = Rx.Observable.fromArray([1,2,3,4,5])
.map(function (x) {
return Rx.Observable.return(x).concat(delay); // put some time after the item
})
.concatAll();
Actualizado para los nuevos RxJS:
var delay = Rx.Observable.empty().delay(2000);
var items = Rx.Observable.fromArray([1,2,3,4,5])
.concatMap(function (x) {
return Rx.Observable.of(x).concat(delay); // put some time after the item
});