android rx-java observable

android - RXJava-hace un observable pausable(con búfer y ventana por ejemplo)



rx-java (2)

Hice algo similar para registrar eventos.
El sujeto recopila algunos eventos, y una vez en 10 segundos los empuja al servidor.

La idea principal es, por ejemplo, tienes clase Event .

public class Event { public String jsonData; public String getJsonData() { return jsonData; } public Event setJsonData(String jsonData) { this.jsonData = jsonData; return this; } }

Debes crear una cola para eventos:

private PublishSubject<Event> eventQueue = PublishSubject.create();

Puede ser BehaviorSubject , no importa

Luego, debe crear la lógica, que controlará el envío de eventos al servidor:

eventObservable = eventQueue.asObservable() .buffer(10, TimeUnit.SECONDS) //flush events every 10 seconds .toList() .doOnNext(new Action1<List<Event>>() { @Override public void call(List<Event> events) { apiClient.pushEvents(events); //push your event } }) .onErrorResumeNext(new Func1<Throwable, Observable<List<Event>>>() { @Override public Observable<List<Event>> call(Throwable throwable) { return null; //make sure, that on error will be never called } }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io());

Luego, debe suscribirse y conservar la suscripción hasta que no la necesite:

eventSubscription = eventObservable.subscribe()

Inicio esto ayuda

Quiero crear observables que hagan lo siguiente:

  • búfer todos los elementos, mientras están en pausa
  • Inmediatamente emiten elementos, mientras no están en pausa.
  • El activador de pausa / reanudación debe provenir de otro observable.
  • debe ser guardado para ser utilizado por observables que no se ejecutan en el subproceso principal y debe guardarse, cambiar el estado en pausa / reanudado desde el subproceso principal

Quiero usar un BehaviorSubject<Boolean> como disparador y vincular este disparador al evento onResume y onPause una actividad. (Ejemplo de código adjunto)

Pregunta

He configurado algo, pero no funciona como es debido. Lo uso como el siguiente:

Observable o = ...; // Variant 1 o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue()) // Variant 2 // o = o.compose(RXPauser.applyPauser(getPauser())); o .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe();

Actualmente, el problema es que la Variante 1 debería funcionar bien, pero a veces, los eventos simplemente no se emiten, la válvula no emite, hasta que la válvula funciona todo (puede ser un problema de enhebrado ...). La Solución 2 es mucho más simple y parece funcionar, pero no estoy seguro de que sea realmente mejor, no lo creo. Realmente no estoy seguro, ¿por qué la solución uno falla algunas veces, así que no estoy seguro de si la solución 2 resuelve el problema (actualmente desconocido para mí) ...

¿Alguien puede decirme cuál podría ser el problema o si la solución simple debería funcionar de manera confiable? ¿O mostrarme una solución confiable?

Código

RxValue

https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08

Funciones RXPauser

public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser) { return observable -> pauser(observable, pauser); } private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser) { // this observable buffers all items that are emitted while emission is paused Observable<T> sharedSource = source.publish().refCount(); Observable<T> queue = sharedSource .buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed)) .flatMap(l -> Observable.from(l)) .doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t)); // this observable emits all items that are emitted while emission is not paused Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> !isResumed)) .switchMap(tObservable -> tObservable) .doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t)); // combine both observables return queue.mergeWith(window) .doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t)); }

Actividad

public class BaseActivity extends AppCompatActivity { private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false); public BaseActivity(Bundle savedInstanceState) { super(args); final Class<?> clazz = this.getClass(); pauser .doOnUnsubscribe(() -> { L.d(clazz, "Pauser unsubscribed!"); }) .subscribe(aBoolean -> { L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED")); }); } public PublishSubject<Boolean> getPauser() { return pauser; } @Override protected void onResume() { super.onResume(); pauser.onNext(true); } @Override protected void onPause() { pauser.onNext(false); super.onPause(); } }


Realmente puede usar el operador .buffer() pasándolo observable, definiendo cuándo detener el almacenamiento en búfer, muestra del libro:

Observable.interval(100, TimeUnit.MILLISECONDS).take(10) .buffer(Observable.interval(250, TimeUnit.MILLISECONDS)) .subscribe(System.out::println);

del capítulo 5, ''Controlando la secuencia'': https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md

Puede usar PublishSubject como Observable para alimentar elementos en su operador personalizado. Cada vez que necesite iniciar el almacenamiento en búfer, cree una instancia por Observable.defer(() -> createBufferingValve())