single rxkotlin rxjava2 rxjava rxandroid patron observer example español ejemplos diseño curso java system.reactive rx-java

rxkotlin - rxjava2 ejemplos



RxJava y ejecución paralela de código observador. (5)

El uso de flatMap y la especificación para suscribirse en Schedulers.computation() logrará la concurrencia.

Aquí hay un ejemplo más práctico usando Callable , desde la salida, podemos ver que tomará unos 2000 milisegundos para terminar todas las tareas.

static class MyCallable implements Callable<Integer> { private static final Object CALLABLE_COUNT_LOCK = new Object(); private static int callableCount; @Override public Integer call() throws Exception { Thread.sleep(2000); synchronized (CALLABLE_COUNT_LOCK) { return callableCount++; } } public static int getCallableCount() { synchronized (CALLABLE_COUNT_LOCK) { return callableCount; } } } private static void runMyCallableConcurrentlyWithRxJava() { long startTimeMillis = System.currentTimeMillis(); final Semaphore semaphore = new Semaphore(1); try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } Observable.just(new MyCallable(), new MyCallable(), new MyCallable(), new MyCallable()) .flatMap(new Function<MyCallable, ObservableSource<?>>() { @Override public ObservableSource<?> apply(@NonNull MyCallable myCallable) throws Exception { return Observable.fromCallable(myCallable).subscribeOn(Schedulers.computation()); } }) .subscribeOn(Schedulers.computation()) .subscribe(new Observer<Object>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Object o) { System.out.println("onNext " + o); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { if (MyCallable.getCallableCount() >= 4) { semaphore.release(); } } }); try { semaphore.acquire(); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } System.out.println("durationMillis " + (System.currentTimeMillis()-startTimeMillis)); }

Estoy teniendo el siguiente código usando la api RxJava Observable:

Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath()); observable .buffer(10000) .observeOn(Schedulers.computation()) .subscribe(recordInfo -> { _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId()); for(Info info : recordInfo) { // some I/O operation logic } }, exception -> { }, () -> { });

Mi expectativa era que el código de observación, es decir, el código dentro del método subscribe (), se ejecutará en paralelo después de que haya especificado el programador de cálculo. En su lugar, el código todavía se está ejecutando secuencialmente en un solo hilo. ¿Cómo puede hacer que el código se ejecute en paralelo utilizando la api de RxJava?


Esto todavía viene en la misma secuencia. Incluso en nuevos hilos.

Observable ob3 = Observable.range (1, 5);

ob3.flatMap(new Func1<Integer, Observable<Integer>>() { @Override public Observable<Integer> call(Integer pArg0) { return Observable.just(pArg0); } }).subscribeOn(Schedulers.newThread()).map(new Func1<Integer, Integer>() { @Override public Integer call(Integer pArg0) { try { Thread.sleep(1000 - (pArg0 * 100)); System.out.println(pArg0 + " ccc " + Thread.currentThread().getName()); } catch (Exception e) { e.printStackTrace(); } return pArg0; } }).subscribe();

Salida 1 ccc RxNewThreadScheduler-1 2 ccc RxNewThreadScheduler-1 3 ccc RxNewThreadScheduler-1 4 ccc RxNewThreadScheduler-1 5 ccc RxNewThreadScheduler-1


RxJava 2.0.5 introdujo flujos paralelos y ParallelFlowable , lo que hace que la ejecución paralela sea más simple y más declarativa.

Ya no tiene que crear Observable / flatMap dentro de flatMap , simplemente puede llamar parallel() en Flowable y devuelve ParallelFlowable .

No es tan rico en funciones como un Flowable regular, porque la concurrencia plantea muchos problemas con los contratos Rx, pero tiene map() básico map() , filter() y muchos más, lo que debería ser suficiente en la mayoría de los casos.

Así que en lugar de este flujo de @LordRaydenMK responda

Observable<Integer> vals = Observable.range(1,10); vals.flatMap(val -> Observable.just(val) .subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) ).subscribe(val -> System.out.println(val));

ahora puedes hacer

Flowable<Integer> vals = Flowable.range(1, 10); vals.parallel() .runOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) .sequential() .subscribe(val -> System.out.println(val));


RxJava a menudo se entiende mal cuando se trata de los aspectos asíncronos / multiproceso de la misma. La codificación de operaciones multiproceso es simple, pero comprender la abstracción es otra cosa.

Una pregunta común acerca de RxJava es cómo lograr la paralelización o la emisión de múltiples elementos simultáneamente desde un observable. Por supuesto, esta definición rompe el Contrato observable que establece que onNext () debe llamarse secuencialmente y nunca simultáneamente por más de un hilo a la vez.

Para lograr el paralelismo necesitas múltiples Observables.

Esto se ejecuta en un solo hilo:

Observable<Integer> vals = Observable.range(1,10); vals.subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) .subscribe(val -> System.out.println("Subscriber received " + val + " on " + Thread.currentThread().getName()));

Esto se ejecuta en múltiples hilos:

Observable<Integer> vals = Observable.range(1,10); vals.flatMap(val -> Observable.just(val) .subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) ).subscribe(val -> System.out.println(val));

El código y el texto provienen de esta entrada de blog.


observeOn(Schedulers.computation()) especificar subscribeOn(Schedulers.computation()) lugar de observeOn(Schedulers.computation()) para ese propósito. En subscribeOn usted declara en qué hilo emitirá sus valores. En observeOn usted declara en qué hilo va a manejar y los observa.