tutorial rxjava2 rxjava library android pagination reactive-programming rx-java

rxjava2 - rxjava android tutorial



La recolección lenta de objetos paginados usando RxJava (1)

Casi me he vendido a RxJava, que es un compañero perfecto para Retrofit, pero estoy luchando por un patrón común mientras migro mi código: para ahorrar ancho de banda, me gustaría buscar objetos (paginados) de forma perezosa de mi servicio web, según sea necesario. , mientras que mi vista de lista (o recyclerview) se desplaza usando programación reactiva.

Mi código anterior estaba haciendo el trabajo a la perfección, pero la programación reactiva parece merecer la pena.

Escuchar el desplazamiento de listview / recyclerview (y otras cosas aburridas) no es la preocupación y obtener un Observable es fácil usando Retrofit:

@GET("/api/messages") Observable<List<Message>> getMessages(@Path("offset") int offset, @Path("limit") int limit);

Simplemente no puedo descifrar el patrón a usar en la programación reactiva.

El operador de Concat parece un buen punto de partida, junto con ConnectableObservable en algún momento para diferir las emisiones y tal vez flatMap , pero ¿cómo?

EDITAR:

Aquí está mi solución actual (ingenua):

public interface Paged<T> { boolean isLoading(); void cancel(); void next(int count); void next(int count, Scheduler scheduler); Observable<List<T>> asObservable(); boolean hasCompleted(); int position(); }

Y mi implementación usando un tema:

public abstract class SimplePaged<T> implements Paged<T> { final PublishSubject<List<T>> subject = PublishSubject.create(); private volatile boolean loading; private volatile int offset; private Subscription subscription; @Override public boolean isLoading() { return loading; } @Override public synchronized void cancel() { if(subscription != null && !subscription.isUnsubscribed()) subscription.unsubscribe(); if(!hasCompleted()) subject.onCompleted(); subscription = null; loading = false; } @Override public void next(int count) { next(count, null); } @Override public synchronized void next(int count, Scheduler scheduler) { if (isLoading()) throw new IllegalStateException("you can''t call next() before onNext()"); if(hasCompleted()) throw new IllegalStateException("you can''t call next() after onCompleted()"); loading = true; Observable<List<T>> obs = onNextPage(offset, count).single(); if(scheduler != null) obs = obs.subscribeOn(scheduler); // BEWARE! onNext/onError/onComplete will happens on that scheduler! subscription = obs.subscribe(this::onNext, this::onError, this::onComplete); } @Override public Observable<List<T>> asObservable() { return subject.asObservable(); } @Override public boolean hasCompleted() { return subject.hasCompleted(); } @Override public int position() { return offset; } /* Warning: functions below may be called from another thread */ protected synchronized void onNext(List<T> items) { if (items != null) offset += items.size(); loading = false; if (items == null || items.size() == 0) subject.onCompleted(); else subject.onNext(items); } protected synchronized void onError(Throwable t) { loading = false; subject.onError(t); } protected synchronized void onComplete() { loading = false; } abstract protected Observable<List<T>> onNextPage(int offset, int count); }


Esta es una de las pocas formas posibles de manejar la paginación reactiva. Supongamos que tenemos un método getNextPageTrigger que devuelve un Observable emite un objeto de evento cuando el oyente de desplazamiento (o cualquier entrada) quiere que se cargue una nueva página. En la vida real, probablemente tenga el operador de debounce del debounce , pero además de eso, nos aseguraremos de que solo se active después de que se cargue la última página.

También definimos un método para desenvolver los mensajes de su lista:

Observable<Message> getPage(final int page) { return service.getMessages(page * PAGE_SIZE, PAGE_SIZE) .flatMap(messageList -> Observable.from(messageList)); }

Entonces podemos hacer la lógica de búsqueda real:

// Start with the first page. getPage(0) // Add on each incremental future page. .concatWith(Observable.range(1, Integer.MAX_VALUE) // Uses a little trick to get the next page to wait for a signal to load. // By ignoring all actual elements emitted and casting, the trigger must // complete before the actual page request will be made. .concatMap(page -> getNextPageTrigger().limit(1) .ignoreElements() .cast(Message.class) .concatWith(getPage(page))) // Then subscribe, etc..

Todavía faltan un par de cosas potencialmente importantes:

1 - Obviamente, no se sabe cuándo dejar de buscar páginas adicionales, lo que significa que una vez que llega al final, dependiendo de lo que devuelva el servidor, puede seguir apareciendo errores o resultados vacíos cada vez que se desencadena el desplazamiento. Los enfoques para resolver esto dependen de cómo le indica al cliente que no hay más páginas para cargar.

2 - Si necesita reintentos de error, sugeriría que retryWhen operador retryWhen . De lo contrario, los errores de red comunes podrían causar un error en la carga de una página para propagarse.