tutorial rxkotlin rxjava example java android rx-java

rxkotlin - Rxjava Android cómo usar el operador Zip



rxjava zip example (7)

Tengo muchos problemas para comprender el operador zip en RxJava para mi proyecto de Android. Problema Necesito poder enviar una solicitud de red para cargar un video Luego necesito enviar una solicitud de red para cargar una imagen para ir con ella, finalmente necesito agregar una descripción y usar las respuestas de las dos solicitudes anteriores para cargar el video. URL de ubicación del video y la imagen junto con la descripción de mi servidor.

Asumí que el operador zip sería perfecto para esta tarea, ya que entendí que podíamos tomar la respuesta de dos observables (solicitudes de video e imagen) y usarlos para mi tarea final. Pero parece que no puedo lograr que esto ocurra como lo imagino.

Estoy buscando a alguien que responda cómo se puede hacer esto conceptualmente con un poco de código psuedo. Gracias


Aquí tengo un ejemplo que hice usando Zip de forma asíncrona, por si tienes curiosidad

/** * Since every observable into the zip is created to subscribeOn a diferent thread, it´s means all of them will run in parallel. * By default Rx is not async, only if you explicitly use subscribeOn. */ @Test public void testAsyncZip() { scheduler = Schedulers.newThread(); scheduler1 = Schedulers.newThread(); scheduler2 = Schedulers.newThread(); long start = System.currentTimeMillis(); Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2) .concat(s3)) .subscribe(result -> showResult("Async in:", start, result)); } /** * In this example the the three observables will be emitted sequentially and the three items will be passed to the pipeline */ @Test public void testZip() { long start = System.currentTimeMillis(); Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2) .concat(s3)) .subscribe(result -> showResult("Sync in:", start, result)); } public void showResult(String transactionType, long start, String result) { System.out.println(result + " " + transactionType + String.valueOf(System.currentTimeMillis() - start)); } public Observable<String> obString() { return Observable.just("") .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> "Hello"); } public Observable<String> obString1() { return Observable.just("") .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> " World"); } public Observable<String> obString2() { return Observable.just("") .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> "!"); } public Observable<String> obAsyncString() { return Observable.just("") .observeOn(scheduler) .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> "Hello"); } public Observable<String> obAsyncString1() { return Observable.just("") .observeOn(scheduler1) .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> " World"); } public Observable<String> obAsyncString2() { return Observable.just("") .observeOn(scheduler2) .doOnNext(val -> { System.out.println("Thread " + Thread.currentThread() .getName()); }) .map(val -> "!"); }

Puede ver más ejemplos aquí https://github.com/politrons/reactive


El operador de Zip empareja estrictamente los elementos emitidos por los observables. Espera a que lleguen ambos (o más) elementos y luego los fusiona. Entonces sí, esto sería adecuado para sus necesidades.

Func2 para encadenar el resultado de los dos primeros observables. Tenga en cuenta que este enfoque sería más simple si usa Retrofit ya que su interfaz api puede devolver un observable. De lo contrario, necesitaría crear su propio observable.

// assuming each observable returns response in the form of String Observable<String> movOb = Observable.create(...); // if you use Retrofit Observable<String> picOb = RetrofitApiManager.getService().uploadPic(...), Observable.zip(movOb, picOb, new Func2<String, String, MyResult>() { @Override public MyResult call(String movieUploadResponse, String picUploadResponse) { // analyze both responses, upload them to another server // and return this method with a MyResult type return myResult; } } ) // continue chaining this observable with subscriber // or use it for something else


Esta es mi implementación usando Single.zip y rxJava2

Traté de hacerlo lo más fácil de entender posible

