single rxjava how examples example curso create java multithreading reactive-programming rx-java

how - RxJava Observando en el hilo de llamada/suscripción



rxjava single example (2)

Tengo algunos problemas para entender cómo suscribe On / observationOn funciona en RxJava. Creé una aplicación simple con observable que emite nombres de planetas del sistema solar, realiza algunos mapas y filtra e imprime resultados.

Según entiendo, la programación del trabajo en el hilo de fondo se realiza mediante el operador subscribeOn (y parece funcionar bien).

Observar en el hilo de fondo también funciona bien con el operador observeOn .

Pero tengo problemas para entender cómo observar en el hilo de llamada (ya sea si es un hilo principal o cualquier otro). Se puede realizar fácilmente en Android con el operador AndroidSchedulers.mainThread() , pero no sé cómo lograrlo en Java puro.

Aquí está mi código:

public class Main { public static void main(String[] args) throws InterruptedException { ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 5, 3000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); System.out.println("Main thread: " + getCurrentThreadInfo()); Observable<String> stringObservable = Observable.from(Arrays.asList("Merkury", "Wenus", "Ziemia", "Mars", "Jowisz", "Saturn", "Uran", "Neptun", "Pluton")) .map(in -> { System.out.println("map on: " + getCurrentThreadInfo()); return in.toUpperCase(); }) .filter(in -> { System.out.println("filter on: " + getCurrentThreadInfo()); return in.contains("A"); }) .subscribeOn(Schedulers.from(executor)); for (int i = 0; i < 5; i++) { Thread thread = new Thread("Thread-" + i) { @Override public void run() { stringObservable .buffer(5) .subscribe(s -> System.out.println("Result " + s + " on: " + getCurrentThreadInfo())); } }; thread.start(); } } private static String getCurrentThreadInfo() { return Thread.currentThread().getName() + "(" + Thread.currentThread().getId() + ")"; } }

Observable en creado y trabajo está suscrito en uno de tres hilos del ejecutor. Esto funciona como se esperaba ¿Pero cómo observar resultados en esos hilos creados dinámicamente en for loop? ¿Hay alguna manera de crear Scheduler a partir del hilo actual?

Además, descubrí que después de ejecutar este código, nunca termina y no sé por qué. :(


Aquí hay un ejemplo simplificado actualizado para RxJava 2. Es el mismo concepto que la respuesta de Marek: un ejecutor que agrega los ejecutables a un BlockingQueue que se está consumiendo en el hilo de la persona que llama.

public class ThreadTest { @Test public void test() throws InterruptedException { final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>(); System.out.println("Caller thread: " + Thread.currentThread().getName()); Observable.fromCallable(new Callable<Integer>() { @Override public Integer call() throws Exception { System.out.println("Observable thread: " + Thread.currentThread().getName()); return 1; } }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.from(new Executor() { @Override public void execute(@NonNull Runnable runnable) { tasks.add(runnable); } })) .subscribe(new Consumer<Integer>() { @Override public void accept(@NonNull Integer integer) throws Exception { System.out.println("Observer thread: " + Thread.currentThread().getName()); } }); tasks.take().run(); } } // Output: // Caller thread main // Observable thread RxCachedThreadScheduler-1 // Observer thread main


Para responder a su pregunta, permítame comenzar desde el principio, esto permite que otras personas comprendan lo que usted ya sabe.

Programadores

Los programadores juegan el mismo rol que los ejecutores de Java. En resumen, deciden qué acciones de subproceso se ejecutan.

Por lo general, un Observable y los operadores se ejecutan en el hilo actual. A veces puede pasar Scheduler a Observable o al operador como un parámetro (por ejemplo, Observable.timer ()).

Además, RxJava proporciona 2 operadores para especificar Scheduler:

  • subscribeOn: especifique el Programador en el que funcionará un Observable
  • observeOn: especifique el Programador en el que un observador observará este Observable

Para entenderlos rápidamente, uso un código de ejemplo:

En todas las muestras, usaré helper createObservable, que emite un nombre de hilo en el que opera el Observable:

public static Observable<String> createObservable(){ return Observable.create((Subscriber<? super String> subscriber) -> { subscriber.onNext(Thread.currentThread().getName()); subscriber.onCompleted(); } ); }

Sin planificadores:

createObservable().subscribe(message -> { System.out.println("Case 1 Observer thread " + message); System.out.println("Case 1 Observable thread " + Thread.currentThread().getName()); }); //will print: //Case 1 Observer thread main //Case 1 Observable thread main

Suscribir:

createObservable() .subscribeOn(Schedulers.newThread()) .subscribe(message -> { System.out.println("Case 2 Observer thread " + message); System.out.println("Case 2 Observable thread " + Thread.currentThread().getName()); }); //will print: //Case 2 Observer thread RxNewThreadScheduler-1 //Case 2 Observable thread RxNewThreadScheduler-1

Suscríbete y observa:

reateObservable() .subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.newThread()) .subscribe(message -> { System.out.println("Case 3 Observer thread " + message); System.out.println("Case 3 Observable thread " + Thread.currentThread().getName()); }); //will print: //Case 3 Observer thread RxNewThreadScheduler-2 //Case 3 Observable thread RxNewThreadScheduler-1

ObserveOn:

createObservable() .observeOn(Schedulers.newThread()) .subscribe(message -> { System.out.println("Case 4 Observer thread " + message); System.out.println("Case 4 Observable thread " + Thread.currentThread().getName()); }); //will print: //Case 4 Observer thread main //Case 4 Observable thread RxNewThreadScheduler-1

Responder:

AndroidSchedulers.mainThread () devuelve un sheduler que delega el trabajo a MessageQueue asociado con el hilo principal.
Para este fin, usa android.os.Looper.getMainLooper () y android.os.Handler.

En otras palabras, si desea especificar un hilo en particular, debe proporcionar los medios para programar y realizar tareas en el hilo.

Debajo puede usar cualquier tipo de MQ para almacenar tareas y lógica que bucles el Quee y ejecutar tareas.

En java, tenemos un ejecutor designado para tales tareas. RxJava puede crear fácilmente el Programador desde dicho Ejecutor.

A continuación se muestra un ejemplo que muestra cómo se puede observar en el hilo principal (no es útil en particular, pero muestra todas las partes necesarias).

public class RunCurrentThread implements Executor { private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>(); public static void main(String[] args) throws InterruptedException { RunCurrentThread sample = new RunCurrentThread(); sample.observerOnMain(); sample.runLoop(); } private void observerOnMain() { createObservable() .subscribeOn(Schedulers.newThread()) .observeOn(Schedulers.from(this)) .subscribe(message -> { System.out.println("Observer thread " + message); System.out.println("Observable thread " + Thread.currentThread().getName()); }); ; } public Observable<String> createObservable() { return Observable.create((Subscriber<? super String> subscriber) -> { subscriber.onNext(Thread.currentThread().getName()); subscriber.onCompleted(); } ); } private void runLoop() throws InterruptedException { while(!Thread.interrupted()){ tasks.take().run(); } } @Override public void execute(Runnable command) { tasks.add(command); }

}

Y la última pregunta, por qué tu código no termina:

ThreadPoolExecutor utiliza subprocesos no deamon por defult, por lo que su programa no termina hasta que existan. Debe usar el método de shutdown para cerrar los hilos.