rx-java reactive-programming

rx java - Error de captura si se vuelven a intentar los intentos de reintento cuando:



rx-java reactive-programming (6)

El Javadoc para retryWhen a retryWhen establece que:

Si ese Observable llama onComplete o onError, el reintento llamará onCompleted o onError en la suscripción secundaria.

En pocas palabras, si desea propagar la excepción, deberá volver a emitir la excepción original una vez que haya tenido suficiente reintento.

Una forma fácil es configurar su Observable.range de Observable.range para que sea 1 mayor que la cantidad de veces que desea volver a intentarlo.

Luego, en la función zip, compruebe el número actual de reintentos. Si es igual a NUMBER_OF_RETRIES + 1, devuelve Observable.error(throwable) o vuelve a lanzar tu excepción.

P.EJ

Observable.create((Subscriber<? super String> s) -> { System.out.println("subscribing"); s.onError(new RuntimeException("always fails")); }).retryWhen(attempts -> { return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (throwable, attempt) -> { if (attempt == NUMBER_OF_RETRIES + 1) { throw Throwables.propagate(throwable); } else { return attempt; } }).flatMap(i -> { System.out.println("delaying retry by " + i + " second(s)"); return Observable.timer(i, TimeUnit.SECONDS); }); }).toBlocking().forEach(System.out::println);

Como parte, doOnError no afecta al Observable de ninguna manera, simplemente le proporciona un enlace para realizar alguna acción si se produce un error. Un ejemplo común es el registro.

En la documentación de RetryWhen el ejemplo allí va así:

Observable.create((Subscriber<? super String> s) -> { System.out.println("subscribing"); s.onError(new RuntimeException("always fails")); }).retryWhen(attempts -> { return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> { System.out.println("delay retry by " + i + " second(s)"); return Observable.timer(i, TimeUnit.SECONDS); }); }).toBlocking().forEach(System.out::println);

Pero, ¿cómo propagar el error si se terminan los reintentos?

Agregar .doOnError(System.out::println) después de la cláusula retryWhen no retryWhen el error. ¿Es incluso emitido?

Agregar un .doOnError(System.out::println) antes de reintentar Cuando la pantalla always fails para todos los reintentos.


El documento para retryWhen a retryWhen dice que pasa la notificación de onError a sus suscriptores y termina. Así que puedes hacer algo como esto:

final int ATTEMPTS = 3; Observable.create((Subscriber<? super String> s) -> { System.out.println("subscribing"); s.onError(new RuntimeException("always fails")); }).retryWhen(attempts -> attempts .zipWith(Observable.range(1, ATTEMPTS), (n, i) -> i < ATTEMPTS ? Observable.timer(i, SECONDS) : Observable.error(n)) .flatMap(x -> x)) .toBlocking() .forEach(System.out::println);


Puede obtener el comportamiento que desea utilizando el constructor rxjava-extras en rxjava-extras que se encuentra en Maven Central. Utiliza la última versión.

Observable.create((Subscriber<? super String> s) -> { System.out.println("subscribing"); s.onError(new RuntimeException("always fails")); }) .retryWhen(RetryWhen .delays(Observable.range(1, 3) .map(n -> (long) n), TimeUnit.SECONDS).build()) .doOnError(e -> e.printStackTrace()) .toBlocking().forEach(System.out::println);


Puede usar la función de escaneo, que devuelve un par con el índice acumulado y decidir si transmitir o no el error:

.retryWhen(attempts -> return .scan(Pair.create(0, null), (index, value) -> Pair.create(index.first + 1, value)) .flatMap(pair -> { if(pair.first > MAX_RETRY_COUNT) { throw new RuntimeException(pair.second); } return Observable.timer(pair.first, TimeUnit.SECONDS); });

O puede seguir con el operador zipWith pero aumentar el número en el range Observable y devolver un par, en lugar del índice solo. De esa manera, no perderá la información sobre throwable anteriores.

attempts .zipWith(Observable.range(1, MAX_RETRY_COUNT + 1), (throwable, i) -> Pair.create(i, throwable)) .flatMap(pair -> { if(pair.first > MAX_RETRY_COUNT) throw new RuntimeException(pair.second); System.out.println("delay retry by " + pair.first + " second(s)"); return Observable.timer(pair.first, TimeUnit.SECONDS); });


Una opción es usar Observable.materialize() para convertir elementos Observable.range() en notificaciones. Luego, una vez que se onCompleted() , se puede propagar el error en sentido descendente (en la muestra a continuación, Pair se usa para ajustar las notificaciones de Observable.range() y la excepción de Observable )

@Test public void retryWhen() throws Exception { Observable.create((Subscriber<? super String> s) -> { System.out.println("subscribing"); s.onError(new RuntimeException("always fails")); }).retryWhen(attempts -> { return attempts.zipWith(Observable.range(1, 3).materialize(), Pair::new) .flatMap(notifAndEx -> { System.out.println("delay retry by " + notifAndEx + " second(s)"); return notifAndEx.getRight().isOnCompleted() ? Observable.<Integer>error(notifAndEx.getLeft()) : Observable.timer(notifAndEx.getRight().getValue(), TimeUnit.SECONDS); }); }).toBlocking().forEach(System.out::println); } private static class Pair<L,R> { private final L left; private final R right; public Pair(L left, R right) { this.left = left; this.right = right; } public L getLeft() { return left; } public R getRight() { return right; } }


Debe usar onErrorResumeNext después del reintento cuando

En tu ejemplo

Observable.create((Subscriber<? super String> s) -> { System.out.println("subscribing"); s.onError(new RuntimeException("always fails")); }).retryWhen(attempts -> { return attempts.zipWith(Observable.range(1, NUMBER_OF_RETRIES + 1), (n, i) -> { if (i == NUMBER_OF_RETRIES + 1) { throw Throwables.propagate(n); } else { return i; } }).flatMap(i -> { System.out.println("delay retry by " + i + " second(s)"); return Observable.timer(i, TimeUnit.SECONDS); }); }) .onErrorResumeNext(t -> {System.out.println("Error after all retries:" + t.getMessage()); return Observable.error(t); }) .toBlocking().forEach(System.out::println);

En la parte inferior de esta clase puede ver un ejemplo práctico para comprender cómo funciona. https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/errors/ObservableExceptions.java