// // API Client Interface // @GET(ServicesConstants.API_PREFIX + "questions/{id}/") Single<Response<ResponseGeneric<List<ResponseQuestion>>>> getBaseQuestions(@Path("id") int personId); @GET(ServicesConstants.API_PREFIX + "physician/{id}/") Single<Response<ResponseGeneric<List<ResponsePhysician>>>> getPhysicianInfo(@Path("id") int personId); // // API middle layer - NOTE: I had feedback that the Single.create is not needed (but I haven''t yet spent the time to improve it) // public Single<List<ResponsePhysician>> getPhysicianInfo(int personId) { return Single.create(subscriber -> { apiClient.getPhysicianInfo(appId) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .subscribe(response -> { ResponseGeneric<List<ResponsePhysician>> responseBody = response.body(); if(responseBody != null && responseBody.statusCode == 1) { if (!subscriber.isDisposed()) subscriber.onSuccess(responseBody.data); } else if(response.body() != null && response.body().status != null ){ if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.body().status)); } else { if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message())); } }, throwable -> { throwable.printStackTrace(); if(!subscriber.isDisposed()) subscriber.onError(throwable); }); }); } public Single<List<ResponseQuestion>> getHealthQuestions(int personId){ return Single.create(subscriber -> { apiClient.getBaseQuestions(personId) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .subscribe(response -> { ResponseGeneric<List<ResponseQuestion>> responseBody = response.body(); if(responseBody != null && responseBody.data != null) { if (!subscriber.isDisposed()) subscriber.onSuccess(response.body().data); } else if(response.body() != null && response.body().status != null ){ if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.body().status)); } else { if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message())); } }, throwable -> { throwable.printStackTrace(); if(!subscriber.isDisposed()) subscriber.onError(throwable); }); }); } //please note that ResponseGeneric is just an outer wrapper of the returned data - common to all API''s in this project public class ResponseGeneric<T> { @SerializedName("Status") public String status; @SerializedName("StatusCode") public float statusCode; @SerializedName("Data") public T data; } // // API end-use layer - this gets close to the UI so notice the oberver is set for main thread // private static class MergedResponse{// this is just a POJO to store all the responses in one object public List<ResponseQuestion> listQuestions; public List<ResponsePhysician> listPhysicians; public MergedResponse(List<ResponseQuestion> listQuestions, List<ResponsePhysician> listPhysicians){ this.listQuestions = listQuestions; this.listPhysicians = listPhysicians; } } // example of Single.zip() - calls getHealthQuestions() and getPhysicianInfo() from API Middle Layer private void downloadHealthQuestions(int personId) { addRxSubscription(Single .zip(getHealthQuestions(personId), getPhysicianInfo(personId), MergedResponse::new) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(response -> { if(response != null) { Timber.i(" - total health questions downloaded %d", response.listQuestions.size()); Timber.i(" - physicians downloaded %d", response.listPhysicians.size()); if (response.listPhysicians != null && response.listPhysicians.size()>0) { // do your stuff to process response data } if (response.listQuestions != null && response.listQuestions.size()>0) { // do your stuff to process response data } } else { // process error - show message } }, error -> { // process error - show network error message })); }


He estado buscando una respuesta simple sobre cómo usar el operador Zip y qué hacer con los Observables que creo para pasárselos, me preguntaba si debería llamar a subscribe () para cada observable o no, ninguno de estos las respuestas fueron fáciles de encontrar, tuve que resolverlo por mí mismo, así que aquí hay un ejemplo simple para usar el operador Zip en 2 Observables:

@Test public void zipOperator() throws Exception { List<Integer> indexes = Arrays.asList(0, 1, 2, 3, 4); List<String> letters = Arrays.asList("a", "b", "c", "d", "e"); Observable<Integer> indexesObservable = Observable.fromIterable(indexes); Observable<String> lettersObservable = Observable.fromIterable(letters); Observable.zip(indexesObservable, lettersObservable, mergeEmittedItems()) .subscribe(printMergedItems()); } @NonNull private BiFunction<Integer, String, String> mergeEmittedItems() { return new BiFunction<Integer, String, String>() { @Override public String apply(Integer index, String letter) throws Exception { return "[" + index + "] " + letter; } }; } @NonNull private Consumer<String> printMergedItems() { return new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } }; }

El resultado impreso es:

[0] a [1] b [2] c [3] d [4] e

Las respuestas finales a las preguntas que en mi cabeza eran las siguientes

los Observables pasados ​​al método zip () solo deben crearse, no necesitan tener suscriptores, solo crearlos es suficiente ... si desea que algún observable se ejecute en un programador, puede especificar esto para ese Observable ... también probé el operador zip () en Observables donde deberían esperar el resultado, y el Consumible del zip () se activó solo cuando ambos resultados estaban listos (que es el comportamiento esperado)


Un pequeño example :

Observable<String> stringObservable1 = Observable.just("Hello", "World"); Observable<String> stringObservable2 = Observable.just("Bye", "Friends"); Observable.zip(stringObservable1, stringObservable2, new BiFunction<String, String, String>() { @Override public String apply(@NonNull String s, @NonNull String s2) throws Exception { return s + " - " + s2; } }).subscribe(new Consumer<String>() { @Override public void accept(String s) throws Exception { System.out.println(s); } });

Esto imprimirá:

Hello - Bye World - Friends


Utiliza el zip de rxjava con Java 8 :

Observable<MovieResponse> movies = ... Observable<PictureResponse> picture = ... Observable<ZipResponse> response = Observable.zip(movies, picture, ZipResponse::new); class ZipResponse { private MovieResponse movieResponse; private PictureResponse pictureResponse; ZipResponse(MovieResponse movieResponse, PictureResponse pictureResponse) { this.movieResponse = movieResponse; this.pictureResponse = pictureResponse; } public MovieResponse getMovieResponse() { return movieResponse; } public void setMovieResponse(MovieResponse movieResponse) { this.movieResponse= movieResponse; } public PictureResponse getPictureResponse() { return pictureResponse; } public void setPictureResponse(PictureResponse pictureResponse) { this.pictureResponse= pictureResponse; } }


zip operador zip permite componer un resultado a partir de los resultados de dos observables diferentes.

Tendrá que dar am lambda que creará un resultado a partir de los datos emitidos por cada observable.

Observable<MovieResponse> movies = ... Observable<PictureResponse> picture = ... Observable<Response> response = movies.zipWith(picture, (movie, pic) -> { return new Response("description", movie.getName(), pic.getUrl()); });