rxjava2 - rxjava android tutorial
La recolección lenta de objetos paginados usando RxJava (1)
Casi me he vendido a RxJava, que es un compañero perfecto para Retrofit, pero estoy luchando por un patrón común mientras migro mi código: para ahorrar ancho de banda, me gustaría buscar objetos (paginados) de forma perezosa de mi servicio web, según sea necesario. , mientras que mi vista de lista (o recyclerview) se desplaza usando programación reactiva.
Mi código anterior estaba haciendo el trabajo a la perfección, pero la programación reactiva parece merecer la pena.
Escuchar el desplazamiento de listview / recyclerview (y otras cosas aburridas) no es la preocupación y obtener un Observable es fácil usando Retrofit:
@GET("/api/messages")
Observable<List<Message>> getMessages(@Path("offset") int offset, @Path("limit") int limit);
Simplemente no puedo descifrar el patrón a usar en la programación reactiva.
El operador de Concat
parece un buen punto de partida, junto con ConnectableObservable
en algún momento para diferir las emisiones y tal vez flatMap
, pero ¿cómo?
EDITAR:
Aquí está mi solución actual (ingenua):
public interface Paged<T> {
boolean isLoading();
void cancel();
void next(int count);
void next(int count, Scheduler scheduler);
Observable<List<T>> asObservable();
boolean hasCompleted();
int position();
}
Y mi implementación usando un tema:
public abstract class SimplePaged<T> implements Paged<T> {
final PublishSubject<List<T>> subject = PublishSubject.create();
private volatile boolean loading;
private volatile int offset;
private Subscription subscription;
@Override
public boolean isLoading() {
return loading;
}
@Override
public synchronized void cancel() {
if(subscription != null && !subscription.isUnsubscribed())
subscription.unsubscribe();
if(!hasCompleted())
subject.onCompleted();
subscription = null;
loading = false;
}
@Override
public void next(int count) {
next(count, null);
}
@Override
public synchronized void next(int count, Scheduler scheduler) {
if (isLoading())
throw new IllegalStateException("you can''t call next() before onNext()");
if(hasCompleted())
throw new IllegalStateException("you can''t call next() after onCompleted()");
loading = true;
Observable<List<T>> obs = onNextPage(offset, count).single();
if(scheduler != null)
obs = obs.subscribeOn(scheduler); // BEWARE! onNext/onError/onComplete will happens on that scheduler!
subscription = obs.subscribe(this::onNext, this::onError, this::onComplete);
}
@Override
public Observable<List<T>> asObservable() {
return subject.asObservable();
}
@Override
public boolean hasCompleted() {
return subject.hasCompleted();
}
@Override
public int position() {
return offset;
}
/* Warning: functions below may be called from another thread */
protected synchronized void onNext(List<T> items) {
if (items != null)
offset += items.size();
loading = false;
if (items == null || items.size() == 0)
subject.onCompleted();
else
subject.onNext(items);
}
protected synchronized void onError(Throwable t) {
loading = false;
subject.onError(t);
}
protected synchronized void onComplete() {
loading = false;
}
abstract protected Observable<List<T>> onNextPage(int offset, int count);
}
Esta es una de las pocas formas posibles de manejar la paginación reactiva. Supongamos que tenemos un método getNextPageTrigger
que devuelve un Observable
emite un objeto de evento cuando el oyente de desplazamiento (o cualquier entrada) quiere que se cargue una nueva página. En la vida real, probablemente tenga el operador de debounce
del debounce
, pero además de eso, nos aseguraremos de que solo se active después de que se cargue la última página.
También definimos un método para desenvolver los mensajes de su lista:
Observable<Message> getPage(final int page) {
return service.getMessages(page * PAGE_SIZE, PAGE_SIZE)
.flatMap(messageList -> Observable.from(messageList));
}
Entonces podemos hacer la lógica de búsqueda real:
// Start with the first page.
getPage(0)
// Add on each incremental future page.
.concatWith(Observable.range(1, Integer.MAX_VALUE)
// Uses a little trick to get the next page to wait for a signal to load.
// By ignoring all actual elements emitted and casting, the trigger must
// complete before the actual page request will be made.
.concatMap(page -> getNextPageTrigger().limit(1)
.ignoreElements()
.cast(Message.class)
.concatWith(getPage(page))) // Then subscribe, etc..
Todavía faltan un par de cosas potencialmente importantes:
1 - Obviamente, no se sabe cuándo dejar de buscar páginas adicionales, lo que significa que una vez que llega al final, dependiendo de lo que devuelva el servidor, puede seguir apareciendo errores o resultados vacíos cada vez que se desencadena el desplazamiento. Los enfoques para resolver esto dependen de cómo le indica al cliente que no hay más páginas para cargar.
2 - Si necesita reintentos de error, sugeriría que retryWhen
operador retryWhen
. De lo contrario, los errores de red comunes podrían causar un error en la carga de una página para propagarse.