single - RxJava retraso para cada elemento de la lista emitida
rxjava tutorial (15)
Creo que es exactamente lo que necesitas. Echale un vistazo:
long startTime = System.currentTimeMillis();
Observable.intervalRange(1, 5, 0, 50, TimeUnit.MILLISECONDS)
.timestamp(TimeUnit.MILLISECONDS)
.subscribe(emitTime -> {
System.out.println(emitTime.time() - startTime);
});
Estoy luchando por implementar algo que supuse que sería bastante simple en Rx.
Tengo una lista de elementos, y quiero que cada elemento se emita con retraso.
Parece que el operador Rx delay () simplemente cambia la emisión de todos los elementos por el retraso especificado, no por cada elemento individual.
Aquí hay un código de prueba. Agrupa elementos en una lista. Cada grupo debería tener un retraso aplicado antes de ser emitido.
Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.delay(50, TimeUnit.MILLISECONDS)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
El resultado es:
154ms
[5]
155ms
[2]
155ms
[1]
155ms
[3]
155ms
[4]
Pero lo que esperaría ver es algo como esto:
174ms
[5]
230ms
[2]
285ms
[1]
345ms
[3]
399ms
[4]
¿Qué estoy haciendo mal?
Creo que quieres esto:
Observable.range(1, 5)
.delay(50, TimeUnit.MILLISECONDS)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
De esta forma, retrasará los números que ingresan al grupo en lugar de retrasar la lista reducida en 5 segundos.
Hay otra forma de hacerlo usando concatMap ya que concatMap devuelve los elementos de origen observables. entonces podemos agregar demora en ese observable.
Aquí lo que he intentado.
Observable.range(1, 5)
.groupBy(n -> n % 5)
.concatMap(integerIntegerGroupedObservable ->
integerIntegerGroupedObservable.delay(2000, TimeUnit.MILLISECONDS))
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
La forma más sencilla de hacer esto parece ser simplemente usar concatMap y envolver cada elemento en un Obserable demorado.
long startTime = System.currentTimeMillis();
Observable.range(1, 5)
.concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
.doOnNext(i-> System.out.println(
"Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
.toCompletable().await();
Huellas dactilares:
Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms
Para introducir el retraso entre cada elemento emitido es útil:
List<String> letters = new ArrayList<>(Arrays.asList("a", "b", "c", "d"));
Observable.fromIterable(letters)
.concatMap(item -> Observable.interval(1, TimeUnit.SECONDS)
.take(1)
.map(second -> item))
.subscribe(System.out::println);
Más buenas opciones en https://github.com/ReactiveX/RxJava/issues/3505
Para los usuarios de kotlin, escribí una función de extensión para el enfoque ''zip con intervalo''
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import java.util.concurrent.TimeUnit
fun <T> Observable<T>.delayEach(interval: Long, timeUnit: TimeUnit): Observable<T> =
Observable.zip(
this,
Observable.interval(interval, timeUnit),
BiFunction { item, _ -> item }
)
Funciona de la misma manera, pero esto lo hace reutilizable. Ejemplo:
Observable.range(1, 5)
.delayEach(1, TimeUnit.SECONDS)
Para retrasar cada grupo, puede cambiar su
flatMap()
para devolver un Observable que retrase la emisión del grupo.
Observable
.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g ->
Observable
.timer(50, TimeUnit.MILLISECONDS)
.flatMap(t -> g.toList())
)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Puede agregar un retraso entre los elementos emitidos utilizando flatMap, maxConcurrent y delay ()
Aquí hay un ejemplo: emitir 0..4 con retraso
@Test
fun testEmitWithDelays() {
val DELAY = 500L
val COUNT = 5
val latch = CountDownLatch(1)
val startMoment = System.currentTimeMillis()
var endMoment : Long = 0
Observable
.range(0, COUNT)
.flatMap( { Observable.just(it).delay(DELAY, TimeUnit.MILLISECONDS) }, 1) // maxConcurrent = 1
.subscribe(
{ println("... value: $it, ${System.currentTimeMillis() - startMoment}") },
{},
{
endMoment = System.currentTimeMillis()
latch.countDown()
})
latch.await()
assertTrue { endMoment - startMoment >= DELAY * COUNT }
}
... value: 0, 540
... value: 1, 1042
... value: 2, 1544
... value: 3, 2045
... value: 4, 2547
Puede implementar un
operador rx personalizado
como
MinRegularIntervalDelayOperator
y luego usarlo con la función de
lift
Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.lift(new MinRegularIntervalDelayOperator<Integer>(50L))
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Puedes usar
Observable.interval(1, TimeUnit.SECONDS)
.map(new Function<Long, Integer>() {
@Override
public Integer apply(Long aLong) throws Exception {
return aLong.intValue() + 1;
}
})
.startWith(0)
.take(listInput.size())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer index) throws Exception {
Log.d(TAG, "---index of your list --" + index);
}
});
Este código anterior no duplica el valor (índice). "Estoy seguro"
Simplemente compartiendo un enfoque simple para emitir cada elemento en una colección con un intervalo:
Observable.just(1,2,3,4,5)
.zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
.subscribe(System.out::println);
Cada artículo se emitirá cada 500 milisegundos
Una forma de hacerlo es usar
zip
para combinar su observable con un
Interval
observable para retrasar la salida.
Observable.zip(Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList()),
Observable.interval(50, TimeUnit.MILLISECONDS),
(obs, timer) -> obs)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
Una forma no tan limpia es hacer que el retraso cambie con la iteración usando el operador .delay (Func1).
Observable.range(1, 5)
.delay(n -> n*50)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
deberías poder lograr esto usando el operador
Timer
.
Lo intenté con
delay
pero no pude lograr el resultado deseado.
Observe las operaciones anidadas realizadas en el operador de
flatmap
.
Observable.range(1,5)
.flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS)
.map(y -> x))
// attach timestamp
.timestamp()
.subscribe(timedIntegers ->
Log.i(TAG, "Timed String: "
+ timedIntegers.value()
+ " "
+ timedIntegers.time()));
Observable.just("A", "B", "C", "D", "E", "F")
.flatMap { item -> Thread.sleep(2000)
Observable.just( item ) }
.subscribe { println( it ) }