tutorial single rxjava2 rxjava examples example create java rx-java delay

single - RxJava retraso para cada elemento de la lista emitida



rxjava tutorial (15)

Creo que es exactamente lo que necesitas. Echale un vistazo:

long startTime = System.currentTimeMillis(); Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS) .timestamp(TimeUnit.MILLISECONDS) .subscribe(emitTime -> { System.out.println(emitTime.time() - startTime); });

Estoy luchando por implementar algo que supuse que sería bastante simple en Rx.

Tengo una lista de elementos, y quiero que cada elemento se emita con retraso.

Parece que el operador Rx delay () simplemente cambia la emisión de todos los elementos por el retraso especificado, no por cada elemento individual.

Aquí hay un código de prueba. Agrupa elementos en una lista. Cada grupo debería tener un retraso aplicado antes de ser emitido.

Observable.range(1, 5) .groupBy(n -> n % 5) .flatMap(g -> g.toList()) .delay(50, TimeUnit.MILLISECONDS) .doOnNext(item -> { System.out.println(System.currentTimeMillis() - timeNow); System.out.println(item); System.out.println(" "); }).toList().toBlocking().first();

El resultado es:

154ms [5] 155ms [2] 155ms [1] 155ms [3] 155ms [4]

Pero lo que esperaría ver es algo como esto:

174ms [5] 230ms [2] 285ms [1] 345ms [3] 399ms [4]

¿Qué estoy haciendo mal?


Creo que quieres esto:

Observable.range(1, 5) .delay(50, TimeUnit.MILLISECONDS) .groupBy(n -> n % 5) .flatMap(g -> g.toList()) .doOnNext(item -> { System.out.println(System.currentTimeMillis() - timeNow); System.out.println(item); System.out.println(" "); }).toList().toBlocking().first();

De esta forma, retrasará los números que ingresan al grupo en lugar de retrasar la lista reducida en 5 segundos.


Hay otra forma de hacerlo usando concatMap ya que concatMap devuelve los elementos de origen observables. entonces podemos agregar demora en ese observable.

Aquí lo que he intentado.

Observable.range(1, 5) .groupBy(n -> n % 5) .concatMap(integerIntegerGroupedObservable -> integerIntegerGroupedObservable.delay(2000, TimeUnit.MILLISECONDS)) .doOnNext(item -> { System.out.println(System.currentTimeMillis() - timeNow); System.out.println(item); System.out.println(" "); }).toList().toBlocking().first();


La forma más sencilla de hacer esto parece ser simplemente usar concatMap y envolver cada elemento en un Obserable demorado.

long startTime = System.currentTimeMillis(); Observable.range(1, 5) .concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS)) .doOnNext(i-> System.out.println( "Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms")) .toCompletable().await();

Huellas dactilares:

Item: 1, Time: 51ms Item: 2, Time: 101ms Item: 3, Time: 151ms Item: 4, Time: 202ms Item: 5, Time: 252ms


Para introducir el retraso entre cada elemento emitido es útil:

List<String> letters = new ArrayList<>(Arrays.asList("a", "b", "c", "d")); Observable.fromIterable(letters) .concatMap(item -> Observable.interval(1, TimeUnit.SECONDS) .take(1) .map(second -> item)) .subscribe(System.out::println);

Más buenas opciones en https://github.com/ReactiveX/RxJava/issues/3505


Para los usuarios de kotlin, escribí una función de extensión para el enfoque ''zip con intervalo''

import io.reactivex.Observable import io.reactivex.functions.BiFunction import java.util.concurrent.TimeUnit fun <T> Observable<T>.delayEach(interval: Long, timeUnit: TimeUnit): Observable<T> = Observable.zip( this, Observable.interval(interval, timeUnit), BiFunction { item, _ -> item } )

Funciona de la misma manera, pero esto lo hace reutilizable. Ejemplo:

Observable.range(1, 5) .delayEach(1, TimeUnit.SECONDS)


