rxkotlin - Rxjava Android cómo usar el operador Zip
rxjava zip example (7)
Tengo muchos problemas para comprender el operador zip en RxJava para mi proyecto de Android. Problema Necesito poder enviar una solicitud de red para cargar un video Luego necesito enviar una solicitud de red para cargar una imagen para ir con ella, finalmente necesito agregar una descripción y usar las respuestas de las dos solicitudes anteriores para cargar el video. URL de ubicación del video y la imagen junto con la descripción de mi servidor.
Asumí que el operador zip sería perfecto para esta tarea, ya que entendí que podíamos tomar la respuesta de dos observables (solicitudes de video e imagen) y usarlos para mi tarea final. Pero parece que no puedo lograr que esto ocurra como lo imagino.
Estoy buscando a alguien que responda cómo se puede hacer esto conceptualmente con un poco de código psuedo. Gracias
Aquí tengo un ejemplo que hice usando Zip de forma asíncrona, por si tienes curiosidad
/**
* Since every observable into the zip is created to subscribeOn a diferent thread, it´s means all of them will run in parallel.
* By default Rx is not async, only if you explicitly use subscribeOn.
*/
@Test
public void testAsyncZip() {
scheduler = Schedulers.newThread();
scheduler1 = Schedulers.newThread();
scheduler2 = Schedulers.newThread();
long start = System.currentTimeMillis();
Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
.concat(s3))
.subscribe(result -> showResult("Async in:", start, result));
}
/**
* In this example the the three observables will be emitted sequentially and the three items will be passed to the pipeline
*/
@Test
public void testZip() {
long start = System.currentTimeMillis();
Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2)
.concat(s3))
.subscribe(result -> showResult("Sync in:", start, result));
}
public void showResult(String transactionType, long start, String result) {
System.out.println(result + " " +
transactionType + String.valueOf(System.currentTimeMillis() - start));
}
public Observable<String> obString() {
return Observable.just("")
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "Hello");
}
public Observable<String> obString1() {
return Observable.just("")
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> " World");
}
public Observable<String> obString2() {
return Observable.just("")
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "!");
}
public Observable<String> obAsyncString() {
return Observable.just("")
.observeOn(scheduler)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "Hello");
}
public Observable<String> obAsyncString1() {
return Observable.just("")
.observeOn(scheduler1)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> " World");
}
public Observable<String> obAsyncString2() {
return Observable.just("")
.observeOn(scheduler2)
.doOnNext(val -> {
System.out.println("Thread " + Thread.currentThread()
.getName());
})
.map(val -> "!");
}
Puede ver más ejemplos aquí https://github.com/politrons/reactive
El operador de Zip empareja estrictamente los elementos emitidos por los observables. Espera a que lleguen ambos (o más) elementos y luego los fusiona. Entonces sí, esto sería adecuado para sus necesidades.
Func2
para encadenar el resultado de los dos primeros observables.
Tenga en cuenta que este enfoque sería más simple si usa Retrofit ya que su interfaz api puede devolver un observable.
De lo contrario, necesitaría crear su propio observable.
// assuming each observable returns response in the form of String
Observable<String> movOb = Observable.create(...);
// if you use Retrofit
Observable<String> picOb = RetrofitApiManager.getService().uploadPic(...),
Observable.zip(movOb, picOb,
new Func2<String, String, MyResult>() {
@Override
public MyResult call(String movieUploadResponse,
String picUploadResponse) {
// analyze both responses, upload them to another server
// and return this method with a MyResult type
return myResult;
}
}
)
// continue chaining this observable with subscriber
// or use it for something else
Esta es mi implementación usando Single.zip y rxJava2
Traté de hacerlo lo más fácil de entender posible
//
// API Client Interface
//
@GET(ServicesConstants.API_PREFIX + "questions/{id}/")
Single<Response<ResponseGeneric<List<ResponseQuestion>>>> getBaseQuestions(@Path("id") int personId);
@GET(ServicesConstants.API_PREFIX + "physician/{id}/")
Single<Response<ResponseGeneric<List<ResponsePhysician>>>> getPhysicianInfo(@Path("id") int personId);
//
// API middle layer - NOTE: I had feedback that the Single.create is not needed (but I haven''t yet spent the time to improve it)
//
public Single<List<ResponsePhysician>> getPhysicianInfo(int personId) {
return Single.create(subscriber -> {
apiClient.getPhysicianInfo(appId)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(response -> {
ResponseGeneric<List<ResponsePhysician>> responseBody = response.body();
if(responseBody != null && responseBody.statusCode == 1) {
if (!subscriber.isDisposed()) subscriber.onSuccess(responseBody.data);
} else if(response.body() != null && response.body().status != null ){
if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.body().status));
} else {
if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message()));
}
}, throwable -> {
throwable.printStackTrace();
if(!subscriber.isDisposed()) subscriber.onError(throwable);
});
});
}
public Single<List<ResponseQuestion>> getHealthQuestions(int personId){
return Single.create(subscriber -> {
apiClient.getBaseQuestions(personId)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(response -> {
ResponseGeneric<List<ResponseQuestion>> responseBody = response.body();
if(responseBody != null && responseBody.data != null) {
if (!subscriber.isDisposed()) subscriber.onSuccess(response.body().data);
} else if(response.body() != null && response.body().status != null ){
if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.body().status));
} else {
if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message()));
}
}, throwable -> {
throwable.printStackTrace();
if(!subscriber.isDisposed()) subscriber.onError(throwable);
});
});
}
//please note that ResponseGeneric is just an outer wrapper of the returned data - common to all API''s in this project
public class ResponseGeneric<T> {
@SerializedName("Status")
public String status;
@SerializedName("StatusCode")
public float statusCode;
@SerializedName("Data")
public T data;
}
//
// API end-use layer - this gets close to the UI so notice the oberver is set for main thread
//
private static class MergedResponse{// this is just a POJO to store all the responses in one object
public List<ResponseQuestion> listQuestions;
public List<ResponsePhysician> listPhysicians;
public MergedResponse(List<ResponseQuestion> listQuestions, List<ResponsePhysician> listPhysicians){
this.listQuestions = listQuestions;
this.listPhysicians = listPhysicians;
}
}
// example of Single.zip() - calls getHealthQuestions() and getPhysicianInfo() from API Middle Layer
private void downloadHealthQuestions(int personId) {
addRxSubscription(Single
.zip(getHealthQuestions(personId), getPhysicianInfo(personId), MergedResponse::new)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(response -> {
if(response != null) {
Timber.i(" - total health questions downloaded %d", response.listQuestions.size());
Timber.i(" - physicians downloaded %d", response.listPhysicians.size());
if (response.listPhysicians != null && response.listPhysicians.size()>0) {
// do your stuff to process response data
}
if (response.listQuestions != null && response.listQuestions.size()>0) {
// do your stuff to process response data
}
} else {
// process error - show message
}
}, error -> {
// process error - show network error message
}));
}
He estado buscando una respuesta simple sobre cómo usar el operador Zip y qué hacer con los Observables que creo para pasárselos, me preguntaba si debería llamar a subscribe () para cada observable o no, ninguno de estos las respuestas fueron fáciles de encontrar, tuve que resolverlo por mí mismo, así que aquí hay un ejemplo simple para usar el operador Zip en 2 Observables:
@Test
public void zipOperator() throws Exception {
List<Integer> indexes = Arrays.asList(0, 1, 2, 3, 4);
List<String> letters = Arrays.asList("a", "b", "c", "d", "e");
Observable<Integer> indexesObservable = Observable.fromIterable(indexes);
Observable<String> lettersObservable = Observable.fromIterable(letters);
Observable.zip(indexesObservable, lettersObservable, mergeEmittedItems())
.subscribe(printMergedItems());
}
@NonNull
private BiFunction<Integer, String, String> mergeEmittedItems() {
return new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer index, String letter) throws Exception {
return "[" + index + "] " + letter;
}
};
}
@NonNull
private Consumer<String> printMergedItems() {
return new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
};
}
El resultado impreso es:
[0] a
[1] b
[2] c
[3] d
[4] e
Las respuestas finales a las preguntas que en mi cabeza eran las siguientes
los Observables pasados al método zip () solo deben crearse, no necesitan tener suscriptores, solo crearlos es suficiente ... si desea que algún observable se ejecute en un programador, puede especificar esto para ese Observable ... también probé el operador zip () en Observables donde deberían esperar el resultado, y el Consumible del zip () se activó solo cuando ambos resultados estaban listos (que es el comportamiento esperado)
Un pequeño example :
Observable<String> stringObservable1 = Observable.just("Hello", "World");
Observable<String> stringObservable2 = Observable.just("Bye", "Friends");
Observable.zip(stringObservable1, stringObservable2, new BiFunction<String, String, String>() {
@Override
public String apply(@NonNull String s, @NonNull String s2) throws Exception {
return s + " - " + s2;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
Esto imprimirá:
Hello - Bye
World - Friends
Utiliza el
zip
de
rxjava
con
Java 8
:
Observable<MovieResponse> movies = ...
Observable<PictureResponse> picture = ...
Observable<ZipResponse> response = Observable.zip(movies, picture, ZipResponse::new);
class ZipResponse {
private MovieResponse movieResponse;
private PictureResponse pictureResponse;
ZipResponse(MovieResponse movieResponse, PictureResponse pictureResponse) {
this.movieResponse = movieResponse;
this.pictureResponse = pictureResponse;
}
public MovieResponse getMovieResponse() {
return movieResponse;
}
public void setMovieResponse(MovieResponse movieResponse) {
this.movieResponse= movieResponse;
}
public PictureResponse getPictureResponse() {
return pictureResponse;
}
public void setPictureResponse(PictureResponse pictureResponse) {
this.pictureResponse= pictureResponse;
}
}
zip
operador
zip
permite componer un resultado a partir de los resultados de dos observables diferentes.
Tendrá que dar am lambda que creará un resultado a partir de los datos emitidos por cada observable.
Observable<MovieResponse> movies = ...
Observable<PictureResponse> picture = ...
Observable<Response> response = movies.zipWith(picture, (movie, pic) -> {
return new Response("description", movie.getName(), pic.getUrl());
});