rxjava2 rxjava rxandroid example curso rx-java

rx java - rxandroid - ¿Cómo puedo crear un observador sobre una lista dinámica en RxJava?



rxjava observable (3)

Necesito crear un observador sobre una matriz que cambia constantemente (agregando elementos).

Estoy usando Obserable.from (iterable) pero parece que crea el Observable sobre el ArrayList tal como está en el momento de la creación.

Necesito que el Observer sea notificado y que la Acción se ejecute cada vez que ArrayList obtiene un nuevo elemento agregado.


Consideraría este enfoque basado en BehaviourSubject. Esto difiere de la solución de juanpavergara en que un onNext () se emitirá inmediatamente al observador cuando se suscriba al observable.

public class ObservableList<T> { protected final List<T> list; protected final BehaviorSubject<List<T>> behaviorSubject; public ObservableList(List<T> list) { this.list = list; this.behaviorSubject = BehaviorSubject.create(list); } public Observable<List<T>> getObservable() { return behaviorSubject; } public void add(T element) { list.add(element); behaviorSubject.onNext(list); } } private void main() { final List<Integer> list = new ArrayList<>(); list.add(0); list.add(1); final ObservableList<Integer> olist = new ObservableList<>(list); olist.getObservable().subscribe(System.out::println); olist.add(2); olist.add(3); }

Esta solución puede ser útil al implementar MVP, cuando desea observar un recurso (es decir, una lista de objetos) devuelto por un componente en el sistema (es decir, un repositorio o DataSource), y desea que el Observador (es decir: Presentador o Interactor) que se notificará cuando se agregue un elemento a la lista en otra parte del sistema.


Hay que ir Gracias a Dávid Karnok en RxJava Google Group

import java.util.ArrayList; import java.util.List; import rx.Observable; import rx.subjects.PublishSubject; public class ObservableListExample { public static class ObservableList<T> { protected final List<T> list; protected final PublishSubject<T> onAdd; public ObservableList() { this.list = new ArrayList<T>(); this.onAdd = PublishSubject.create(); } public void add(T value) { list.add(value); onAdd.onNext(value); } public Observable<T> getObservable() { return onAdd; } } public static void main(String[] args) throws Exception { ObservableList<Integer> olist = new ObservableList<>(); olist.getObservable().subscribe(System.out::println); olist.add(1); Thread.sleep(1000); olist.add(2); Thread.sleep(1000); olist.add(3); } }


Puedes fusionar dos observables a uno. Uno de ellos puede ser la lista inicial de elementos y el segundo puede ser sujeto:

import rx.Observable; import rx.subjects.ReplaySubject; import java.util.ArrayList; import java.util.List; public class ExampleObservableList { public static void main(String[] args) { List<Integer> initialNumbers = new ArrayList<Integer>(); initialNumbers.add(1); initialNumbers.add(2); Observable<Integer> observableInitial = Observable.from(initialNumbers); ReplaySubject<Integer> subject = ReplaySubject.create(); Observable<Integer> source = Observable.merge(observableInitial, subject); source.subscribe(System.out::println); for (int i = 0; i < 100; ++i) { subject.onNext(i); } } }

Si no tiene elementos iniciales, solo puede usar ReplaySubject (u otro Subject -> vea http://reactivex.io/documentation/subject.html ):

public static void main(String[] args) { ReplaySubject<Integer> source = ReplaySubject.create(); source.subscribe(System.out::println); for (int i = 0; i < 100; ++i) { source.onNext(i); } }