rx-java - examples - rxjava observable
Entregar el primer artÃculo inmediatamente, ''rebotar'' siguientes artÃculos (6)
Actualizar:
De los comentarios de @lopar una mejor manera sería:
Observable.from(items).publish(publishedItems -> publishedItems.limit(1).concatWith(publishedItems.skip(1).debounce(1, TimeUnit.SECONDS)))
Algo como esto funcionaría:
String[] items = {"one", "two", "three", "four", "five", "six", "seven", "eight"};
Observable<String> myObservable = Observable.from(items);
Observable.concat(myObservable.first(), myObservable.skip(1).debounce(1, TimeUnit.SECONDS))
.subscribe(s -> System.out.println(s));
Considere el siguiente caso de uso:
- Necesito entregar el primer artículo tan pronto como sea posible.
- Necesidad de rebotar siguientes eventos con 1 segundo tiempo de espera
Terminé implementando un operador personalizado basado en OperatorDebounceWithTime
luego usándolo de esta manera
.lift(new CustomOperatorDebounceWithTime<>(1, TimeUnit.SECONDS, Schedulers.computation()))
CustomOperatorDebounceWithTime
entrega el primer elemento inmediatamente y luego utiliza la lógica del operador OperatorDebounceWithTime
para rebotar los elementos posteriores.
¿Hay una manera más fácil de lograr el comportamiento descrito? Vamos a omitir el operador de compose
, no resuelve el problema. Estoy buscando una manera de lograr esto sin implementar operadores personalizados.
La respuesta de LordRaydenMK y lopar tiene un problema: siempre pierdes el segundo elemento. Supongo que nadie se dio cuenta de esto antes porque, si tiene una rebaja, normalmente tiene muchos eventos y el segundo se descarta con la rebota de todos modos. La forma correcta de no perder ningún evento es:
observable
.publish(published ->
published
.limit(1)
.concatWith(published.debounce(1, TimeUnit.SECONDS)));
Y no te preocupes, no vas a tener ningún evento duplicado. Si no está seguro, puede ejecutar este código y comprobarlo usted mismo:
Observable.just(1, 2, 3, 4)
.publish(published ->
published
.limit(1)
.concatWith(published))
.subscribe(System.out::println);
Las respuestas de @LortRaydenMK y @lopar son las mejores, pero quería sugerir otra cosa en caso de que funcionara mejor para usted o para alguien en una situación similar.
Hay una variante de debounce()
que toma una función que decide durante cuánto tiempo se debe rebotar este elemento en particular. Especifica esto devolviendo un observable que se completa después de un período de tiempo. Su función podría devolver empty()
para el primer elemento y timer()
para el resto. Algo como (sin probar):
String[] items = {"one", "two", "three", "four", "five", "six"};
Observable.from(items)
.debounce(item -> item.equals("one")
? Observable.empty()
: Observable.timer(1, TimeUnit.SECONDS));
El truco es que esta función tendría que saber qué elemento es el primero. Tu secuencia puede saber eso. Si no es así, es posible que tenga que zip()
con range()
o algo así. Mejor en ese caso usar la solución en la otra respuesta.
Ngrx - solución rxjs, dividir el tubo en dos
onMyAction$ = this.actions$
.pipe(ofType<any>(ActionTypes.MY_ACTION);
lastTime = new Date();
@Effect()
onMyActionWithAbort$ = this.onMyAction$
.pipe(
filter((data) => {
const result = new Date() - this.lastTime > 200;
this.lastTime = new Date();
return result;
}),
switchMap(this.DoTheJob.bind(this))
);
@Effect()
onMyActionWithDebounce$ = this.onMyAction$
.pipe(
debounceTime(200),
filter(this.preventDuplicateFilter.bind(this)),
switchMap(this.DoTheJob.bind(this))
);
Una solución simple que usa RxJava 2.0, traducida de la respuesta para la misma pregunta para RxJS , que combina throttleFirst y rebote, luego elimina los duplicados.
private <T> ObservableTransformer<T, T> debounceImmediate() {
return observable -> observable.publish(p ->
Observable.merge(p.throttleFirst(1, TimeUnit.SECONDS),
p.debounce(1, TimeUnit.SECONDS)).distinctUntilChanged());
}
@Test
public void testDebounceImmediate() {
Observable.just(0, 100, 200, 1500, 1600, 1800, 2000, 10000)
.flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS).map(w -> v))
.doOnNext(v -> System.out.println(LocalDateTime.now() + " T=" + v))
.compose(debounceImmediate())
.blockingSubscribe(v -> System.out.println(LocalDateTime.now() + " Debounced: " + v));
}
El enfoque de usar limit () o take () no parece manejar flujos de datos de larga vida, donde quizás desee observar continuamente, pero aún así actuar de inmediato para el primer evento visto durante un tiempo.
Use la versión de debounce
que toma una función e implemente la función de esta manera:
.debounce(new Func1<String, Observable<String>>() {
private AtomicBoolean isFirstEmission = new AtomicBoolean(true);
@Override
public Observable<String> call(String s) {
// note: standard debounce causes the first item to be
// delayed by 1 second unnecessarily, this is a workaround
if (isFirstEmission.getAndSet(false)) {
return Observable.just(s);
} else {
return Observable.just(s).delay(1, TimeUnit.SECONDS);
}
}
})
El primer elemento se emite inmediatamente. Los ítems subsiguientes se retrasan un segundo. Si un observable demorado no termina antes de que llegue el siguiente artículo, se cancela, por lo que se cumple el comportamiento de rebote esperado.