rxandroid - rxjava2 maven
RxJava2 observable take throws UndeliverableException (2)
- Sí, pero como el ''final'' observable no significa que el código que se ejecuta dentro de
create(...)
está detenido. Para estar completamente seguro en este caso, debe usaro.isDisposed()
para ver si el observable ha finalizado en sentido descendente. - La excepción está ahí porque RxJava 2 tiene la política de NUNCA permitir que una llamada
onError
se pierda. Se entrega en sentido descendente o se lanza como unaUndeliverableException
global si el observable ya ha terminado. Corresponde al creador del Observable manejar "adecuadamente" el caso en el que el observable ha finalizado y se produce una Excepción. - El problema es que el productor (
Observable
) y el consumidor (Subscriber
) no están de acuerdo cuando termina la transmisión. Dado que el productor está sobreviviendo al consumidor en este caso, el problema solo puede solucionarse en el productor.
Según tengo entendido, RxJava2 values.take(1)
crea otro observable que contiene solo un elemento del observable original. Lo que NO DEBE lanzar una excepción, ya que se filtra hacia fuera por el efecto de take(1)
ya que ocurrió en segundo lugar.
como en el siguiente fragmento de código
Observable<Integer> values = Observable.create(o -> {
o.onNext(1);
o.onError(new Exception("Oops"));
});
values.take(1)
.subscribe(
System.out::println,
e -> System.out.println("Error: " + e.getMessage()),
() -> System.out.println("Completed")
);
Salida
1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at ch02.lambda$main$0(ch02.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.Observable.subscribe(Observable.java:10827)
at io.reactivex.Observable.subscribe(Observable.java:10787)
at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at ch02.lambda$main$0(ch02.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.Observable.subscribe(Observable.java:10827)
at io.reactivex.Observable.subscribe(Observable.java:10787)
at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
... 8 more
Mis preguntas :
- ¿Lo estoy entendiendo bien?
- Lo que realmente está sucediendo causa la excepción.
- ¿Cómo resolver esto del consumidor?
En el comentario anterior, @Kiskae respondió correctamente sobre la razón por la que se produce dicha excepción.
Aquí el enlace al documento oficial sobre este tema: RxJava2-wiki .
A veces no puede cambiar este comportamiento, por lo que hay una manera de manejar esta UndeliverableException
de UndeliverableException
. Aquí está el fragmento de código de cómo evitar bloqueos y mal comportamiento:
RxJavaPlugins.setErrorHandler(e -> {
if (e instanceof UndeliverableException) {
e = e.getCause();
}
if ((e instanceof IOException) || (e instanceof SocketException)) {
// fine, irrelevant network problem or API that throws on cancellation
return;
}
if (e instanceof InterruptedException) {
// fine, some blocking code was interrupted by a dispose call
return;
}
if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
// that''s likely a bug in the application
Thread.currentThread().getUncaughtExceptionHandler()
.handleException(Thread.currentThread(), e);
return;
}
if (e instanceof IllegalStateException) {
// that''s a bug in RxJava or in a custom operator
Thread.currentThread().getUncaughtExceptionHandler()
.handleException(Thread.currentThread(), e);
return;
}
Log.warning("Undeliverable exception received, not sure what to do", e);
});
Este código tomado del enlace de arriba.
Nota IMPORTANTE. Este enfoque establece el controlador de errores global en RxJava, de modo que si puede deshacerse de estas excepciones, sería una mejor opción.