rx java - example - rxjava: ¿Puedo usar retry() pero con retraso?
rxjs (11)
Estoy usando rxjava en mi aplicación de Android para manejar las solicitudes de red de forma asincrónica. Ahora me gustaría volver a intentar una solicitud de red fallida solo después de que haya transcurrido un cierto tiempo.
¿Hay alguna forma de utilizar retry () en un Observable pero volver a intentarlo solo después de un cierto retraso?
¿Hay alguna manera de hacerle saber al Observable que se está reintentando actualmente (en lugar de intentarlo por primera vez)?
Eché un vistazo a rebotar () / throttleWithTimeout () pero parece que están haciendo algo diferente.
Editar:
Creo que encontré una forma de hacerlo, pero me interesaría la confirmación de que esta es la forma correcta de hacerlo o de otras formas mejores.
Lo que estoy haciendo es esto: en el método call () de mi Observable.OnSubscribe, antes de llamar al método Suscripers onError (), simplemente dejo que el subproceso duerma durante el tiempo deseado. Por lo tanto, para volver a intentar cada 1000 milisegundos, hago algo como esto:
@Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
try {
Log.d(TAG, "trying to load all products with pid: " + pid);
subscriber.onNext(productClient.getProductNodesForParentId(pid));
subscriber.onCompleted();
} catch (Exception e) {
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e.printStackTrace();
}
subscriber.onError(e);
}
}
Como este método se ejecuta en una secuencia de IO, no bloquea la interfaz de usuario. El único problema que puedo ver es que incluso el primer error se informa con retraso, por lo que la demora está presente incluso si no hay reintento (). Me gustaría que fuera mejor si el retraso no se aplicaba después de un error, sino antes de volver a intentarlo (pero no antes del primer intento, obviamente).
(Kotlin) Poco mejoré el código con el retroceso exponencial y la emisión de defensa aplicada de Observable.range ():
fun testOnRetryWithDelayExponentialBackoff() {
val interval = 1
val maxCount = 3
val ai = AtomicInteger(1);
val source = Observable.create<Unit> { emitter ->
val attempt = ai.getAndIncrement()
println("Subscribe ${attempt}")
if (attempt >= maxCount) {
emitter.onNext(Unit)
emitter.onComplete()
}
emitter.onError(RuntimeException("Test $attempt"))
}
// Below implementation of "retryWhen" function, remove all "println()" for real code.
val sourceWithRetry: Observable<Unit> = source.retryWhen { throwableRx ->
throwableRx.doOnNext({ println("Error: $it") })
.zipWith(Observable.range(1, maxCount)
.concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) },
BiFunction { t1: Throwable, t2: Int -> t1 to t2 }
)
.flatMap { pair ->
if (pair.second >= maxCount) {
Observable.error(pair.first)
} else {
val delay = interval * 2F.pow(pair.second)
println("retry delay: $delay")
Observable.timer(delay.toLong(), TimeUnit.SECONDS)
}
}
}
//Code to print the result in terminal.
sourceWithRetry
.doOnComplete { println("Complete") }
.doOnError({ println("Final Error: $it") })
.blockingForEach { println("$it") }
}
Ahora con la versión 1.0+ de RxJava puede usar zipWith para lograr el reintento con retraso.
Añadiendo modificaciones a la respuesta de kjones .
Modificado
public class RetryWithDelay implements
Func1<Observable<? extends Throwable>, Observable<?>> {
private final int MAX_RETRIES;
private final int DELAY_DURATION;
private final int START_RETRY;
/**
* Provide number of retries and seconds to be delayed between retry.
*
* @param maxRetries Number of retries.
* @param delayDurationInSeconds Seconds to be delays in each retry.
*/
public RetryWithDelay(int maxRetries, int delayDurationInSeconds) {
MAX_RETRIES = maxRetries;
DELAY_DURATION = delayDurationInSeconds;
START_RETRY = 1;
}
@Override
public Observable<?> call(Observable<? extends Throwable> observable) {
return observable
.delay(DELAY_DURATION, TimeUnit.SECONDS)
.zipWith(Observable.range(START_RETRY, MAX_RETRIES),
new Func2<Throwable, Integer, Integer>() {
@Override
public Integer call(Throwable throwable, Integer attempt) {
return attempt;
}
});
}
}
Esta es una solución basada en los fragmentos de Ben Christensen que vi, RetryWhen Example y RetryWhenTestsConditional (tuve que cambiar n.getThrowable()
por n
para que funcione). Utilicé evant/gradle-retrolambda para hacer que la notación lambda funcione en Android, pero no tienes que usar lambdas (aunque es muy recomendable). Para la demora, implementé un retroceso exponencial, pero puede conectar la lógica de rechazo que quiera allí. Para completar, agregué los operadores subscribeOn
y observeOn
. Estoy usando ReactiveX/RxAndroid para ReactiveX/RxAndroid AndroidSchedulers.mainThread()
.
int ATTEMPT_COUNT = 10;
public class Tuple<X, Y> {
public final X x;
public final Y y;
public Tuple(X x, Y y) {
this.x = x;
this.y = y;
}
}
observable
.subscribeOn(Schedulers.io())
.retryWhen(
attempts -> {
return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple<Throwable, Integer>(n, i))
.flatMap(
ni -> {
if (ni.y > ATTEMPT_COUNT)
return Observable.error(ni.x);
return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS);
});
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
Inspirado por la respuesta de Pablo , y si no le preocupa retryWhen
a retryWhen
problemas planteados por Abhijit Sarkar , la forma más simple de retrasar la resuscripción incondicional con rxJava2 no es incondicional:
source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))
Es posible que desee ver más muestras y explicaciones al volver a intentar cuándo y repetir cuándo .
La misma respuesta que desde kjones pero actualizada a la versión más reciente Para la versión RxJava 2.x : (''io.reactivex.rxjava2: rxjava: 2.1.3'')
public class RetryWithDelay implements Function<Flowable<Throwable>, Publisher<?>> {
private final int maxRetries;
private final long retryDelayMillis;
private int retryCount;
public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
this.maxRetries = maxRetries;
this.retryDelayMillis = retryDelayMillis;
this.retryCount = 0;
}
@Override
public Publisher<?> apply(Flowable<Throwable> throwableFlowable) throws Exception {
return throwableFlowable.flatMap(new Function<Throwable, Publisher<?>>() {
@Override
public Publisher<?> apply(Throwable throwable) throws Exception {
if (++retryCount < maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
return Flowable.timer(retryDelayMillis,
TimeUnit.MILLISECONDS);
}
// Max retries hit. Just pass the error along.
return Flowable.error(throwable);
}
});
}
}
Uso:
// Agregar lógica de reintento a observable existente. // Reintentar max de 3 veces con un retraso de 2 segundos.
observable
.retryWhen(new RetryWithDelay(3, 2000));
Para la versión de Kotlin y RxJava1
class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long)
: Function1<Observable<out Throwable>, Observable<*>> {
private val START_RETRY: Int = 1
override fun invoke(observable: Observable<out Throwable>): Observable<*> {
return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS)
.zipWith(Observable.range(START_RETRY, MAX_RETRIES),
object : Function2<Throwable, Int, Int> {
override fun invoke(throwable: Throwable, attempt: Int): Int {
return attempt
}
})
}
}
Puede agregar un retraso en el Observable devuelto en el intento de nuevo cuando el operador
/**
* Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated
*/
@Test
public void observableOnErrorResumeNext() {
Subscription subscription = Observable.just(null)
.map(Object::toString)
.doOnError(failure -> System.out.println("Error:" + failure.getCause()))
.retryWhen(errors -> errors.doOnNext(o -> count++)
.flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
Schedulers.newThread())
.onErrorResumeNext(t -> {
System.out.println("Error after all retries:" + t.getCause());
return Observable.just("I save the world for extinction!");
})
.subscribe(s -> System.out.println(s));
new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
}
Puedes ver más ejemplos aquí. https://github.com/politrons/reactive
Puede usar el operador retryWhen()
para agregar la lógica de reintento a cualquier Observable.
La siguiente clase contiene la lógica de reintento:
RxJava 2.x
public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
private final int maxRetries;
private final int retryDelayMillis;
private int retryCount;
public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
this.maxRetries = maxRetries;
this.retryDelayMillis = retryDelayMillis;
this.retryCount = 0;
}
@Override
public Observable<?> apply(final Observable<? extends Throwable> attempts) {
return attempts
.flatMap(new Function<Throwable, Observable<?>>() {
@Override
public Observable<?> apply(final Throwable throwable) {
if (++retryCount < maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
return Observable.timer(retryDelayMillis,
TimeUnit.MILLISECONDS);
}
// Max retries hit. Just pass the error along.
return Observable.error(throwable);
}
});
}
}
RxJava 1.x
public class RetryWithDelay implements
Func1<Observable<? extends Throwable>, Observable<?>> {
private final int maxRetries;
private final int retryDelayMillis;
private int retryCount;
public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
this.maxRetries = maxRetries;
this.retryDelayMillis = retryDelayMillis;
this.retryCount = 0;
}
@Override
public Observable<?> call(Observable<? extends Throwable> attempts) {
return attempts
.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
if (++retryCount < maxRetries) {
// When this Observable calls onNext, the original
// Observable will be retried (i.e. re-subscribed).
return Observable.timer(retryDelayMillis,
TimeUnit.MILLISECONDS);
}
// Max retries hit. Just pass the error along.
return Observable.error(throwable);
}
});
}
}
Uso:
// Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
.retryWhen(new RetryWithDelay(3, 2000));
Simplemente hazlo así:
Observable.just("")
.delay(2, TimeUnit.SECONDS) //delay
.flatMap(new Func1<String, Observable<File>>() {
@Override
public Observable<File> call(String s) {
L.from(TAG).d("postAvatar=");
File file = PhotoPickUtil.getTempFile();
if (file.length() <= 0) {
throw new NullPointerException();
}
return Observable.just(file);
}
})
.retry(6)
.subscribe(new Action1<File>() {
@Override
public void call(File file) {
postAvatar(file);
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
});
en lugar de utilizar MyRequestObservable.retry utilizo una función de reintento retryObservable (MyRequestObservable, retrycount, seconds) que devuelve un nuevo Observable que maneja la indirección para la demora, así que puedo hacer
retryObservable(restApi.getObservableStuff(), 3, 30)
.subscribe(new Action1<BonusIndividualList>(){
@Override
public void call(BonusIndividualList arg0)
{
//success!
}
},
new Action1<Throwable>(){
@Override
public void call(Throwable arg0) {
// failed after the 3 retries !
}});
// wrapper code
private static <T> Observable<T> retryObservable(
final Observable<T> requestObservable, final int nbRetry,
final long seconds) {
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(final Subscriber<? super T> subscriber) {
requestObservable.subscribe(new Action1<T>() {
@Override
public void call(T arg0) {
subscriber.onNext(arg0);
subscriber.onCompleted();
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable error) {
if (nbRetry > 0) {
Observable.just(requestObservable)
.delay(seconds, TimeUnit.SECONDS)
.observeOn(mainThread())
.subscribe(new Action1<Observable<T>>(){
@Override
public void call(Observable<T> observable){
retryObservable(observable,
nbRetry - 1, seconds)
.subscribe(subscriber);
}
});
} else {
// still fail after retries
subscriber.onError(error);
}
}
});
}
});
}
retryWhen
es un operador complicado, tal vez incluso con errores. El documento oficial y al menos una respuesta aquí usan el operador de range
, que fallará si no se realizan reintentos. Vea mi discussion con el miembro de ReactiveX David Karnok.
Mejore la respuesta de flatMap
cambiando flatMap
a concatMap
y agregando una clase RetryDelayStrategy
. flatMap
no conserva el orden de emisión mientras que concatMap
hace, lo cual es importante para retrasos con retroceso. RetryDelayStrategy
, como su nombre indica, el usuario puede elegir entre varios modos de generación de retrasos de reintentos, incluido el RetryDelayStrategy
. El código está disponible en mi GitHub completo con los siguientes casos de prueba:
- Triunfa en el primer intento (sin reintentos)
- Falla después de 1 intento
- Intenta volver a intentar 3 veces pero tiene éxito en 2nd; por lo tanto, no vuelve a intentar la 3ª vez
- Sucede en el tercer intento
Ver el método setRandomJokes
.