rxjava2 rxjava react example curso rx-java

rx java - react - ¿Cómo manejar la paginación con RxJava?



rxjava wikipedia (4)

¡Era hard rock!) Así que hemos pedido a la red:
getUsersByKeyword(String query, int limit, int offset)
y esta solicitud devuelve por ejemplo
List< Result >
Si usamos RetroFit para redes, esa solicitud se verá:
Observable< List< Result >> getUsersByKeyword(String query, int limit, int offset)
Como resultado queremos obtener todos los Result del servidor.
Así se verá así

int page = 50; int limit = page; Observable .range(0, Integer.MAX_VALUE - 1) .concatMap(new Func1<Integer, Observable<List<Result>>>() { @Override public Observable<List<Result>> call(Integer integer) { return getUsersByKeyword(query, integer * page, limit); } }) .takeWhile(new Func1<List<Result>, Boolean>() { @Override public Boolean call(List<Result> results) { return !results.isEmpty(); } }) .scan(new Func2< List<Result>, List<Result>, List<Result>>() { @Override public List<Result> call(List<Result> results, List< Result> results2) { List<Result> list = new ArrayList<>(); list.addAll(results); list.addAll(results2); return list; } }) .last() .subscribe(new Subscriber<List<Result>>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(List<Results> results) { } });

Código fue probado!

Estoy buscando convertir mi aplicación de Android para usar Rxjava para solicitudes de red. Actualmente accedo a un servicio web similar a:

getUsersByKeyword(String query, int limit, int offset)

Como lo entiendo, los Observables son una interfaz "push" en lugar de una "pull". Así que aquí es cómo entiendo las cosas para trabajar:

  • La aplicación se registra con el servicio, obteniendo Observable para consulta
  • los resultados son empujados a la aplicación
  • aplicación se ocupa de los resultados
  • Cuando la aplicación quiere más resultados ...?

Aquí es donde las cosas se rompen para mí. Anteriormente, solo le pedía al servicio web exactamente lo que quiero, volver a realizar la consulta con el desplazamiento. Pero en este caso, eso implicaría crear otro Observable y suscribirse a él, como derrotar el punto.

¿Cómo debo manejar la paginación en mi aplicación? (Es una aplicación de Android, pero no creo que sea relevante).


He hecho esto y en realidad no es tan difícil.

El enfoque es modelar cada primera solicitud (desplazamiento 0) en un firstRequestsObservable. Para hacerlo más fácil, puede hacerlo como un PublishSubject al que invocar onNext() para onNext() la siguiente solicitud, pero existen formas más inteligentes de no hacerlo por el sujeto (por ejemplo, si las solicitudes se realizan al hacer clic en un botón, entonces El requestObservable es el clickObservable asignado a través de algunos operadores).

Una vez que haya firstRequestsObservable en su lugar, puede hacer responseObservable mediante flatMapping desde firstRequestsObservable y así sucesivamente, para realizar la llamada de servicio.

Ahora viene el truco: haga otro observable llamado subsequentRequestsObservable que se asigna desde responseObservable , incrementando el desplazamiento (para este propósito es bueno incluir, en los datos de respuesta, el desplazamiento de la solicitud de origen). Una vez que introduzcas este observable, ahora tienes que cambiar la definición de responseObservable para que dependa también de las subsequentRequestsObservable . Entonces obtienes una dependencia circular como esta:

firstRequestsObservable -> responseObservable -> followingRequestsObservable -> responseObservable -> subsecuenteRequestsObservable -> ...

Para interrumpir este ciclo, es probable que desee incluir un operador de filter en la definición de subsequentRequestsObservable , filtrando aquellos casos en que la compensación pasaría el límite "total". La dependencia circular también significa que necesita que uno de ellos sea un Sujeto, de lo contrario sería imposible declarar los observables. Recomiendo responseObservable para ser ese sujeto.

Entonces, en general, primero inicialice responseObservable como un Asunto, luego declare firstRequestsObservable, luego declare subsecuenteRequestsObservable como el resultado de pasar responseObservable a través de algunos operadores. responseObservable puede ser "alimentado" usando onNext .


