qué - RxJava: encadenamiento observables
rxjava java (3)
¿Es posible implementar algo así como el próximo encadenamiento usando RxJava:
loginObservable()
.then( (someData) -> {
// returns another Observable<T> with some long operation
return fetchUserDataObservable(someData);
}).then( (userData) -> {
// it should be called when fetching user data completed (with userData of type T)
cacheUserData(userData);
}).then( (userData) -> {
// it should be called after all previous operations completed
displayUserData()
}).doOnError( (error) -> {
//do something
})
Encontré esta biblioteca muy interesante, pero no puedo entender cómo encadenar las solicitudes donde las demás dependen de las anteriores.
Claro, RxJava es compatible con .map
que hace esto. De la Wiki RxJava:
Básicamente, sería:
loginObservable()
.switchMap( someData -> fetchUserDataObservable(someData) )
.map( userData -> cacheUserData(userData) )
.subscribe(new Subscriber<YourResult>() {
@Override
public void onCompleted() {
// observable stream has ended - no more logins possible
}
@Override
public void onError(Throwable e) {
// do something
}
@Override
public void onNext(YourType yourType) {
displayUserData();
}
});
Esta es la publicación más importante cuando se observan las cadenas observables en Google RxJava, así que agregaré otro caso común en el que no desea transformar los datos que recibe, pero lo encadenaré con otra acción (por ejemplo, configurar los datos en una base de datos). Utilice .flatmap()
. Aquí hay un ejemplo
mDataManager.fetchQuotesFromApi(limit)
.subscribeOn(mSchedulerProvider.io())
.observeOn(mSchedulerProvider.ui())
.onErrorResumeNext(Function { Observable.error<List<Quote>>(it) }) //OnErrorResumeNext and Observable.error() would propagate the error to the next level. So, whatever error occurs here, would get passed to onError() on the UI side
.flatMap { t: List<Quote> ->
//Chain observable as such
mDataManager.setQuotesToDb(t).subscribe({}, { e { "setQuotesToDb() error occurred: ${it.localizedMessage}" } }, { d { "Done server set" } })
Observable.just(t)
}
.subscribeBy(
onNext = {},
onError = { mvpView?.showError("No internet connection") },
onComplete = { d { "onComplete(): done with fetching quotes from api" } }
)
Esto es RxKotlin2, pero la idea es la misma con RxJava y RxJava2:
Explicación rápida:
- tratamos de obtener algunos datos (citas en este ejemplo) de una API con
mDataManager.fetchQuotesFromApi()
- Suscribimos el observable para hacer cosas en el hilo
.io()
y mostrar resultados en el hilo.ui()
. -
onErrorResumeNext()
se asegura de que cualquier error que encontremos al captar datos quede atrapado en este método. Quiero finalizar toda la cadena cuando haya un error allí, así que devuelvo unObservable.error()
-
.flatmap()
es la parte de encadenamiento. Quiero poder establecer los datos que obtenga de la API en mi base de datos. No estoy transformando los datos que recibí usando.map()
, simplemente estoy haciendo algo más con esos datos sin transformarlos. - Me suscribo a la última cadena de observables. Si ocurriera un error con la obtención de datos (primer observable), se manejaría (en este caso, se propagaría al suscrito
onError()
) cononErrorResumeNext()
- Soy muy consciente de que me estoy suscribiendo al DB observable (dentro de
flatmap()
). Cualquier error que ocurra a través de este observable NO se propagará a los últimos métodossubscribeBy()
, ya que se maneja dentro del métodosubscribe()
dentro de la cadena.flatmap()
.
El código proviene de este proyecto que se encuentra aquí: https://github.com/Obaied/Sohan/blob/master/app/src/main/java/com/obaied/dingerquotes/ui/start/StartPresenter.kt
intente usar scan ()
Flowable.fromArray(array).scan(...).subscribe(...)