tutorial schedulers rxkotlin rxjava2 rxjava para java rx-java reactive-programming rx-android

java - schedulers - Combina una lista de Observables y espera hasta que todo esté completo



rxkotlin (6)

TL; DR ¿Cómo convertir Task.whenAll(List<Task>) en RxJava ?

Mi código existente usa Bolts para crear una lista de tareas asincrónicas y espera hasta que todas esas tareas finalicen antes de realizar otros pasos. Esencialmente, construye una List<Task> y devuelve una Task única que se marca como completada cuando se completan todas las tareas en la lista, según el ejemplo en el sitio de Bolts .

Estoy buscando reemplazar Bolts con RxJava y estoy asumiendo que este método de construir una lista de tareas asíncronas (tamaño no conocido de antemano) y envolverlas todas en un solo Observable es posible, pero no sé cómo.

He intentado buscar en merge , zip , concat , etc. pero no puedo ponerme a trabajar en la List<Observable> que estaría creando, ya que todos parecen estar orientados a trabajar solo en dos Observables a la vez si Entiendo los documentos correctamente.

Estoy tratando de aprender RxJava y todavía soy muy nuevo, así que perdóname si esta es una pregunta obvia o se explica en los documentos en alguna parte; He intentado buscar. Cualquier ayuda sería muy apreciada.


Con Kotlin

Observable.zip(obs1, obs2, BiFunction { t1 : Boolean, t2:Boolean -> })

Es importante establecer el tipo para los argumentos de la función o tendrá errores de compilación

El último tipo de argumento cambia con el número de argumento: BiFunction para 2 Function3 para 3 Function4 para 4 ...


De las sugerencias propuestas, zip() realidad combina resultados observables entre sí, lo que puede o no ser lo que se desea, pero no se hizo en la pregunta. En la pregunta, todo lo que se quería era la ejecución de cada una de las operaciones, ya sea una por una o en paralelo (lo que no se especificó, pero el ejemplo de Bolts vinculado se refería a la ejecución en paralelo). Además, zip () se completará de inmediato cuando se complete cualquiera de los observables, por lo que infringe los requisitos.

Para la ejecución paralela de Observables, flatMap () presentado en la otra respuesta está bien, pero merge() sería más directo. Tenga en cuenta que la fusión se cerrará en caso de error de cualquiera de los Observables, si prefiere posponer la salida hasta que todos los observables hayan terminado, debería mirar mergeDelayError() .

Para uno por uno, creo que se debe usar el método estático Observable.concat () . Su javadoc dice así:

concat (java.lang.Iterable> secuencias) aplana un Iterable de observables en un observable, uno tras otro, sin intercalarlos

que suena como lo que buscas si no quieres una ejecución paralela.

Además, si solo le interesa completar su tarea, no devolver valores, probablemente debería buscar Completable en lugar de Observable .

TLDR: para la ejecución individual de tareas y el evento de finalización cuando se completan, creo que Completable.concat () es el más adecuado. Para la ejecución paralela, Completable.merge () o Completable.mergeDelayError () suena como la solución. El primero se detendrá inmediatamente ante cualquier error en cualquier completable, el último los ejecutará a todos, incluso si uno de ellos tiene un error, y solo entonces informa el error.


Estoy escribiendo algunos códigos de cálculo en Kotlin con JavaRx Observables y RxKotlin. Quiero observar una lista de observables para completar y, mientras tanto, darme una actualización con el progreso y el último resultado. Al final, devuelve el mejor resultado de cálculo. Un requisito adicional era ejecutar Observables en paralelo para usar todos mis núcleos de CPU. Terminé con esta solución:

@Volatile var results: MutableList<CalculationResult> = mutableListOf() fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> { return Observable.create { subscriber -> Observable.concatEager(listOfCalculations.map { calculation: Calculation -> doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result }).subscribeBy( onNext = { results.add(it) subscriber.onNext(Pair("A calculation is ready", it)) }, onComplete = { subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results)) subscriber.onComplete() }, onError = { subscriber.onError(it) } ) } }


Parece que estás buscando el operador Zip .

Hay algunas formas diferentes de usarlo, así que veamos un ejemplo. Digamos que tenemos algunos observables simples de diferentes tipos:

Observable<Integer> obs1 = Observable.just(1); Observable<String> obs2 = Observable.just("Blah"); Observable<Boolean> obs3 = Observable.just(true);

La forma más sencilla de esperarlos a todos es algo como esto:

Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b) .subscribe(str -> System.out.println(str));

Tenga en cuenta que en la función zip, los parámetros tienen tipos concretos que corresponden a los tipos de los observables que se están comprimiendo.

Comprimir una lista de observables también es posible, ya sea directamente:

List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3); Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2]) .subscribe(str -> System.out.println(str));

... o ajustando la lista en un Observable<Observable<?>> :

Observable<Observable<?>> obsObs = Observable.from(obsList); Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2]) .subscribe(str -> System.out.println(str));

Sin embargo, en ambos casos, la función zip solo puede aceptar un solo parámetro Object[] ya que los tipos de los observables en la lista no se conocen de antemano, así como su número. Esto significa que la función zip tendría que verificar el número de parámetros y emitirlos en consecuencia.

De todos modos, todos los ejemplos anteriores eventualmente imprimirán 1 Blah true

EDITAR: cuando use Zip, asegúrese de que los Observables están comprimiendo emitan la misma cantidad de elementos. En los ejemplos anteriores, los tres observables emitieron un solo elemento. Si tuviéramos que cambiarlos a algo como esto:

Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items

Entonces 1, Blah, True y 2, Hello, True serían los únicos elementos pasados ​​a las funciones zip. El ítem 3 nunca se comprimirá ya que los otros observables se han completado.


Probablemente miraste el operador zip que funciona con 2 Observables.

También existe el método estático Observable.zip . Tiene una forma que debería ser útil para usted:

zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)

Puedes ver el javadoc para más.


Puede usar flatMap en caso de que tenga una composición dinámica de tareas. Algo como esto:

public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) { return Observable.from(tasks) //execute in parallel .flatMap(task -> task.observeOn(Schedulers.computation())) //wait, until all task are executed //be aware, all your observable should emit onComplemete event //otherwise you will wait forever .toList() //could implement more intelligent logic. eg. check that everything is successful .map(results -> true); }

Otro buen ejemplo de ejecución paralela.

Nota: Realmente no conozco sus requisitos para el manejo de errores. Por ejemplo, qué hacer si solo falla una tarea. Creo que deberías verificar este escenario.