rxjava2 rxjava rxandroid example java observable rx-java2 take

rxandroid - rxjava2 maven



RxJava2 observable take throws UndeliverableException (2)

  1. Sí, pero como el ''final'' observable no significa que el código que se ejecuta dentro de create(...) está detenido. Para estar completamente seguro en este caso, debe usar o.isDisposed() para ver si el observable ha finalizado en sentido descendente.
  2. La excepción está ahí porque RxJava 2 tiene la política de NUNCA permitir que una llamada onError se pierda. Se entrega en sentido descendente o se lanza como una UndeliverableException global si el observable ya ha terminado. Corresponde al creador del Observable manejar "adecuadamente" el caso en el que el observable ha finalizado y se produce una Excepción.
  3. El problema es que el productor ( Observable ) y el consumidor ( Subscriber ) no están de acuerdo cuando termina la transmisión. Dado que el productor está sobreviviendo al consumidor en este caso, el problema solo puede solucionarse en el productor.

Según tengo entendido, RxJava2 values.take(1) crea otro observable que contiene solo un elemento del observable original. Lo que NO DEBE lanzar una excepción, ya que se filtra hacia fuera por el efecto de take(1) ya que ocurrió en segundo lugar.

como en el siguiente fragmento de código

Observable<Integer> values = Observable.create(o -> { o.onNext(1); o.onError(new Exception("Oops")); }); values.take(1) .subscribe( System.out::println, e -> System.out.println("Error: " + e.getMessage()), () -> System.out.println("Completed") );

Salida

1 Completed io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366) at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83) at ch02.lambda$main$0(ch02.java:28) at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40) at io.reactivex.Observable.subscribe(Observable.java:10841) at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30) at io.reactivex.Observable.subscribe(Observable.java:10841) at io.reactivex.Observable.subscribe(Observable.java:10827) at io.reactivex.Observable.subscribe(Observable.java:10787) at ch02.main(ch02.java:32) Caused by: java.lang.Exception: Oops ... 8 more Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366) at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83) at ch02.lambda$main$0(ch02.java:28) at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40) at io.reactivex.Observable.subscribe(Observable.java:10841) at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30) at io.reactivex.Observable.subscribe(Observable.java:10841) at io.reactivex.Observable.subscribe(Observable.java:10827) at io.reactivex.Observable.subscribe(Observable.java:10787) at ch02.main(ch02.java:32) Caused by: java.lang.Exception: Oops ... 8 more

Mis preguntas :

  1. ¿Lo estoy entendiendo bien?
  2. Lo que realmente está sucediendo causa la excepción.
  3. ¿Cómo resolver esto del consumidor?

En el comentario anterior, @Kiskae respondió correctamente sobre la razón por la que se produce dicha excepción.

Aquí el enlace al documento oficial sobre este tema: RxJava2-wiki .

A veces no puede cambiar este comportamiento, por lo que hay una manera de manejar esta UndeliverableException de UndeliverableException . Aquí está el fragmento de código de cómo evitar bloqueos y mal comportamiento:

RxJavaPlugins.setErrorHandler(e -> { if (e instanceof UndeliverableException) { e = e.getCause(); } if ((e instanceof IOException) || (e instanceof SocketException)) { // fine, irrelevant network problem or API that throws on cancellation return; } if (e instanceof InterruptedException) { // fine, some blocking code was interrupted by a dispose call return; } if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) { // that''s likely a bug in the application Thread.currentThread().getUncaughtExceptionHandler() .handleException(Thread.currentThread(), e); return; } if (e instanceof IllegalStateException) { // that''s a bug in RxJava or in a custom operator Thread.currentThread().getUncaughtExceptionHandler() .handleException(Thread.currentThread(), e); return; } Log.warning("Undeliverable exception received, not sure what to do", e); });

Este código tomado del enlace de arriba.

Nota IMPORTANTE. Este enfoque establece el controlador de errores global en RxJava, de modo que si puede deshacerse de estas excepciones, sería una mejor opción.