java - schedulers - Combina una lista de Observables y espera hasta que todo esté completo
rxkotlin (6)
TL; DR
¿Cómo convertir
Task.whenAll(List<Task>)
en
RxJava
?
Mi código existente usa Bolts para crear una lista de tareas asincrónicas y espera hasta que todas esas tareas finalicen antes de realizar otros pasos.
Esencialmente, construye una
List<Task>
y devuelve una
Task
única que se marca como completada cuando se completan
todas las
tareas en la lista, según el
ejemplo en el sitio de Bolts
.
Estoy buscando reemplazar
Bolts
con
RxJava
y estoy asumiendo que este método de construir una lista de tareas asíncronas (tamaño no conocido de antemano) y envolverlas todas en un solo
Observable
es posible, pero no sé cómo.
He intentado buscar en
merge
,
zip
,
concat
, etc. pero no puedo ponerme a trabajar en la
List<Observable>
que estaría creando, ya que todos parecen estar orientados a trabajar solo en dos
Observables
a la vez si Entiendo los documentos correctamente.
Estoy tratando de aprender
RxJava
y todavía soy muy nuevo, así que perdóname si esta es una pregunta obvia o se explica en los documentos en alguna parte;
He intentado buscar.
Cualquier ayuda sería muy apreciada.
Con Kotlin
Observable.zip(obs1, obs2, BiFunction { t1 : Boolean, t2:Boolean ->
})
Es importante establecer el tipo para los argumentos de la función o tendrá errores de compilación
El último tipo de argumento cambia con el número de argumento: BiFunction para 2 Function3 para 3 Function4 para 4 ...
De las sugerencias propuestas, zip() realidad combina resultados observables entre sí, lo que puede o no ser lo que se desea, pero no se hizo en la pregunta. En la pregunta, todo lo que se quería era la ejecución de cada una de las operaciones, ya sea una por una o en paralelo (lo que no se especificó, pero el ejemplo de Bolts vinculado se refería a la ejecución en paralelo). Además, zip () se completará de inmediato cuando se complete cualquiera de los observables, por lo que infringe los requisitos.
Para la ejecución paralela de Observables, flatMap () presentado en la otra respuesta está bien, pero merge() sería más directo. Tenga en cuenta que la fusión se cerrará en caso de error de cualquiera de los Observables, si prefiere posponer la salida hasta que todos los observables hayan terminado, debería mirar mergeDelayError() .
Para uno por uno, creo que se debe usar el método estático Observable.concat () . Su javadoc dice así:
concat (java.lang.Iterable> secuencias) aplana un Iterable de observables en un observable, uno tras otro, sin intercalarlos
que suena como lo que buscas si no quieres una ejecución paralela.
Además, si solo le interesa completar su tarea, no devolver valores, probablemente debería buscar Completable en lugar de Observable .
TLDR: para la ejecución individual de tareas y el evento de finalización cuando se completan, creo que Completable.concat () es el más adecuado. Para la ejecución paralela, Completable.merge () o Completable.mergeDelayError () suena como la solución. El primero se detendrá inmediatamente ante cualquier error en cualquier completable, el último los ejecutará a todos, incluso si uno de ellos tiene un error, y solo entonces informa el error.
Estoy escribiendo algunos códigos de cálculo en Kotlin con JavaRx Observables y RxKotlin. Quiero observar una lista de observables para completar y, mientras tanto, darme una actualización con el progreso y el último resultado. Al final, devuelve el mejor resultado de cálculo. Un requisito adicional era ejecutar Observables en paralelo para usar todos mis núcleos de CPU. Terminé con esta solución:
@Volatile var results: MutableList<CalculationResult> = mutableListOf()
fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> {
return Observable.create { subscriber ->
Observable.concatEager(listOfCalculations.map { calculation: Calculation ->
doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result
}).subscribeBy(
onNext = {
results.add(it)
subscriber.onNext(Pair("A calculation is ready", it))
},
onComplete = {
subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results))
subscriber.onComplete()
},
onError = {
subscriber.onError(it)
}
)
}
}
Parece que estás buscando el operador Zip .
Hay algunas formas diferentes de usarlo, así que veamos un ejemplo. Digamos que tenemos algunos observables simples de diferentes tipos:
Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);
La forma más sencilla de esperarlos a todos es algo como esto:
Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));
Tenga en cuenta que en la función zip, los parámetros tienen tipos concretos que corresponden a los tipos de los observables que se están comprimiendo.
Comprimir una lista de observables también es posible, ya sea directamente:
List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);
Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));
... o ajustando la lista en un
Observable<Observable<?>>
:
Observable<Observable<?>> obsObs = Observable.from(obsList);
Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));
Sin embargo, en ambos casos, la función zip solo puede aceptar un solo parámetro
Object[]
ya que los tipos de los observables en la lista no se conocen de antemano, así como su número.
Esto significa que la función zip tendría que verificar el número de parámetros y emitirlos en consecuencia.
De todos modos, todos los ejemplos anteriores eventualmente imprimirán
1 Blah true
EDITAR:
cuando use Zip, asegúrese de que los
Observables
están comprimiendo emitan la misma cantidad de elementos.
En los ejemplos anteriores, los tres observables emitieron un solo elemento.
Si tuviéramos que cambiarlos a algo como esto:
Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items
Entonces
1, Blah, True
y
2, Hello, True
serían los únicos elementos pasados a las funciones zip.
El ítem
3
nunca se comprimirá ya que los otros observables se han completado.
Probablemente miraste el operador
zip
que funciona con 2 Observables.
También existe el método estático
Observable.zip
.
Tiene una forma que debería ser útil para usted:
zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)
Puedes ver el javadoc para más.
Puede usar
flatMap
en caso de que tenga una composición dinámica de tareas.
Algo como esto:
public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
return Observable.from(tasks)
//execute in parallel
.flatMap(task -> task.observeOn(Schedulers.computation()))
//wait, until all task are executed
//be aware, all your observable should emit onComplemete event
//otherwise you will wait forever
.toList()
//could implement more intelligent logic. eg. check that everything is successful
.map(results -> true);
}
Otro buen ejemplo de ejecución paralela.
Nota: Realmente no conozco sus requisitos para el manejo de errores. Por ejemplo, qué hacer si solo falla una tarea. Creo que deberías verificar este escenario.