single rxjava2 just generate examples create array rx-java rx-android

rx-java - rxjava2 - rxjs observable just



Cómo detener y reanudar Observable.interval emitiendo ticks (5)

Aquí hay otra forma de hacer esto, creo.
Cuando verifique el código fuente, encontrará intervalo () utilizando la clase OnSubscribeTimerPeriodically . El código clave a continuación.

@Override public void call(final Subscriber<? super Long> child) { final Worker worker = scheduler.createWorker(); child.add(worker); worker.schedulePeriodically(new Action0() { long counter; @Override public void call() { try { child.onNext(counter++); } catch (Throwable e) { try { worker.unsubscribe(); } finally { Exceptions.throwOrReport(e, child); } } } }, initialDelay, period, unit); }

Entonces, verás, si quieres canalizar el bucle, ¿qué hay de lanzar una nueva excepción en onNext () ? Código de ejemplo a continuación.

Observable.interval(1000, TimeUnit.MILLISECONDS) .subscribe(new Action1<Long>() { @Override public void call(Long aLong) { Log.i("abc", "onNext"); if (aLong == 5) throw new NullPointerException(); } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { Log.i("abc", "onError"); } }, new Action0() { @Override public void call() { Log.i("abc", "onCompleted"); } });

Entonces verás esto:

08-08 11:10:46.008 28146-28181/net.bingyan.test I/abc: onNext 08-08 11:10:47.008 28146-28181/net.bingyan.test I/abc: onNext 08-08 11:10:48.008 28146-28181/net.bingyan.test I/abc: onNext 08-08 11:10:49.008 28146-28181/net.bingyan.test I/abc: onNext 08-08 11:10:50.008 28146-28181/net.bingyan.test I/abc: onNext 08-08 11:10:51.008 28146-28181/net.bingyan.test I/abc: onNext 08-08 11:10:51.018 28146-28181/net.bingyan.test I/abc: onError

Esto emitirá una garrapata cada 5 segundos.

Observable.interval(5, TimeUnit.SECONDS, Schedulers.io()) .subscribe(tick -> Log.d(TAG, "tick = "+tick));

Para detenerlo puedes usar

Schedulers.shutdown();

Pero luego todos los programadores se detienen y no es posible reanudar el tictac más tarde. ¿Cómo puedo detener y reanudar la emisión "con gracia"?


Aquí hay una posible solución:

class TickHandler { private AtomicLong lastTick = new AtomicLong(0L); private Subscription subscription; void resume() { System.out.println("resumed"); subscription = Observable.interval(5, TimeUnit.SECONDS, Schedulers.io()) .map(tick -> lastTick.getAndIncrement()) .subscribe(tick -> System.out.println("tick = " + tick)); } void stop() { if (subscription != null && !subscription.isUnsubscribed()) { System.out.println("stopped"); subscription.unsubscribe(); } } }


Hace algún tiempo, también estaba buscando soluciones tipo "temporizador" de RX, pero ninguna de ellas cumplió con mis expectativas. Así que ahí puedes encontrar mi propia solución:

AtomicLong elapsedTime = new AtomicLong(); AtomicBoolean resumed = new AtomicBoolean(); AtomicBoolean stopped = new AtomicBoolean(); public Flowable<Long> startTimer() { //Create and starts timper resumed.set(true); stopped.set(false); return Flowable.interval(1, TimeUnit.SECONDS) .takeWhile(tick -> !stopped.get()) .filter(tick -> resumed.get()) .map(tick -> elapsedTime.addAndGet(1000)); } public void pauseTimer() { resumed.set(false); } public void resumeTimer() { resumed.set(true); } public void stopTimer() { stopped.set(true); } public void addToTimer(int seconds) { elapsedTime.addAndGet(seconds * 1000); }


Lo siento, esto es en RxJS en lugar de RxJava, pero el concepto será el mismo. learn-rxjs.io esto de learn-rxjs.io y aquí está en codepen .

La idea es que comiences con dos secuencias de eventos de clic, startClick$ y stopClick$ . Cada clic que se produce en la stopClick$ se asigna a un observable vacío, y los clics en startClick$ se asignan a la secuencia interval$ . Las dos corrientes resultantes se merge -d juntas en un observable-de-observables. En otras palabras, cada vez que se haga clic, se emitirá un nuevo observable de uno de los dos tipos. El observable resultante pasará por el switchMap , que comienza a escuchar este nuevo observable y deja de escuchar lo que estaba escuchando antes. Switchmap también comenzará a fusionar los valores de este nuevo observable en su flujo existente.

Después del cambio, la scan solo ve el valor de "incremento" emitido por el interval$ , y no ve ningún valor cuando se ha hecho clic en "detener".

Y hasta que se produzca el primer clic, startWith comenzará a emitir valores desde $interval , solo para poner las cosas en marcha:

const start = 0; const increment = 1; const delay = 1000; const stopButton = document.getElementById(''stop''); const startButton = document.getElementById(''start''); const startClick$ = Rx.Observable.fromEvent(startButton, ''click''); const stopClick$ = Rx.Observable.fromEvent(stopButton, ''click''); const interval$ = Rx.Observable.interval(delay).mapTo(increment); const setCounter = newValue => document.getElementById("counter").innerHTML = newValue; setCounter(start); const timer$ = Rx.Observable // a "stop" click will emit an empty observable, // and a "start" click will emit the interval$ observable. // These two streams are merged into one observable. .merge(stopClick$.mapTo(Rx.Observable.empty()), startClick$.mapTo(interval$)) // until the first click occurs, merge will emit nothing, so // use the interval$ to start the counter in the meantime .startWith(interval$) // whenever a new observable starts, stop listening to the previous // one and start emitting values from the new one .switchMap(val => val) // add the increment emitted by the interval$ stream to the accumulator .scan((acc, curr) => curr + acc, start) // start the observable and send results to the DIV .subscribe((x) => setCounter(x));

Y aquí está el HTML.

<html> <body> <div id="counter"></div> <button id="start"> Start </button> <button id="stop"> Stop </button> </body> </html>


val switch = new java.util.concurrent.atomic.AtomicBoolean(true) val tick = new java.util.concurrent.atomic.AtomicLong(0L) val suspendableObservable = Observable. interval(5 seconds). takeWhile(_ => switch.get()). repeat. map(_ => tick.incrementAndGet())

Puede establecer switch a false para suspender el tictac y true para reanudarlo.