rxjava programming example rx-java

rx-java - programming - rxjava retrofit



Las mejores prácticas para el manejo de errores y el procesamiento continuo (4)

Como novato de Rx, también estaba buscando una respuesta simple para procesar las excepciones por separado y continuar procesando el próximo evento, pero no pude encontrar respuestas a lo que @Daniele Segato estaba preguntando. Aquí hay una solución donde no tienes control:

Los ejemplos anteriores suponen que usted tiene control sobre los observables, es decir, una forma es retrasar los errores hasta el final usando mergeDelayError O devolver un evento vacío conocido Observable para cada evento como Observable por separado utilizando Merge.

Si se trata de un error de evento de origen, puede usar lift para crear otro observable que básicamente procesa el valor del Observable actual con gracia. La clase SimpleErrorEmitter simula un flujo ilimitado que puede fallar a veces.

Observable.create(new SimpleErrorEmitter()) // transform errors to write to error stream .lift(new SuppressError<Integer>(System.err::println)) .doOnNext(System.out::println) // and everything else to console .subscribe(); class SimpleErrorEmitter implements OnSubscribe<Integer> { @Override public void call(Subscriber<? super Integer> subscriber) { subscriber.onNext(1); subscriber.onNext(2); subscriber.onError(new FooException()); subscriber.onNext(3); subscriber.onNext(4); subscriber.onCompleted(); } class SuppressError<T> implements Operator<T, T> { final Action1<Throwable> onError; public SuppressError(Action1<Throwable> onError) { this.onError = onError; } @Override public Subscriber<? super T> call(Subscriber<? super T> t1) { return new Subscriber<T>(t1) { @Override public void onNext(T t) { t1.onNext(t); } @Override public void onError(Throwable e) { // handle errors using a separate function onError.call(e); } @Override public void onCompleted() { t1.onCompleted(); } }; }

Si se trata de un error de procesamiento del suscriptor que puede intentar / capturar y continuar con gracia

Observable<Integer> justInts = justStrs.map((str) -> { try { return Integer.parseInt(str); } catch (NumberFormatException e) { return null; } });

Todavía estoy intentando encontrar una forma sencilla de volver a intentar o retrasar el intento fallido y continuar a partir del siguiente.

Observable<String> justStrs = Observable .just("1", "2", "three", "4", "5") // or an unbounded stream // both these retrying from beginning // when you delay or retry, if they are of known exception type .retryWhen(ex -> ex.flatMap(eachex -> { // for example, if it is a socket or timeout type of exception, try delaying it or retrying it if (eachex instanceof RuntimeException) { return Observable.timer(1L, TimeUnit.MICROSECONDS, Schedulers.immediate()); } return Observable.error(eachex); })) // or simply retry 2 times .retry(2) // if it is the source problem, attempt retry .doOnError((ex) -> System.err.println("On Error:" + ex));

Referencia: https://groups.google.com/forum/#!topic/rxjava/trm2n6S4FSc

Soy nuevo en RxJava pero lo estoy integrando en un proyecto en el que estoy trabajando para ayudarme a aprenderlo. Me he encontrado con una pregunta sobre las mejores prácticas.

Tengo una pregunta sobre cómo manejar onError para evitar que se detenga el procesamiento de Observable .

Aquí está la configuración:

Tengo una lista de ID de usuario para cada una de las que me gustaría hacer 2 o más solicitudes de red. Si alguna de las solicitudes de red falla para el ID de usuario, ese ID de usuario no se actualizará y se puede omitir. Esto no debería impedir que los otros ID de usuario se procesen. Tengo una solución, pero implica suscripciones anidadas (ver segundo bloque de código). Un problema que sí veo es que, si cada llamada falla, no hay manera de cortocircuitar y evitar que el resto llegue a un recurso de la red, incluso después de que la detección de un determinado umbral haya fallado.

¿Hay una mejor manera de hacer esto?

En el código tradicional:

