RxJava - Guía rápida
RxJava es una extensión de ReactiveX basada en Java. Proporciona implementación o proyecto ReactiveX en Java. A continuación se presentan las características clave de RxJava.
Extiende el patrón del observador.
Soporta secuencias de datos / eventos.
Proporciona operadores para componer secuencias juntas de forma declarativa.
Maneja el subprocesamiento, la sincronización, la seguridad de subprocesos y las estructuras de datos concurrentes internamente.
¿Qué es ReactiveX?
ReactiveX es un proyecto que tiene como objetivo proporcionar un concepto de programación reactiva a varios lenguajes de programación. La programación reactiva se refiere al escenario donde el programa reacciona cuando aparecen los datos. Es un concepto de programación basado en eventos y los eventos pueden propagarse a los observadores de registros.
Según el Reactive, han combinado lo mejor del patrón Observer, el patrón Iterator y el patrón funcional.
El patrón Observer bien hecho. ReactiveX es una combinación de las mejores ideas del patrón Observer, el patrón Iterator y la programación funcional.
Programación funcional
La programación funcional gira en torno a la construcción del software utilizando funciones puras. Una función pura no depende del estado anterior y siempre devuelve el mismo resultado para los mismos parámetros pasados. Las funciones puras ayudan a evitar problemas asociados con objetos compartidos, datos mutables y efectos secundarios que a menudo prevalecen en entornos de subprocesos múltiples.
Programación reactiva
La programación reactiva se refiere a la programación impulsada por eventos en la que los flujos de datos vienen de forma asincrónica y se procesan cuando llegan.
Programación funcional reactiva
RxJava implementa ambos conceptos juntos, donde los datos de los flujos cambian con el tiempo y la función del consumidor reacciona en consecuencia.
El Manifiesto Reactivo
Reactive Manifesto es un documento en línea que establece el alto nivel de los sistemas de software de aplicación. Según el manifiesto, los siguientes son los atributos clave de un software reactivo:
Responsive - Siempre debe responder de manera oportuna.
Message Driven - Debe utilizar el paso de mensajes asincrónico entre componentes para que mantengan un acoplamiento suelto.
Elastic - Debe seguir respondiendo incluso bajo una carga alta.
Resilient - Debe seguir respondiendo incluso si falla algún componente.
Componentes clave de RxJava
RxJava tiene dos componentes clave: Observables y Observer.
Observable - Representa un objeto similar a Stream que puede emitir cero o más datos, puede enviar mensaje de error, cuya velocidad puede controlarse mientras emite un conjunto de datos, puede enviar datos finitos e infinitos.
Observer- Se suscribe a los datos de secuencia de Observable y reacciona por elemento de los observables. Los observadores son notificados cada vez que Observable emite un dato. Un observador maneja los datos uno por uno.
Nunca se notifica a un observador si los elementos no están presentes o si no se devuelve una devolución de llamada para un elemento anterior.
Configuración del entorno local
RxJava es una biblioteca para Java, por lo que el primer requisito es tener JDK instalado en su máquina.
Requisitos del sistema
JDK | 1,5 o superior. |
---|---|
Memoria | Sin requisitos mínimos. |
Espacio del disco | Sin requisitos mínimos. |
Sistema operativo | Sin requisitos mínimos. |
Paso 1: verificar la instalación de Java en su máquina
En primer lugar, abra la consola y ejecute un comando java basado en el sistema operativo en el que está trabajando.
SO | Tarea | Mando |
---|---|---|
Ventanas | Abrir consola de comandos | c: \> java -version |
Linux | Terminal de comando abierto | $ java -version |
Mac | Terminal abierta | máquina: <joseph $ java -version |
Verifiquemos la salida para todos los sistemas operativos:
SO | Salida |
---|---|
Ventanas | versión de Java "1.8.0_101" Entorno de ejecución Java (TM) SE (compilación 1.8.0_101) |
Linux | versión de Java "1.8.0_101" Entorno de ejecución Java (TM) SE (compilación 1.8.0_101) |
Mac | versión de Java "1.8.0_101" Entorno de ejecución Java (TM) SE (compilación 1.8.0_101) |
Si no tiene Java instalado en su sistema, descargue Java Software Development Kit (SDK) desde el siguiente enlace https://www.oracle.com. Asumimos Java 1.8.0_101 como la versión instalada para este tutorial.
Paso 2: configurar el entorno JAVA
Selecciona el JAVA_HOMEvariable de entorno para apuntar a la ubicación del directorio base donde está instalado Java en su máquina. Por ejemplo.
SO | Salida |
---|---|
Ventanas | Establezca la variable de entorno JAVA_HOME en C: \ Archivos de programa \ Java \ jdk1.8.0_101 |
Linux | exportar JAVA_HOME = / usr / local / java-current |
Mac | exportar JAVA_HOME = / Library / Java / Home |
Agregue la ubicación del compilador de Java a la ruta del sistema.
SO | Salida |
---|---|
Ventanas | Añade la cadena C:\Program Files\Java\jdk1.8.0_101\bin al final de la variable del sistema, Path. |
Linux | export PATH = $ PATH: $ JAVA_HOME / bin / |
Mac | no requerido |
Verifique la instalación de Java usando el comando java -version como se explicó anteriormente.
Paso 3: descarga el archivo RxJava2
Descargue la última versión del archivo jar de RxJava de RxJava @ MVNRepository y su dependencia Reactive Streams @ MVNRepository . En el momento de escribir este tutorial, hemos descargado rxjava-2.2.4.jar, reactive-streams-1.0.2.jar y lo hemos copiado en la carpeta C: \> RxJava.
SO | Nombre de archivo |
---|---|
Ventanas | rxjava-2.2.4.jar, corrientes-reactivas-1.0.2.jar |
Linux | rxjava-2.2.4.jar, corrientes-reactivas-1.0.2.jar |
Mac | rxjava-2.2.4.jar, corrientes-reactivas-1.0.2.jar |
Paso 4: configurar el entorno RxJava
Selecciona el RX_JAVAvariable de entorno para apuntar a la ubicación del directorio base donde se almacena el jar de RxJava en su máquina. Supongamos que hemos almacenado rxjava-2.2.4.jar y reactive-streams-1.0.2.jar en la carpeta RxJava.
No Señor | SO y descripción |
---|---|
1 | Windows Establezca la variable de entorno RX_JAVA en C: \ RxJava |
2 | Linux exportar RX_JAVA = / usr / local / RxJava |
3 | Mac exportar RX_JAVA = / Library / RxJava |
Paso 5: establecer la variable CLASSPATH
Selecciona el CLASSPATH variable de entorno para apuntar a la ubicación del jar de RxJava.
No Señor | SO y descripción |
---|---|
1 | Windows Establezca la variable de entorno CLASSPATH en% CLASSPATH%;% RX_JAVA% \ rxjava-2.2.4.jar;% RX_JAVA% \ reactive-streams-1.0.2.jar;.; |
2 | Linux export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: reactive-streams-1.0.2.jar :. |
3 | Mac export CLASSPATH = $ CLASSPATH: $ RX_JAVA / rxjava-2.2.4.jar: reactive-streams-1.0.2.jar :. |
Paso 6: probar la configuración de RxJava
Cree una clase TestRx.java como se muestra a continuación -
import io.reactivex.Flowable;
public class TestRx {
public static void main(String[] args) {
Flowable.just("Hello World!")
.subscribe(System.out::println);
}
}
Paso 7: verificar el resultado
Compila las clases usando javac compilador de la siguiente manera:
C:\RxJava>javac Tester.java
Verifique la salida.
Hello World!
Observables representa las fuentes de datos donde como Observers (Subscribers)escúchalos. En pocas palabras, un Observable emite elementos y un Suscriptor luego los consume.
Observable
Observable proporciona datos una vez que el suscriptor comienza a escuchar.
Observable puede emitir cualquier número de elementos.
Observable también puede emitir una señal de finalización sin ningún elemento.
Observable puede terminar con éxito.
Observable puede no terminar nunca. por ejemplo, se puede hacer clic en un botón tantas veces como desee.
Observable puede arrojar un error en cualquier momento.
Abonado
Observable puede tener varios suscriptores.
Cuando un Observable emite un elemento, se invoca a cada suscriptor del método onNext ().
Cuando un Observable termina de emitir elementos, se invoca a cada suscriptor del método onComplete ().
Si un Observable emite un error, se invoca cada método onError () de suscriptor.
Las siguientes son las clases base para crear observables.
Flowable- 0..N flujos, emite 0 o n elementos. Soporta Reactive-Streams y contrapresión.
Observable - Flujos 0..N, pero sin contrapresión.
Single- 1 artículo o error. Puede tratarse como una versión reactiva de la llamada al método.
Completable- Ningún artículo emitido. Se utiliza como señal de finalización o error. Puede tratarse como una versión reactiva de Runnable.
MayBe- No se emitió ningún artículo o se emitió 1 artículo. Puede tratarse como una versión reactiva de Opcional.
A continuación se muestran los métodos convenientes para crear observables en la clase Observable.
just(T item) - Devuelve un Observable que señala el elemento dado (referencia constante) y luego lo completa.
fromIterable(Iterable source) - Convierte una secuencia Iterable en un ObservableSource que emite los elementos de la secuencia.
fromArray(T... items) - Convierte una matriz en una fuente observable que emite los elementos de la matriz.
fromCallable(Callable supplier) - Devuelve un Observable que, cuando un observador se suscribe, invoca una función que especifiques y luego emite el valor devuelto por esa función.
fromFuture(Future future) - Convierte un futuro en una fuente observable.
interval(long initialDelay, long period, TimeUnit unit) - Devuelve un Observable que emite un 0L después del InitialDelay y números cada vez mayores después de cada período de tiempo a partir de entonces.
La clase Single representa la respuesta de valor único. Un solo observable solo puede emitir un valor exitoso o un error. No emite un evento onComplete.
Declaración de clase
A continuación se muestra la declaración de io.reactivex.Single<T> clase -
public abstract class Single<T>
extends Object
implements SingleSource<T>
Protocolo
A continuación se muestra el protocolo secuencial que opera Single Observable:
onSubscribe (onSuccess | onError)?
Ejemplo único
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import java.util.concurrent.TimeUnit;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create the observable
Single<String> testSingle = Single.just("Hello World");
//Create an observer
Disposable disposable = testSingle
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(
new DisposableSingleObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
});
Thread.sleep(3000);
//start observing
disposable.dispose();
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
Hello World
La clase MayBe representa una respuesta diferida. MayBe observable puede emitir un solo valor exitoso o ningún valor.
Declaración de clase
A continuación se muestra la declaración de io.reactivex.Single<T> clase -
public abstract class Maybe<T>
extends Object
implements MaybeSource<T>
Protocolo
A continuación se muestra el protocolo secuencial que opera MayBe Observable:
onSubscribe (onSuccess | onError | OnComplete)?
MayBe Ejemplo
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import java.util.concurrent.TimeUnit;
import io.reactivex.Maybe;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create an observer
Disposable disposable = Maybe.just("Hello World")
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableMaybeObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(3000);
//start observing
disposable.dispose();
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
Hello World
La clase Completable representa una respuesta diferida. Completable observable puede indicar una finalización exitosa o un error.
Declaración de clase
A continuación se muestra la declaración de io.reactivex.Completable clase -
public abstract class Completable
extends Object
implements CompletableSource
Protocolo
A continuación se muestra el protocolo secuencial que opera Completable Observable:
onSubscribe (onError | onComplete)?
Ejemplo completo
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import java.util.concurrent.TimeUnit;
import io.reactivex.Completable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
//Create an observer
Disposable disposable = Completable.complete()
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableCompletableObserver() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onStart() {
System.out.println("Started!");
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(3000);
//start observing
disposable.dispose();
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
Started!
Done!
La clase CompositeDisposable representa un contenedor que puede contener varios desechables y ofrece la complejidad O (1) de agregar y quitar desechables.
Declaración de clase
A continuación se muestra la declaración de io.reactivex.disposables.CompositeDisposable clase -
public final class CompositeDisposable
extends Object
implements Disposable, io.reactivex.internal.disposables.DisposableContainer
Ejemplo de material compuesto desechable
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableMaybeObserver;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
CompositeDisposable compositeDisposable = new CompositeDisposable();
//Create an Single observer
Disposable disposableSingle = Single.just("Hello World")
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(
new DisposableSingleObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
});
//Create an observer
Disposable disposableMayBe = Maybe.just("Hi")
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.subscribeWith(new DisposableMaybeObserver<String>() {
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onSuccess(String value) {
System.out.println(value);
}
@Override
public void onComplete() {
System.out.println("Done!");
}
});
Thread.sleep(3000);
compositeDisposable.add(disposableSingle);
compositeDisposable.add(disposableMayBe);
//start observing
compositeDisposable.dispose();
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
Hello World
Hi
A continuación se muestran los operadores que se utilizan para crear un Observable.
No Señor. | Operador y descripción |
---|---|
1 | Create Crea un Observable desde cero y permite que el método del observador llame mediante programación. |
2 | Defer No cree un Observable hasta que un observador se suscriba. Crea un observable nuevo para cada observador. |
3 | Empty/Never/Throw Crea un Observable con comportamiento limitado. |
4 | From Convierte una estructura de objeto / datos en un Observable. |
5 | Interval Crea un Observable que emite enteros en secuencia con un intervalo de tiempo especificado. |
6 | Just Convierte un objeto / estructura de datos en un Observable para emitir el mismo o el mismo tipo de objetos. |
7 | Range Crea un Observable que emite enteros en secuencia de un rango dado. |
8 | Repeat Crea un Observable emitiendo enteros en secuencia repetidamente. |
9 | Start Crea un Observable para emitir el valor de retorno de una función. |
10 | Timer Crea un Observable para emitir un solo elemento después de un retraso determinado. |
Ejemplo de creación de operador
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
//Using fromArray operator to create an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable
.map(String::toUpperCase)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
ABCDEFG
A continuación se muestran los operadores que se utilizan para transformar un elemento emitido desde un Observable.
No Señor. | Operador y descripción |
---|---|
1 | Buffer Reúne elementos de Observable en paquetes periódicamente y luego los emite en lugar de elementos. |
2 | FlatMap Se utiliza en observables anidados. Transforma elementos en observables. Luego aplana los elementos en un solo Observable. |
3 | GroupBy Divida un Observable en un conjunto de Observables organizados por clave para emitir diferentes grupos de elementos. |
4 | Map Aplica una función a cada elemento emitido para transformarlo. |
5 | Scan Aplicar una función a cada elemento emitido, secuencialmente y luego emitir el valor sucesivo. |
6 | Window Reúne elementos de Observable en Observable windows periódicamente y luego emite las ventanas en lugar de elementos. |
Ejemplo de operador transformador
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
//Using map operator to transform an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable
.map(String::toUpperCase)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
ABCDEFG
A continuación se muestran los operadores que se utilizan para emitir selectivamente elementos desde un Observable.
No Señor. | Operador y descripción |
---|---|
1 | Debounce Emite elementos solo cuando se agota el tiempo de espera sin emitir otro elemento. |
2 | Distinct Emite solo elementos únicos. |
3 | ElementAt emitir solo elemento en n índice emitido por un Observable. |
4 | Filter Emite solo aquellos elementos que pasan la función de predicado dada. |
5 | First Emite el primer elemento o el primer elemento que pasó los criterios dados. |
6 | IgnoreElements No emite ningún elemento de Observable pero marca la finalización. |
7 | Last Emite el último elemento de Observable. |
8 | Sample Emite el elemento más reciente con un intervalo de tiempo determinado. |
9 | Skip Omite los primeros n elementos de un Observable. |
10 | SkipLast Omite los últimos n elementos de un Observable. |
11 | Take toma los primeros n elementos de un Observable. |
12 | TakeLast toma los últimos n elementos de un Observable. |
Ejemplo de operador de filtrado
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
//Using take operator to filter an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable
.take(2)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
ab
A continuación se muestran los operadores que se utilizan para crear un único Observable a partir de múltiples Observables.
No Señor. | Operador y descripción |
---|---|
1 | And/Then/When Combine conjuntos de artículos utilizando los intermediarios Patrón y Plan. |
2 | CombineLatest Combine el último elemento emitido por cada Observable a través de una función específica y emita el elemento resultante. |
3 | Join Combine los elementos emitidos por dos Observables si se emitieron durante el marco de tiempo del segundo elemento Observable emitido. |
4 | Merge Combina los elementos emitidos de Observables. |
5 | StartWith Emite una secuencia específica de elementos antes de comenzar a emitir los elementos de la fuente Observable |
6 | Switch Emite los elementos más recientes emitidos por Observables. |
7 | Zip Combina elementos de Observables en función de la función y emite los elementos resultantes. |
Ejemplo de operador de combinación
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
//Using combineLatest operator to combine Observables
public class ObservableTester {
public static void main(String[] args) {
Integer[] numbers = { 1, 2, 3, 4, 5, 6};
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable1 = Observable.fromArray(letters);
Observable<Integer> observable2 = Observable.fromArray(numbers);
Observable.combineLatest(observable1, observable2, (a,b) -> a + b)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
g1g2g3g4g5g6
A continuación se muestran los operadores que a menudo son útiles con Observables.
No Señor. | Operador y descripción |
---|---|
1 | Delay Registrar acciones para manejar eventos observables del ciclo de vida. |
2 | Materialize/Dematerialize Representa el elemento emitido y la notificación enviada. |
3 | ObserveOn Especifique el planificador que se observará. |
4 | Serialize Force Observable para realizar llamadas serializadas. |
5 | Subscribe Operar sobre las emisiones de elementos y notificaciones como completa de un Observable |
6 | SubscribeOn Especifique el planificador que utilizará un Observable cuando esté suscrito. |
7 | TimeInterval Convierta un Observable para emitir indicaciones de la cantidad de tiempo transcurrido entre las emisiones. |
8 | Timeout Emite una notificación de error si ocurre un tiempo especificado sin emitir ningún elemento. |
9 | Timestamp Adjunte una marca de tiempo a cada elemento emitido. |
9 | Using Crea un recurso desechable o la misma vida útil que el de Observable. |
Ejemplo de operador de servicios públicos
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
//Using subscribe operator to subscribe to an Observable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable = Observable.fromArray(letters);
observable.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
abcdefg
A continuación se muestran los operadores que evalúan uno o varios Observables o elementos emitidos.
No Señor. | Operador y descripción |
---|---|
1 | All Evalúa todos los elementos emitidos para cumplir con los criterios dados. |
2 | Amb Emite todos los elementos del primer Observable solo dados múltiples Observables. |
3 | Contains Comprueba si un Observable emite un elemento en particular o no. |
4 | DefaultIfEmpty Emite un elemento predeterminado si el Observable no emite nada. |
5 | SequenceEqual Comprueba si dos Observables emiten la misma secuencia de elementos. |
6 | SkipUntil Descarta los elementos emitidos por el primer Observable hasta que un segundo Observable emite un elemento. |
7 | SkipWhile Deseche los elementos emitidos por un Observable hasta que una condición determinada se vuelva falsa. |
8 | TakeUntil Descarta elementos emitidos por un Observable después de que un segundo Observable emite un elemento o termina. |
9 | TakeWhile Deseche los elementos emitidos por un Observable después de que una condición especificada se vuelva falsa. |
Ejemplo de operador condicional
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
//Using defaultIfEmpty operator to operate on an Observable
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result = new StringBuilder();
Observable.empty()
.defaultIfEmpty("No Data")
.subscribe(s -> result.append(s));
System.out.println(result);
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result1 = new StringBuilder();
Observable.fromArray(letters)
.firstElement()
.defaultIfEmpty("No data")
.subscribe(s -> result1.append(s));
System.out.println(result1);
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
No Data
a
A continuación se muestran los operadores que operan en elementos completos emitidos por un Observable.
No Señor. | Operador y descripción |
---|---|
1 | Average Evalúa promedios de todos los elementos y emite el resultado. |
2 | Concat Emite todos los elementos de varios Observable sin intercalar. |
3 | Count Cuenta todos los elementos y emite el resultado. |
4 | Max Evalúa el elemento de valor máximo de todos los elementos y emite el resultado. |
5 | Min Evalúa el elemento con valor mínimo de todos los elementos y emite el resultado. |
6 | Reduce Aplique una función en cada elemento y devuelva el resultado. |
7 | Sum Evalúa la suma de todos los elementos y emite el resultado. |
Ejemplo de operador matemático
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
//Using concat operator to operate on multiple Observables
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Integer[] numbers = { 1, 2, 3, 4, 5, 6};
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
Observable<String> observable1 = Observable.fromArray(letters);
Observable<Integer> observable2 = Observable.fromArray(numbers);
Observable.concat(observable1, observable2)
.subscribe( letter -> result.append(letter));
System.out.println(result);
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
abcdefg123456
A continuación se muestran los operadores que tienen un control más preciso sobre la suscripción.
No Señor. | Operador y descripción |
---|---|
1 | Connect Instruya a un Observable conectable para que emita elementos a sus suscriptores. |
2 | Publish Convierte un Observable en Observable conectable. |
3 | RefCount Convierte un Observable conectable en Observable ordinario. |
4 | Replay Asegúrese de que cada suscriptor vea la misma secuencia de elementos emitidos, incluso después de que el Observable haya comenzado a emitir elementos y los suscriptores se suscriban más tarde. |
Ejemplo de operador conectable
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;
//Using connect operator on a ConnectableObservable
public class ObservableTester {
public static void main(String[] args) {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
final StringBuilder result = new StringBuilder();
ConnectableObservable<String> connectable = Observable.fromArray(letters).publish();
connectable.subscribe(letter -> result.append(letter));
System.out.println(result.length());
connectable.connect();
System.out.println(result.length());
System.out.println(result);
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
0
7
abcdefg
Según el Reactive, un Sujeto puede actuar tanto como Observable como Observador.
Un sujeto es una especie de puente o proxy que está disponible en algunas implementaciones de ReactiveX que actúa como observador y como observable. Debido a que es un observador, puede suscribirse a uno o más Observables, y debido a que es un Observable, puede pasar a través de los elementos que observa reemitiéndolos, y también puede emitir nuevos elementos.
Hay cuatro tipos de sujetos:
No Señor. | Descripcion del sujeto |
---|---|
1 | Publish Subject Emite solo aquellos elementos que se emiten después del momento de la suscripción. |
2 | Replay Subject Emite todos los elementos emitidos por fuente Observable independientemente de cuándo se haya suscrito el Observable. |
3 | Behavior Subject Tras la suscripción, emite el elemento más reciente y luego continúa emitiendo el elemento emitido por la fuente Observable. |
4 | Async Subject Emite el último elemento emitido por la fuente Observable después de que se completa la emisión. |
PublishSubject emite elementos a los observadores actualmente suscritos y eventos terminales a los observadores actuales o tardíos.
Declaración de clase
A continuación se muestra la declaración de io.reactivex.subjects.PublishSubject<T> clase -
public final class PublishSubject<T>
extends Subject<T>
Ejemplo de PublishSubject
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import io.reactivex.subjects.PublishSubject;
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result1 = new StringBuilder();
final StringBuilder result2 = new StringBuilder();
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(value -> result1.append(value) );
subject.onNext("a");
subject.onNext("b");
subject.onNext("c");
subject.subscribe(value -> result2.append(value));
subject.onNext("d");
subject.onComplete();
//Output will be abcd
System.out.println(result1);
//Output will be d only
//as subscribed after c item emitted.
System.out.println(result2);
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
abcd
d
BehaviorSubject emite el elemento más reciente que ha observado y luego todos los elementos observados subsiguientes a cada observador suscrito.
Declaración de clase
A continuación se muestra la declaración de io.reactivex.subjects.BehaviorSubject<T> clase -
public final class BehaviorSubject<T>
extends Subject<T>
BehaviorSubject Ejemplo
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import io.reactivex.subjects.BehaviorSubject;
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result1 = new StringBuilder();
final StringBuilder result2 = new StringBuilder();
BehaviorSubject<String> subject = BehaviorSubject.create();
subject.subscribe(value -> result1.append(value) );
subject.onNext("a");
subject.onNext("b");
subject.onNext("c");
subject.subscribe(value -> result2.append(value));
subject.onNext("d");
subject.onComplete();
//Output will be abcd
System.out.println(result1);
//Output will be cd being BehaviorSubject
//(c is last item emitted before subscribe)
System.out.println(result2);
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
abcd
cd
ReplaySubject reproduce eventos / elementos para los observadores actuales y tardíos.
Declaración de clase
A continuación se muestra la declaración de io.reactivex.subjects.ReplaySubject<T> clase -
public final class ReplaySubject<T>
extends Subject<T>
Ejemplo de ReplaySubject
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import io.reactivex.subjects.ReplaySubject;
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result1 = new StringBuilder();
final StringBuilder result2 = new StringBuilder();
ReplaySubject<String> subject = ReplaySubject.create();
subject.subscribe(value -> result1.append(value) );
subject.onNext("a");
subject.onNext("b");
subject.onNext("c");
subject.subscribe(value -> result2.append(value));
subject.onNext("d");
subject.onComplete();
//Output will be abcd
System.out.println(result1);
//Output will be abcd being ReplaySubject
//as ReplaySubject emits all the items
System.out.println(result2);
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
abcd
abcd
AsyncSubject emite el único último valor seguido de un evento de finalización o el error recibido a los observadores.
Declaración de clase
A continuación se muestra la declaración de io.reactivex.subjects.AsyncSubject<T> clase -
public final class AsyncSubject<T>
extends Subject<T>
Ejemplo de AsyncSubject
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import io.reactivex.subjects. AsyncSubject;
public class ObservableTester {
public static void main(String[] args) {
final StringBuilder result1 = new StringBuilder();
final StringBuilder result2 = new StringBuilder();
AsyncSubject<String> subject = AsyncSubject.create();
subject.subscribe(value -> result1.append(value) );
subject.onNext("a");
subject.onNext("b");
subject.onNext("c");
subject.subscribe(value -> result2.append(value));
subject.onNext("d");
subject.onComplete();
//Output will be d being the last item emitted
System.out.println(result1);
//Output will be d being the last item emitted
System.out.println(result2);
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
d
d
Los programadores se utilizan en entornos de subprocesos múltiples para trabajar con operadores observables.
Según el Reactive, Scheduler se utilizan para programar cómo se aplicará la cadena de operadores a diferentes subprocesos.
De forma predeterminada, un Observable y la cadena de operadores que le apliques harán su trabajo y notificarán a sus observadores en el mismo hilo en el que se llama a su método Subscribe. El operador SubscribeOn cambia este comportamiento especificando un Programador diferente en el que debería operar el Observable. El operador ObserveOn especifica un Programador diferente que el Observable utilizará para enviar notificaciones a sus observadores.
Hay los siguientes tipos de programadores disponibles en RxJava:
No Señor. | Programador y descripción |
---|---|
1 | Schedulers.computation() Crea y devuelve un Programador destinado al trabajo computacional. El recuento de subprocesos que se programarán depende de las CPU presentes en el sistema. Se permite un hilo por CPU. Ideal para bucles de eventos o operaciones de devolución de llamada. |
2 | Schedulers.io() Crea y devuelve un planificador destinado al trabajo vinculado a IO. El grupo de subprocesos puede extenderse según sea necesario. |
3 | Schedulers.newThread() Crea y devuelve un planificador que crea un nuevo subproceso para cada unidad de trabajo. |
4 | Schedulers.trampoline() Crea y devuelve un Programador que pone en cola el trabajo en el hilo actual para que se ejecute después de que se complete el trabajo actual. |
4 | Schedulers.from(java.util.concurrent.Executor executor) Convierte un Ejecutor en una nueva instancia de Programador. |
El método Schedulers.trampoline () crea y devuelve un Programador que pone en cola el trabajo en el hilo actual para que se ejecute después de que se complete el trabajo actual.
Ejemplo de Schedulers.trampoline ()
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import java.util.Random;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.trampoline()))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
Processing Thread main
Receiver Thread main, Item length 1
Processing Thread main
Receiver Thread main, Item length 2
Processing Thread main
Receiver Thread main, Item length 3
El método Schedulers.newThread () crea y devuelve un Scheduler que crea un nuevo Thread para cada unidad de trabajo.
Ejemplo de Schedulers.newThread ()
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import java.util.Random;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.newThread()))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
Processing Thread RxNewThreadScheduler-1
Receiver Thread RxNewThreadScheduler-1, Item length 1
Processing Thread RxNewThreadScheduler-2
Receiver Thread RxNewThreadScheduler-2, Item length 2
Processing Thread RxNewThreadScheduler-3
Receiver Thread RxNewThreadScheduler-3, Item length 3
El método Schedulers.computation () crea y devuelve un Scheduler destinado al trabajo computacional. El recuento de subprocesos que se programarán depende de las CPU presentes en el sistema. Se permite un hilo por CPU. Ideal para bucles de eventos o operaciones de devolución de llamada.
Ejemplo de Schedulers.computation ()
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import java.util.Random;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.computation()))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
Processing Thread RxComputationThreadPool-1
Receiver Thread RxComputationThreadPool-1, Item length 1
Processing Thread RxComputationThreadPool-2
Receiver Thread RxComputationThreadPool-2, Item length 2
Processing Thread RxComputationThreadPool-3
Receiver Thread RxComputationThreadPool-3, Item length 3
El método Schedulers.io () crea y devuelve un Programador destinado al trabajo vinculado a IO. El grupo de subprocesos puede extenderse según sea necesario. Ideal para operaciones intensivas de E / S.
Ejemplo de Schedulers.io ()
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import java.util.Random;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.io()))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 1
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 2
Processing Thread RxCachedThreadScheduler-1
Receiver Thread RxCachedThreadScheduler-1, Item length 3
El método Schedulers.from (Executor) convierte un Executor en una nueva instancia de Scheduler.
Programadores. De (Ejecutor) Ejemplo
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import java.util.Random;
import java.util.concurrent.Executors;
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable.just("A", "AB", "ABC")
.flatMap(v -> getLengthWithDelay(v)
.doOnNext(s -> System.out.println("Processing Thread "
+ Thread.currentThread().getName()))
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(3))))
.subscribe(length -> System.out.println("Receiver Thread "
+ Thread.currentThread().getName()
+ ", Item length " + length));
Thread.sleep(10000);
}
protected static Observable<Integer> getLengthWithDelay(String v) {
Random random = new Random();
try {
Thread.sleep(random.nextInt(3) * 1000);
return Observable.just(v.length());
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
Processing Thread pool-1-thread-1
Processing Thread pool-3-thread-1
Receiver Thread pool-1-thread-1, Item length 1
Processing Thread pool-4-thread-1
Receiver Thread pool-4-thread-1, Item length 3
Receiver Thread pool-3-thread-1, Item length 2
El operador de almacenamiento en búfer permite reunir elementos emitidos por un Observable en una lista o paquetes y emitir esos paquetes en lugar de elementos. En el siguiente ejemplo, hemos creado un Observable para emitir 9 elementos y, al utilizar el almacenamiento en búfer, se emitirán 3 elementos juntos.
Ejemplo de almacenamiento en búfer
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable<Integer> observable = Observable.just(1, 2, 3, 4,
5, 6, 7, 8, 9);
observable.subscribeOn(Schedulers.io())
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.buffer(3)
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed");
}
@Override
public void onNext(List<Integer> integers) {
System.out.println("onNext: ");
for (Integer value : integers) {
System.out.println(value);
}
}
@Override
public void onError(Throwable e) {
System.out.println("Error");
}
@Override
public void onComplete() {
System.out.println("Done! ");
}
});
Thread.sleep(3000);
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
Subscribed
onNext:
1
2
3
onNext:
4
5
6
onNext:
7
8
9
Done!
El operador de ventana funciona de manera similar al operador de búfer, pero permite recopilar elementos emitidos por un Observable en otro observable en lugar de colección y emitir esos Observables en lugar de colecciones. En el siguiente ejemplo, hemos creado un Observable para emitir 9 elementos y, utilizando el operador de ventana, 3 Observable se emitirán juntos.
Ejemplo de ventana
Cree el siguiente programa Java usando cualquier editor de su elección en, digamos, C: \> RxJava.
ObservableTester.java
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
public class ObservableTester {
public static void main(String[] args) throws InterruptedException {
Observable<Integer> observable = Observable.just(1, 2, 3, 4,
5, 6, 7, 8, 9);
observable.subscribeOn(Schedulers.io())
.delay(2, TimeUnit.SECONDS, Schedulers.io())
.window(3)
.subscribe(new Observer<Observable<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("Subscribed");
}
@Override
public void onNext(Observable<Integer> integers) {
System.out.println("onNext: ");
integers.subscribe(value -> System.out.println(value));
}
@Override
public void onError(Throwable e) {
System.out.println("Error");
}
@Override
public void onComplete() {
System.out.println("Done! ");
}
});
Thread.sleep(3000);
}
}
Verificar el resultado
Compila la clase usando javac compilador de la siguiente manera:
C:\RxJava>javac ObservableTester.java
Ahora ejecute ObservableTester de la siguiente manera:
C:\RxJava>java ObservableTester
Debería producir el siguiente resultado:
Subscribed
onNext:
1
2
3
onNext:
4
5
6
onNext:
7
8
9
Done!