schedulers rxjava observables how for curso java events event-handling rx-java

observables - rxjava observable



Creando Observable desde eventos Java normales (4)

¿Cuál es la mejor manera de crear un Rx-Java Observable desde el patrón de eventos clásico de Java? Es decir, dado

class FooEvent { ... } interface FooListener { void fooHappened(FooEvent arg); } class Bar { public void addFooListener(FooListener l); public void removeFooListener(FooListener l); }

Quiero implementar

Observable<FooEvent> fooEvents(Bar bar);

La implementación que se me ocurrió es:

Observable<FooEvent> fooEvents(Bar bar) { return Observable.create(new OnSubscribeFunc<FooEvent>() { public Subscription onSubscribe(Observer<? super FooEvent> obs) { FooListener l = new FooListener() { public void fooHappened(FooEvent arg) { obs.onNext(arg); } }; bar.addFooListener(l); return new Subscription() { public void unsubscribe() { bar.removeFooListener(l); } }; } }); }

Sin embargo, no me gusta mucho:

  1. es bastante detallado

  2. requiere un oyente por Observer (idealmente, no debería haber oyentes si no hay observadores, y un oyente de lo contrario). Esto se puede mejorar manteniendo un recuento de observadores como un campo en OnSubscribeFunc , incrementándolo en la suscripción y disminuyendo en la cancelación de la suscripción.

¿Hay una solución mejor?

Requisitos:

  1. Trabajar con implementaciones existentes de patrones de eventos sin cambiarlos (si estuviera controlando ese código, ya podría escribirlo para devolver el Observable que necesitaba).

  2. Obteniendo errores de compilación si / cuando la API de origen cambia. No se trabaja con Object lugar del tipo de argumento de evento real o con cadenas de nombre de propiedad.


No creo que haya una manera de crear un observable genérico para cada evento posible, pero ciertamente puede usarlos donde lo necesite.

La fuente de RxJava tiene algunos ejemplos útiles de cómo crear observables a partir de eventos del mouse, eventos de botones, etc. Eche un vistazo a esta clase, que los crea a partir de KeyEvents: KeyEventSource.java .


Si desea algo simple e integrado, pruebe este enfoque http://examples.javacodegeeks.com/core-java/beans/bean-property-change-event-listener/

java.beans.PropertyChangeEvent; java.beans.PropertyChangeListener; java.beans.PropertyChangeSupport;

Desde el sitio, hay un fragmento que muestra cómo usarlo.

package com.javacodegeeks.snippets.core; import java.beans.PropertyChangeEvent; import java.beans.PropertyChangeListener; import java.beans.PropertyChangeSupport; public class BeanPropertyChangeEventListener { public static void main(String[] args) throws Exception { Bean bean = new Bean(); bean.addPropertyChangeListener(new MyPropertyChangeListener()); bean.setProperty1("newProperty1"); bean.setProperty2(123); bean.setProperty1("newnewProperty1"); bean.setProperty2(234); } public static class MyPropertyChangeListener implements PropertyChangeListener { // This method is called every time the property value is changed public void propertyChange(PropertyChangeEvent evt) { System.out.println("Name = " + evt.getPropertyName()); System.out.println("Old Value = " + evt.getOldValue()); System.out.println("New Value = " + evt.getNewValue()); System.out.println("**********************************"); } } public static class Bean { private PropertyChangeSupport pcs = new PropertyChangeSupport(this); // Property property1 private String property1; // Property property2 private int property2; public String getProperty1() { return property1; } public void setProperty1(String property1) { pcs.firePropertyChange("property1", this.property1, property1); this.property1 = property1; } public int getProperty2() { return property2; } public void setProperty2(int property2) { pcs.firePropertyChange("property2", this.property2, property2); this.property2 = property2; } public void addPropertyChangeListener(PropertyChangeListener listener) { pcs.addPropertyChangeListener(listener); } } }

es bastante simple


Su implementación es absolutamente correcta.

es bastante detallado

Se vuelve mucho menos detallado con lambdas (ejemplo para RxJava 2):

Observable<FooEvent> fooEvents(Bar bar) { return Observable.create(emitter -> { FooListener listener = event -> emitter.onNext(event); bar.addFooListener(listener); emitter.setCancellable(() -> bar.removeFooListener(listener)); }); }

idealmente, no debería haber oyentes si no hay observadores, y un oyente de lo contrario

Puede usar el operador share() , lo que hace que su capacidad de observación sea alta , es decir, todos los suscriptores comparten una suscripción única. Se suscribe automáticamente con el primer suscriptor, y se da de baja cuando el último se da de baja:

fooEvents(bar).share()


Supongo que puedes tomar la misma sopa, solo recalienta si usas otra capa de oyentes como un puente entre las devoluciones de llamada reales y tu observador. Actual callback → bridge callback → Observer .

Beneficios:

  • código más lineal
  • una instancia de devolución de llamada real, fuera del observador
  • se ve especialmente bien con funciones de alto orden, como funciones literales en kotlin:

Ex (note cuán pequeño es el cierre observable):

class LocationService @Inject constructor(private val googleApiClient: GoogleApiClient) : ConnectionCallbacks{ val locationObservable: Observable<Location> private var passToObservable: (Location?) -> Unit = {} init { locationObservable = Observable.create<Location> { subscription -> passToObservable = { location -> subscription.onNext(location) } }.doOnSubscribe { googleApiClient.registerConnectionCallbacks(this) googleApiClient.connect() }.doOnUnsubscribe { googleApiClient.unregisterConnectionCallbacks(this) } } override fun onConnected(connectionHint: Bundle?) { val location = LocationServices.FusedLocationApi.getLastLocation(googleApiClient) passToObservable(location) } override fun onConnectionSuspended(cause: Int) { //... } }