List<String> results = new ArrayList<String>(); for (String userId : userIds) { try { String info = getInfo(userId); // can throw an GetInfoException String otherInfo = getOtherInfo(userId); // can throw an GetOtherInfoException results.add(info + ", " + otherInfo); } catch (GetInfoException e) { log.error(e); } catch (GetOtherInfoException e) { log.error(e); } }

PROBLEMA:

Pseudocódigo

userid -> network requests -> result 1 -> a, b -> onNext(1[a ,b]) 2 -> a, onError -> onError 3 -> a, b -> onNext(3[a, b]) 4 -> a, b -> onNext(4[a, b])

El siguiente es un ejemplo de trabajo de una lista de ID de usuario y por cada 2 solicitudes de información. Si lo ejecuta, verá que fallará (consulte a continuación el código fuente)

import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func1; public class TestMergeDelayError { public static Observable<String> getUserIds() { return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"}); } public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) { Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() { public Subscription onSubscribe(Observer<? super String> t1) { if (integer.contains(errorNumber)) { t1.onError(new Exception()); } else { t1.onNext(prefix + integer); t1.onCompleted(); } return Subscriptions.empty(); } }); return observable; } public static void main(String[] args) { Observable<String> userIdObservable = getUserIds(); Observable<String> t = userIdObservable.flatMap(new Func1<String, Observable<String>>() { public Observable<String> call(final String t1) { Observable<String> info1 = getInfo("1::: ", t1, "2"); Observable<String> info2 = getInfo("2::: ",t1, "3"); return Observable.mergeDelayError(info1, info2); } }); t.subscribe(new Action1<String>() { public void call(String t1) { System.out.println(t1); } }, new Action1<Throwable>() { public void call(Throwable t1) { t1.printStackTrace(); } }, new Action0(){ public void call() { System.out.println("onComplete"); } }); } }

Salida:

1::: 1 2::: 1 2::: 2 java.lang.Exception at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:32) at rx.Observable.subscribe(Observable.java:241) at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:266) at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable$ParentObserver.onNext(OperationMergeDelayError.java:210) at rx.operators.OperationMergeDelayError$2.onSubscribe(OperationMergeDelayError.java:77) at rx.Observable.subscribe(Observable.java:241) at rx.operators.OperationMergeDelayError$MergeDelayErrorObservable.onSubscribe(OperationMergeDelayError.java:171) at rx.operators.OperationMergeDelayError$1.onSubscribe(OperationMergeDelayError.java:64) at rx.Observable.subscribe(Observable.java:241) at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164) at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116) at rx.operators.OperationMap$MapObservable$1.onNext(OperationMap.java:105) at rx.operators.SafeObserver.onNext(SafeObserver.java:102) at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94) at rx.Observable.subscribe(Observable.java:241) at rx.operators.OperationMap$MapObservable.onSubscribe(OperationMap.java:102) at rx.operators.OperationMap$2.onSubscribe(OperationMap.java:76) at rx.Observable.subscribe(Observable.java:241) at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106) at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56) at rx.Observable.subscribe(Observable.java:241) at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320) at rx.Observable.subscribe(Observable.java:483)

Solución de suscripción anidada:

import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func1; public class TestMergeDelayError { public static Observable<String> getUserIds() { return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"}); } public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) { Observable<String> observable = Observable.create(new OnSubscribeFunc<String>() { public Subscription onSubscribe(Observer<? super String> t1) { if (integer.contains(errorNumber)) { t1.onError(new Exception()); } else { t1.onNext(prefix + integer); t1.onCompleted(); } return Subscriptions.empty(); } }); return observable; } public static void main(String[] args) { Observable<String> userIdObservable = getUserIds(); userIdObservable.subscribe(new Action1<String>() { public void call(String t1) { Observable<String> info1 = getInfo("1::: ", t1, "2"); Observable<String> info2 = getInfo("2::: ", t1, "3"); Observable.merge(info1, info2).subscribe(new Action1<String>() { public void call(String t1) { System.out.println(t1); } }, new Action1<Throwable>() { public void call(Throwable t1) { t1.printStackTrace(); } }, new Action0() { public void call() { System.out.println("onComplete"); } }); } }); } }

Salida:

1::: 1 2::: 1 onComplete java.lang.Exception at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28) at rx.Observable.subscribe(Observable.java:241) at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164) at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116) at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94) at rx.Observable.subscribe(Observable.java:241) at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106) at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56) at rx.Observable.subscribe(Observable.java:241) at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320) at rx.Observable.subscribe(Observable.java:483) at TestMergeDelayError$2.call(TestMergeDelayError.java:47) at TestMergeDelayError$2.call(TestMergeDelayError.java:42) at rx.Observable$2.onNext(Observable.java:381) at rx.operators.SafeObserver.onNext(SafeObserver.java:102) at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94) at rx.Observable.subscribe(Observable.java:241) at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320) at rx.Observable.subscribe(Observable.java:367) at TestMergeDelayError.main(TestMergeDelayError.java:42) 1::: 3 java.lang.Exception at TestMergeDelayError$1.onSubscribe(TestMergeDelayError.java:28) at rx.Observable.subscribe(Observable.java:241) at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:164) at rx.operators.OperationMerge$MergeObservable$ParentObserver.onNext(OperationMerge.java:116) at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94) at rx.Observable.subscribe(Observable.java:241) at rx.operators.OperationMerge$MergeObservable.onSubscribe(OperationMerge.java:106) at rx.operators.OperationMerge$1.onSubscribe(OperationMerge.java:56) at rx.Observable.subscribe(Observable.java:241) at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320) at rx.Observable.subscribe(Observable.java:483) at TestMergeDelayError$2.call(TestMergeDelayError.java:47) at TestMergeDelayError$2.call(TestMergeDelayError.java:42) at rx.Observable$2.onNext(Observable.java:381) at rx.operators.SafeObserver.onNext(SafeObserver.java:102) at rx.operators.OperationToObservableIterable$ToObservableIterable.onSubscribe(OperationToObservableIterable.java:94) at rx.Observable.subscribe(Observable.java:241) at rx.Observable.protectivelyWrapAndSubscribe(Observable.java:320) at rx.Observable.subscribe(Observable.java:367) at TestMergeDelayError.main(TestMergeDelayError.java:42) 1::: 4 2::: 4 onComplete 1::: 5 2::: 5 onComplete 1::: 6 2::: 6 onComplete