Para retrasar cada grupo, puede cambiar su flatMap() para devolver un Observable que retrase la emisión del grupo.

Observable .range(1, 5) .groupBy(n -> n % 5) .flatMap(g -> Observable .timer(50, TimeUnit.MILLISECONDS) .flatMap(t -> g.toList()) ) .doOnNext(item -> { System.out.println(System.currentTimeMillis() - timeNow); System.out.println(item); System.out.println(" "); }).toList().toBlocking().first();


Puede agregar un retraso entre los elementos emitidos utilizando flatMap, maxConcurrent y delay ()

Aquí hay un ejemplo: emitir 0..4 con retraso

@Test fun testEmitWithDelays() { val DELAY = 500L val COUNT = 5 val latch = CountDownLatch(1) val startMoment = System.currentTimeMillis() var endMoment : Long = 0 Observable .range(0, COUNT) .flatMap( { Observable.just(it).delay(DELAY, TimeUnit.MILLISECONDS) }, 1) // maxConcurrent = 1 .subscribe( { println("... value: $it, ${System.currentTimeMillis() - startMoment}") }, {}, { endMoment = System.currentTimeMillis() latch.countDown() }) latch.await() assertTrue { endMoment - startMoment >= DELAY * COUNT } } ... value: 0, 540 ... value: 1, 1042 ... value: 2, 1544 ... value: 3, 2045 ... value: 4, 2547


Puede implementar un operador rx personalizado como MinRegularIntervalDelayOperator y luego usarlo con la función de lift

Observable.range(1, 5) .groupBy(n -> n % 5) .flatMap(g -> g.toList()) .lift(new MinRegularIntervalDelayOperator<Integer>(50L)) .doOnNext(item -> { System.out.println(System.currentTimeMillis() - timeNow); System.out.println(item); System.out.println(" "); }).toList().toBlocking().first();


Puedes usar

Observable.interval(1, TimeUnit.SECONDS) .map(new Function<Long, Integer>() { @Override public Integer apply(Long aLong) throws Exception { return aLong.intValue() + 1; } }) .startWith(0) .take(listInput.size()) .subscribe(new Consumer<Integer>() { @Override public void accept(Integer index) throws Exception { Log.d(TAG, "---index of your list --" + index); } });

Este código anterior no duplica el valor (índice). "Estoy seguro"


Simplemente compartiendo un enfoque simple para emitir cada elemento en una colección con un intervalo:

Observable.just(1,2,3,4,5) .zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item) .subscribe(System.out::println);

Cada artículo se emitirá cada 500 milisegundos


Una forma de hacerlo es usar zip para combinar su observable con un Interval observable para retrasar la salida.

Observable.zip(Observable.range(1, 5) .groupBy(n -> n % 5) .flatMap(g -> g.toList()), Observable.interval(50, TimeUnit.MILLISECONDS), (obs, timer) -> obs) .doOnNext(item -> { System.out.println(System.currentTimeMillis() - timeNow); System.out.println(item); System.out.println(" "); }).toList().toBlocking().first();


Una forma no tan limpia es hacer que el retraso cambie con la iteración usando el operador .delay (Func1).

Observable.range(1, 5) .delay(n -> n*50) .groupBy(n -> n % 5) .flatMap(g -> g.toList()) .doOnNext(item -> { System.out.println(System.currentTimeMillis() - timeNow); System.out.println(item); System.out.println(" "); }).toList().toBlocking().first();


deberías poder lograr esto usando el operador Timer . Lo intenté con delay pero no pude lograr el resultado deseado. Observe las operaciones anidadas realizadas en el operador de flatmap .

Observable.range(1,5) .flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS) .map(y -> x)) // attach timestamp .timestamp() .subscribe(timedIntegers -> Log.i(TAG, "Timed String: " + timedIntegers.value() + " " + timedIntegers.time()));


Observable.just("A", "B", "C", "D", "E", "F") .flatMap { item -> Thread.sleep(2000) Observable.just( item ) } .subscribe { println( it ) }