tutorial rxkotlin rxjava example java functional-programming reactive-programming rx-java

java - rxkotlin - rxjs



RxJava: ¿cómo componer múltiples Observables con dependencias y recopilar todos los resultados al final? (3)

Estoy aprendiendo RxJava y, como mi primer experimento, intento reescribir el código en el primer método run() en este código (citado en el blog de Netflix como un problema que RxJava puede ayudar a resolver) para mejorar su asincronicidad utilizando RxJava, es decir, no espera el resultado del primer Futuro ( f1.get() ) antes de continuar con el resto del código.

f3 depende de f1 . Veo cómo manejar esto, flatMap parece hacer el truco:

Observable<String> f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA())) .flatMap(new Func1<String, Observable<String>>() { @Override public Observable<String> call(String s) { return Observable.from(executor.submit(new CallToRemoteServiceC(s))); } });

A continuación, f4 y f5 dependen de f2 . Tengo esto:

final Observable<Integer> f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB())) .flatMap(new Func1<Integer, Observable<Integer>>() { @Override public Observable<Integer> call(Integer i) { Observable<Integer> f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i))); Observable<Integer> f5Observable = Observable.from(executor.submit(new CallToRemoteServiceE(i))); return Observable.merge(f4Observable, f5Observable); } });

Lo que empieza a ponerse raro ( merge probablemente no es lo que quiero ...) pero me permite hacer esto al final, no es exactamente lo que quiero:

f3Observable.subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println("Observed from f3: " + s); f4And5Observable.subscribe(new Action1<Integer>() { @Override public void call(Integer i) { System.out.println("Observed from f4 and f5: " + i); } }); } });

Eso me da:

Observed from f3: responseB_responseA Observed from f4 and f5: 140 Observed from f4 and f5: 5100

que son todos los números, pero desafortunadamente obtengo los resultados en invocaciones separadas, así que no puedo reemplazar la impresión final en el código original:

System.out.println(f3.get() + " => " + (f4.get() * f5.get()));

No entiendo cómo obtener acceso a ambos valores de retorno en la misma línea. Creo que probablemente hay algo de programación funcional que me estoy perdiendo. ¿Cómo puedo hacer esto? Gracias.


Creo que lo que estás buscando es switchmap. Nos encontramos con un problema similar en el que tenemos un servicio de sesión que se encarga de obtener una nueva sesión de una API, y necesitamos esa sesión antes de poder obtener más datos. Podemos agregar a la sesión observable que devuelve el sessionToken para uso en nuestra llamada de datos.

getSession devuelve un observable;

public getSession(): Observable<any>{ if (this.sessionToken) return Observable.of(this.sessionToken); else if(this.sessionObservable) return this.sessionObservable; else { // simulate http call this.sessionObservable = Observable.of(this.sessonTokenResponse) .map(res => { this.sessionObservable = null; return res.headers["X-Session-Token"]; }) .delay(500) .share(); return this.sessionObservable; } }

y getData toma eso observable y lo anexa.

public getData() { if (this.dataObservable) return this.dataObservable; else { this.dataObservable = this.sessionService.getSession() .switchMap((sessionToken:string, index:number) =>{ //simulate data http call that needed sessionToken return Observable.of(this.dataResponse) .map(res => { this.dataObservable = null; return res.body; }) .delay(1200) }) .map ( data => { return data; }) .catch(err => { console.log("err in data service", err); // return err; }) .share(); return this.dataObservable; } }

Aún necesitará un plano para combinar los observables no dependientes.

Plunkr: http://plnkr.co/edit/hiA1jP?p=info

Donde tuve la idea de usar el mapa de cambio: http://blog.thoughtram.io/angular/2016/01/06/taking-advantage-of-observables-in-angular2.html


Parece que todo lo que realmente necesitas es un poco más de ánimo y perspectiva sobre cómo se usa RX. Le sugiero que lea más en la documentación así como en los diagramas de mármol (sé que no siempre son útiles). También sugiero mirar en la función lift() y operadores.

  • El punto completo de un observable es concatenar el flujo de datos y la manipulación de datos en un solo objeto
  • El punto de las llamadas a map , flatMap y filter es manipular los datos en su flujo de datos
  • El punto de fusión es combinar flujos de datos.
  • El objetivo de los operadores es permitirle interrumpir un flujo constante de observables y definir sus propias operaciones en un flujo de datos. Por ejemplo, codifiqué un operador de media móvil. Eso resume n double s en un Observable de dobles para devolver un flujo de promedios móviles. El código literalmente se veía así

    Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize))

Se sentirá aliviado de que muchos de los métodos de filtrado que dan por descontado tienen todos lift() bajo el capó.

Con eso dicho; todo lo que se necesita para fusionar múltiples dependencias es:

  • cambiando todos los datos entrantes a un tipo de datos estándar usando map o flatMap
  • fusionar tipos de datos estándar a un flujo
  • usar operadores personalizados si un objeto necesita esperar en otro, o si necesita ordenar datos en la ruta. Precaución: este enfoque ralentizará la corriente
  • Usando para enumerar o suscribirse para recopilar todos esos datos

Edición: alguien convirtió el siguiente texto, que yo había agregado como una edición de la pregunta, en una respuesta, que aprecio, y entiendo que puede ser lo que el SO debe hacer, sin embargo , no considero que esta sea una respuesta porque claramente no lo es. La forma correcta de hacerlo . Nunca usaría este código ni aconsejaría a nadie que lo copie. Otras / mejores soluciones y comentarios bienvenidos!

Pude resolver esto con lo siguiente. No me di cuenta de que podías flatMap un flatMap observable más de una vez, asumí que los resultados solo podían consumirse una vez. Así que simplemente flatMap f2Observable dos veces (lo siento, cambié el nombre de algunas cosas en el código desde mi publicación original), luego zip todos los Observables y luego me suscribo a eso. Ese Map en el zip para agregar los valores no es deseable debido al tipo de malabarismo. Otras / mejores soluciones y comentarios bienvenidos! El código completo es visible en una esencia . Gracias.

Future<Integer> f2 = executor.submit(new CallToRemoteServiceB()); Observable<Integer> f2Observable = Observable.from(f2); Observable<Integer> f4Observable = f2Observable .flatMap(new Func1<Integer, Observable<Integer>>() { @Override public Observable<Integer> call(Integer integer) { System.out.println("Observed from f2: " + integer); Future<Integer> f4 = executor.submit(new CallToRemoteServiceD(integer)); return Observable.from(f4); } }); Observable<Integer> f5Observable = f2Observable .flatMap(new Func1<Integer, Observable<Integer>>() { @Override public Observable<Integer> call(Integer integer) { System.out.println("Observed from f2: " + integer); Future<Integer> f5 = executor.submit(new CallToRemoteServiceE(integer)); return Observable.from(f5); } }); Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() { @Override public Map<String, String> call(String s, Integer integer, Integer integer2) { Map<String, String> map = new HashMap<String, String>(); map.put("f3", s); map.put("f4", String.valueOf(integer)); map.put("f5", String.valueOf(integer2)); return map; } }).subscribe(new Action1<Map<String, String>>() { @Override public void call(Map<String, String> map) { System.out.println(map.get("f3") + " => " + (Integer.valueOf(map.get("f4")) * Integer.valueOf(map.get("f5")))); } });

Y esto me da el resultado deseado:

responseB_responseA => 714000