Como puede ver, solo los ID de usuario individuales que fallaron detuvieron su procesamiento individual, pero el resto de los ID de usuario se procesaron.

Solo busco consejos, vea si esta solución tiene sentido y si no lo es, cuál es la mejor práctica.

Gracias alex


La mejor práctica es usar mergeDelayError( ) que combina múltiples Observables en uno, permitiendo que los Observables sin errores continúen antes de propagar los errores.

mergeDelayError comporta de manera muy similar a merge . La excepción es cuando uno de los Observables que se fusionan termina con una notificación onError. Si esto sucede con la combinación, el Observable fusionado emitirá inmediatamente una notificación onError y finalizará. mergeDelayError, por otro lado, se abstendrá de informar el error hasta que le haya dado a cualquier otro observable que no genere errores que está fusionando la posibilidad de terminar de emitir sus artículos, y los emitirá ellos mismos, y solo terminará con una notificación onError cuando todos los demás Observables combinados hayan finalizado.


Mirando la fuente de Observable.flatMap :

return merge(map(func));

Si desea que se procesen todos los ID de usuario posibles, puede continuar con la versión modificada de flatMap:

Observable.mergeDelayError(userIdObservable.map(userInfoFunc))

Más adelante, si dices:

Si alguna de las solicitudes de red falla para el ID de usuario, ese ID de usuario no se actualizará y se puede omitir.

Entonces no uses:

return Observable.mergeDelayError(info1, info2);

Porque esto causará que tanto info1 como info2 se soliciten incluso cuando uno de ellos falla.

Prefiero ir con:

return Observable.merge(info1, info2);

Cuando info1 e info2 están suscritos al mismo hilo, se ejecutarán secuencialmente, por lo que si falla info1, nunca se solicitará info2. Dado que info1 e info2 están limitados a la E / S, supongo que desea ejecutarlos en paralelo:

getInfo("1::: ", t1, "2").subscribeOn(Schedulers.io()); getInfo("2::: ",t1, "3").subscribeOn(Schedulers.io());

Esto debería acelerar significativamente su procesamiento

Todo el código:

public class TestMergeDelayError { public static Observable<String> getUserIds() { return Observable.from(new String[]{"1", "2", "3", "4", "5", "6"}); } public static Observable<String> getInfo(final String prefix, final String integer, final String errorNumber) { return Observable.create(new OnSubscribeFunc<String>() { public Subscription onSubscribe(Observer<? super String> t1) { if (integer.contains(errorNumber)) { t1.onError(new Exception()); } else { t1.onNext(prefix + integer); t1.onCompleted(); } return Subscriptions.empty(); } }) .subscribeOn(Schedulers.io()); } public static void main(String[] args) { Observable<String> userIdObservable = getUserIds(); Observable<String> t = Observable.mergeDelayError(userIdObservable.map(new Func1<String, Observable<String>>() { public Observable<String> call(final String t1) { Observable<String> info1 = getInfo("1::: ", t1, "2"); Observable<String> info2 = getInfo("2::: ",t1, "3"); return Observable.merge(info1, info2); } })); //rest is the same } }


Ya que quiere ignorar el error, puede probar onErrorResumeNext(Observable.<String>empty()); . Por ejemplo,

Observable<String> info1 = getInfo("1::: ", t1, "2").onErrorResumeNext(Observable.<String>empty()); Observable<String> info2 = getInfo("2::: ", t1, "3").onErrorResumeNext(Observable.<String>empty()); return Observable.merge(info1, info2);