Para que quede claro, asumo que sus preguntas son más sobre cómo aplicar RxJava en su aplicación de Android en lugar de en el back-end (aunque es posible aplicar también pero no es tan típico como el front-end). Y no estoy muy seguro de si este ejemplo es un caso de uso típico para el uso del modelo de programación funcional reactiva (RFP) excepto para las construcciones de código limpio.

Cada corriente a continuación es un observable. Personalmente, me gustaría pensarlo como una transmisión, ya que es fácil razonar acerca de los eventos. El flujo de paginación se puede representar mediante 6 flujos:

firstPageStream -f-------------------> // this is actually to produce very first event to obtain the first page result. The event can come from a touch in one of the screen that navigates to this list screen. nextPageStream -----n-------n-------> // this is the source of events coming from Next button ''touch'' actions prevPageStream ---------p-------p---> // this is the source of events coming from Previous button ''touch'' actions requestStream -r---r---r---r---r---> // this is to consume the signal from 3 streams above and spawn events (r) which create a query and pagination details i.e.: offset and limit responseStream -R---R---R---R---R---> // this is to take each r and invoke your web service getUsersByKeyword() then spawn the response (R)

Las secuencias anteriores se pueden representar en pseudo código (estilo JS) a continuación (sería bastante fácil traducir a RxJava u otros idiomas)

firstPageStream = Observable.just(0); // offset=0 nextPageStream = Observable.fromEvent(nextButton, ''touch'') .map(function() { offset = offset + limit; return offset; }); prevPageStream = Observable.fromEvent(prevButton, ''touch'') .map(function() { offset = offset - limit; // TODO some check here return offset; }); requestStream = Observable.merge(firstPageStream, nextPageStream, prevPageStream) .map(function(offsetValue) { return {offset : offsetValue, limit: limit}; }); responseStream = requestStream.flatMap(function(pagination) { return webservice.getUsersByKeyword(query, pagination.offset, pagination.limit); // assume this is async response }); responseStream.subscribe(function(result) { // use result to render the display });

PS1: No he probado el código anterior. Estoy aprendiendo RFP, así que solo trato de pensar de manera reactiva escribiendo. Bienvenido cualquier sugerencia.

PS2: Estoy muy influenciado por https://gist.github.com/staltz/868e7e9bc2a7b8c1f754 para la forma de explicar las corrientes reactivas.


Por lo tanto, si esta es una paginación unidireccional, aquí hay un patrón que puede probar. Este código no se ha ejecutado ni compilado, pero he intentado realizar una anotación excesiva para explicar lo que está sucediendo.

private static final int LIMIT = 50; // Given: Returns a stream of dummy event objects telling us when // to grab the next page. This could be from a click or wherever. Observable<NextPageEvent> getNextPageEvents(); // Given: // The search query keywords. Each emission here means a new top-level // request; Observable<String> queries; queries.switchMap((query) -> getNextPageEvents() // Ignore ''next page'' pokes when unable to take them. .onBackPressureDrop() // Seed with the first page. .startWith(new NextPageEvent()) // Increment the page number on each request. .scan(0, (page, event) -> page + 1) // Request the page from the server. .concatMap((page) -> getUsersByKeyword(query, LIMIT, LIMIT * page) // Unroll Observable<List<User> into Observable<User> .concatMap((userList) -> Observable.from(userList)) .retryWhen(/** Insert your favorite retry logic here. */)) // Only process new page requests sequentially. .scheduleOn(Schedulers.trampoline()) // Trampoline schedules on the ''current thread'', so we make sure that''s // a background IO thread. .scheduleOn(Schedulers.io());

Eso debería permitir que la señal de ''eventos de la siguiente página'' active una carga de los datos de la página siguiente cada vez, y que no salte páginas si se produce un error al cargar una. También se reinicia completamente en el nivel superior si recibe una nueva consulta de búsqueda. Si yo (¿o alguien más?) Tiene tiempo, me gustaría verificar mis suposiciones sobre el trampolín y la contrapresión y asegurarme de que bloquee cualquier intento de buscar prematuramente la página siguiente mientras se está cargando.