rxkotlin - rxjava2 ejemplos
RxJava y ejecución paralela de código observador. (5)
El uso de flatMap
y la especificación para suscribirse en Schedulers.computation()
logrará la concurrencia.
Aquí hay un ejemplo más práctico usando Callable
, desde la salida, podemos ver que tomará unos 2000 milisegundos para terminar todas las tareas.
static class MyCallable implements Callable<Integer> {
private static final Object CALLABLE_COUNT_LOCK = new Object();
private static int callableCount;
@Override
public Integer call() throws Exception {
Thread.sleep(2000);
synchronized (CALLABLE_COUNT_LOCK) {
return callableCount++;
}
}
public static int getCallableCount() {
synchronized (CALLABLE_COUNT_LOCK) {
return callableCount;
}
}
}
private static void runMyCallableConcurrentlyWithRxJava() {
long startTimeMillis = System.currentTimeMillis();
final Semaphore semaphore = new Semaphore(1);
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
Observable.just(new MyCallable(), new MyCallable(), new MyCallable(), new MyCallable())
.flatMap(new Function<MyCallable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull MyCallable myCallable) throws Exception {
return Observable.fromCallable(myCallable).subscribeOn(Schedulers.computation());
}
})
.subscribeOn(Schedulers.computation())
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
System.out.println("onNext " + o);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
if (MyCallable.getCallableCount() >= 4) {
semaphore.release();
}
}
});
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
System.out.println("durationMillis " + (System.currentTimeMillis()-startTimeMillis));
}
Estoy teniendo el siguiente código usando la api RxJava Observable:
Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());
observable
.buffer(10000)
.observeOn(Schedulers.computation())
.subscribe(recordInfo -> {
_logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());
for(Info info : recordInfo) {
// some I/O operation logic
}
},
exception -> {
},
() -> {
});
Mi expectativa era que el código de observación, es decir, el código dentro del método subscribe (), se ejecutará en paralelo después de que haya especificado el programador de cálculo. En su lugar, el código todavía se está ejecutando secuencialmente en un solo hilo. ¿Cómo puede hacer que el código se ejecute en paralelo utilizando la api de RxJava?
Esto todavía viene en la misma secuencia. Incluso en nuevos hilos.
Observable ob3 = Observable.range (1, 5);
ob3.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer pArg0) {
return Observable.just(pArg0);
}
}).subscribeOn(Schedulers.newThread()).map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer pArg0) {
try {
Thread.sleep(1000 - (pArg0 * 100));
System.out.println(pArg0 + " ccc " + Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
return pArg0;
}
}).subscribe();
Salida 1 ccc RxNewThreadScheduler-1 2 ccc RxNewThreadScheduler-1 3 ccc RxNewThreadScheduler-1 4 ccc RxNewThreadScheduler-1 5 ccc RxNewThreadScheduler-1
RxJava 2.0.5 introdujo flujos paralelos y ParallelFlowable , lo que hace que la ejecución paralela sea más simple y más declarativa.
Ya no tiene que crear Observable
/ flatMap
dentro de flatMap
, simplemente puede llamar parallel()
en Flowable
y devuelve ParallelFlowable
.
No es tan rico en funciones como un Flowable
regular, porque la concurrencia plantea muchos problemas con los contratos Rx, pero tiene map()
básico map()
, filter()
y muchos más, lo que debería ser suficiente en la mayoría de los casos.
Así que en lugar de este flujo de @LordRaydenMK responda
Observable<Integer> vals = Observable.range(1,10);
vals.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));
ahora puedes hacer
Flowable<Integer> vals = Flowable.range(1, 10);
vals.parallel()
.runOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
.sequential()
.subscribe(val -> System.out.println(val));
RxJava a menudo se entiende mal cuando se trata de los aspectos asíncronos / multiproceso de la misma. La codificación de operaciones multiproceso es simple, pero comprender la abstracción es otra cosa.
Una pregunta común acerca de RxJava es cómo lograr la paralelización o la emisión de múltiples elementos simultáneamente desde un observable. Por supuesto, esta definición rompe el Contrato observable que establece que onNext () debe llamarse secuencialmente y nunca simultáneamente por más de un hilo a la vez.
Para lograr el paralelismo necesitas múltiples Observables.
Esto se ejecuta en un solo hilo:
Observable<Integer> vals = Observable.range(1,10);
vals.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
.subscribe(val -> System.out.println("Subscriber received "
+ val + " on "
+ Thread.currentThread().getName()));
Esto se ejecuta en múltiples hilos:
Observable<Integer> vals = Observable.range(1,10);
vals.flatMap(val -> Observable.just(val)
.subscribeOn(Schedulers.computation())
.map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));
El código y el texto provienen de esta entrada de blog.
observeOn(Schedulers.computation())
especificar subscribeOn(Schedulers.computation())
lugar de observeOn(Schedulers.computation())
para ese propósito. En subscribeOn
usted declara en qué hilo emitirá sus valores. En observeOn
usted declara en qué hilo va a manejar y los observa.