single rxjava qué observables métodos how curso componen androidhive java android scala promise rx-java

qué - RxJava: encadenamiento observables



rxjava java (3)

¿Es posible implementar algo así como el próximo encadenamiento usando RxJava:

loginObservable() .then( (someData) -> { // returns another Observable<T> with some long operation return fetchUserDataObservable(someData); }).then( (userData) -> { // it should be called when fetching user data completed (with userData of type T) cacheUserData(userData); }).then( (userData) -> { // it should be called after all previous operations completed displayUserData() }).doOnError( (error) -> { //do something })

Encontré esta biblioteca muy interesante, pero no puedo entender cómo encadenar las solicitudes donde las demás dependen de las anteriores.


Claro, RxJava es compatible con .map que hace esto. De la Wiki RxJava:

Básicamente, sería:

loginObservable() .switchMap( someData -> fetchUserDataObservable(someData) ) .map( userData -> cacheUserData(userData) ) .subscribe(new Subscriber<YourResult>() { @Override public void onCompleted() { // observable stream has ended - no more logins possible } @Override public void onError(Throwable e) { // do something } @Override public void onNext(YourType yourType) { displayUserData(); } });


Esta es la publicación más importante cuando se observan las cadenas observables en Google RxJava, así que agregaré otro caso común en el que no desea transformar los datos que recibe, pero lo encadenaré con otra acción (por ejemplo, configurar los datos en una base de datos). Utilice .flatmap() . Aquí hay un ejemplo

mDataManager.fetchQuotesFromApi(limit) .subscribeOn(mSchedulerProvider.io()) .observeOn(mSchedulerProvider.ui()) .onErrorResumeNext(Function { Observable.error<List<Quote>>(it) }) //OnErrorResumeNext and Observable.error() would propagate the error to the next level. So, whatever error occurs here, would get passed to onError() on the UI side .flatMap { t: List<Quote> -> //Chain observable as such mDataManager.setQuotesToDb(t).subscribe({}, { e { "setQuotesToDb() error occurred: ${it.localizedMessage}" } }, { d { "Done server set" } }) Observable.just(t) } .subscribeBy( onNext = {}, onError = { mvpView?.showError("No internet connection") }, onComplete = { d { "onComplete(): done with fetching quotes from api" } } )

Esto es RxKotlin2, pero la idea es la misma con RxJava y RxJava2:

Explicación rápida:

  • tratamos de obtener algunos datos (citas en este ejemplo) de una API con mDataManager.fetchQuotesFromApi()
  • Suscribimos el observable para hacer cosas en el hilo .io() y mostrar resultados en el hilo .ui() .
  • onErrorResumeNext() se asegura de que cualquier error que encontremos al captar datos quede atrapado en este método. Quiero finalizar toda la cadena cuando haya un error allí, así que devuelvo un Observable.error()
  • .flatmap() es la parte de encadenamiento. Quiero poder establecer los datos que obtenga de la API en mi base de datos. No estoy transformando los datos que recibí usando .map() , simplemente estoy haciendo algo más con esos datos sin transformarlos.
  • Me suscribo a la última cadena de observables. Si ocurriera un error con la obtención de datos (primer observable), se manejaría (en este caso, se propagaría al suscrito onError() ) con onErrorResumeNext()
  • Soy muy consciente de que me estoy suscribiendo al DB observable (dentro de flatmap() ). Cualquier error que ocurra a través de este observable NO se propagará a los últimos métodos subscribeBy() , ya que se maneja dentro del método subscribe() dentro de la cadena .flatmap() .

El código proviene de este proyecto que se encuentra aquí: https://github.com/Obaied/Sohan/blob/master/app/src/main/java/com/obaied/dingerquotes/ui/start/StartPresenter.kt


intente usar scan ()

Flowable.fromArray(array).scan(...).subscribe(...)