rxjava: Можно ли использовать функцию retry (), но с задержкой?

Я использую rxjava в своем приложении для Android, чтобы асинхронно обрабатывать сетевые запросы. Теперь я хотел бы повторить неудачный сетевой запрос только через некоторое время.

Есть ли способ использовать retry () в Observable, но повторить только после некоторой задержки?

Есть ли способ, позволяющий Наблюдателю знать, что в настоящее время проводится повторное рассмотрение (в отличие от опробованных в первый раз)?

Я посмотрел на debounce () / throttleWithTimeout (), но они, похоже, делают что-то другое.

Редактировать:

Я думаю, что нашел один способ сделать это, но мне было бы интересно либо подтвердить, что это правильный способ сделать это, либо для других, лучших способов.

Что я делаю, так это: В методе call () моего Observable.OnSubscribe, прежде чем я вызову метод Subscribers onError (), я просто разрешаю Thread sleep в течение необходимого количества времени. Итак, чтобы повторить каждые 1000 миллисекунд, я делаю что-то вроде этого:

@Override public void call(Subscriber<? super List> subscriber) { try { Log.d(TAG, "trying to load all products with pid: " + pid); subscriber.onNext(productClient.getProductNodesForParentId(pid)); subscriber.onCompleted(); } catch (Exception e) { try { Thread.sleep(1000); } catch (InterruptedException e1) { e.printStackTrace(); } subscriber.onError(e); } } 

Так как этот метод работает на streamе ввода-вывода, он не блокирует пользовательский интерфейс. Единственная проблема, которую я вижу, заключается в том, что даже первая ошибка сообщается с задержкой, поэтому задержка существует, даже если нет повтора (). Я бы хотел, чтобы это было лучше, если задержка не была применена после ошибки, но вместо этого перед повторением (но не до первой попытки, очевидно).

Вы можете использовать оператор retryWhen() для добавления логики повтора к любому наблюдаемому.

Следующий class содержит логику повтора:

RxJava 2.x

 public class RetryWithDelay implements Function, Observable> { private final int maxRetries; private final int retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { this.maxRetries = maxRetries; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Observable apply(final Observable attempts) { return attempts .flatMap(new Function>() { @Override public Observable apply(final Throwable throwable) { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (ie re-subscribed). return Observable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Observable.error(throwable); } }); } } 

RxJava 1.x

 public class RetryWithDelay implements Func1, Observable> { private final int maxRetries; private final int retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { this.maxRetries = maxRetries; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Observable call(Observable attempts) { return attempts .flatMap(new Func1>() { @Override public Observable call(Throwable throwable) { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (ie re-subscribed). return Observable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Observable.error(throwable); } }); } } 

Применение:

 // Add retry logic to existing observable. // Retry max of 3 times with a delay of 2 seconds. observable .retryWhen(new RetryWithDelay(3, 2000)); 

Это решение, основанное на fragmentах Бена Кристенсена, которые я видел: RetryWhen Example и RetryWhenTestsConditional (мне пришлось изменить n.getThrowable() на n чтобы он работал). Я использовал evant / gradle-retrolambda, чтобы сделать нотную запись lambda на Android, но вам не нужно использовать lambda (хотя это очень рекомендуется). Для задержки я реализовал экспоненциальный откат, но вы можете подключить любую логику возврата, которую вы хотите там. Для полноты я добавил операторов subscribeOn и observeOn . Я использую ReactiveX / RxAndroid для AndroidSchedulers.mainThread() .

 int ATTEMPT_COUNT = 10; public class Tuple { public final X x; public final Y y; public Tuple(X x, Y y) { this.x = x; this.y = y; } } observable .subscribeOn(Schedulers.io()) .retryWhen( attempts -> { return attempts.zipWith(Observable.range(1, ATTEMPT_COUNT + 1), (n, i) -> new Tuple(n, i)) .flatMap( ni -> { if (ni.y > ATTEMPT_COUNT) return Observable.error(ni.x); return Observable.timer((long) Math.pow(2, ni.y), TimeUnit.SECONDS); }); }) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber); 

вместо использования MyRequestObservable.retry Я использую функцию-оболочку retryObservable (MyRequestObservable, retrycount, seconds), которые возвращают новую Observable, которая обрабатывает косвенную связь для задержки, поэтому я могу сделать

 retryObservable(restApi.getObservableStuff(), 3, 30) .subscribe(new Action1(){ @Override public void call(BonusIndividualList arg0) { //success! } }, new Action1(){ @Override public void call(Throwable arg0) { // failed after the 3 retries ! }}); // wrapper code private static  Observable retryObservable( final Observable requestObservable, final int nbRetry, final long seconds) { return Observable.create(new Observable.OnSubscribe() { @Override public void call(final Subscriber subscriber) { requestObservable.subscribe(new Action1() { @Override public void call(T arg0) { subscriber.onNext(arg0); subscriber.onCompleted(); } }, new Action1() { @Override public void call(Throwable error) { if (nbRetry > 0) { Observable.just(requestObservable) .delay(seconds, TimeUnit.SECONDS) .observeOn(mainThread()) .subscribe(new Action1>(){ @Override public void call(Observable observable){ retryObservable(observable, nbRetry - 1, seconds) .subscribe(subscriber); } }); } else { // still fail after retries subscriber.onError(error); } } }); } }); } 

Вдохновленный ответом Павла , и если вас не волнует retryWhen проблемы, заявленные Абхиджит Саркаром , самый простой способ отсрочить повторную подписку с помощью rxJava2 unconditionnaly:

 source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS)) 

Вы можете захотеть увидеть больше образцов и объяснений при повторном повторении и повторении .

Теперь с RxJava версии 1.0+ вы можете использовать zipWith для достижения повторной попытки с задержкой.

Добавлены изменения в kjones .

модифицированный

 public class RetryWithDelay implements Func1, Observable> { private final int MAX_RETRIES; private final int DELAY_DURATION; private final int START_RETRY; /** * Provide number of retries and seconds to be delayed between retry. * * @param maxRetries Number of retries. * @param delayDurationInSeconds Seconds to be delays in each retry. */ public RetryWithDelay(int maxRetries, int delayDurationInSeconds) { MAX_RETRIES = maxRetries; DELAY_DURATION = delayDurationInSeconds; START_RETRY = 1; } @Override public Observable call(Observable observable) { return observable .delay(DELAY_DURATION, TimeUnit.SECONDS) .zipWith(Observable.range(START_RETRY, MAX_RETRIES), new Func2() { @Override public Integer call(Throwable throwable, Integer attempt) { return attempt; } }); } } 

Вы можете добавить задержку в Observable, возвращенную в retryWhen Operator

  /** * Here we can see how onErrorResumeNext works and emit an item in case that an error occur in the pipeline and an exception is propagated */ @Test public void observableOnErrorResumeNext() { Subscription subscription = Observable.just(null) .map(Object::toString) .doOnError(failure -> System.out.println("Error:" + failure.getCause())) .retryWhen(errors -> errors.doOnNext(o -> count++) .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)), Schedulers.newThread()) .onErrorResumeNext(t -> { System.out.println("Error after all retries:" + t.getCause()); return Observable.just("I save the world for extinction!"); }) .subscribe(s -> System.out.println(s)); new TestSubscriber((Observer) subscription).awaitTerminalEvent(500, TimeUnit.MILLISECONDS); } 

Здесь вы можете увидеть больше примеров. https://github.com/politrons/reactive

retryWhen это сложный, возможно, даже глючный оператор. В официальном документе и, по крайней мере, один ответ здесь используется оператор range , который не будет выполнен, если повторных попыток не будет. Смотрите мою дискуссию с членом ReactiveX Дэвидом Карноком.

Я улучшил ответ на flatMap изменив flatMap на concatMap и добавив class RetryDelayStrategy . flatMap не сохраняет порядок эмиссии, в то время как concatMap делает это, что важно для задержек с concatMap . RetryDelayStrategy , как видно из названия, позволяет пользователю выбирать из разных режимов создания задержек RetryDelayStrategy попыток, включая откат. Код доступен на моем GitHub в комплекте со следующими тестовыми примерами:

  1. Превышает 1-ю попытку (нет попыток)
  2. Сбой после повторной попытки
  3. Попытка повторить попытку 3 раза, но преуспеть на втором, следовательно, не повторяет 3 раза
  4. Достигает 3-й попытки

См. Метод setRandomJokes .

Тот же ответ, что и у kjones, но обновлен до последней версии. Для версии RxJava 2.x : (‘io.reactivex.rxjava2: rxjava: 2.1.3’)

 public class RetryWithDelay implements Function, Publisher> { private final int maxRetries; private final long retryDelayMillis; private int retryCount; public RetryWithDelay(final int maxRetries, final int retryDelayMillis) { this.maxRetries = maxRetries; this.retryDelayMillis = retryDelayMillis; this.retryCount = 0; } @Override public Publisher apply(Flowable throwableFlowable) throws Exception { return throwableFlowable.flatMap(new Function>() { @Override public Publisher apply(Throwable throwable) throws Exception { if (++retryCount < maxRetries) { // When this Observable calls onNext, the original // Observable will be retried (ie re-subscribed). return Flowable.timer(retryDelayMillis, TimeUnit.MILLISECONDS); } // Max retries hit. Just pass the error along. return Flowable.error(throwable); } }); } 

}

Применение:

// Добавить логику повтора в существующую наблюдаемую. // Повторяем максимум 3 раза с задержкой в ​​2 секунды.

 observable .retryWhen(new RetryWithDelay(3, 2000)); 

Для версии Kotlin & RxJava1

 class RetryWithDelay(private val MAX_RETRIES: Int, private val DELAY_DURATION_IN_SECONDS: Long) : Function1, Observable<*>> { private val START_RETRY: Int = 1 override fun invoke(observable: Observable): Observable<*> { return observable.delay(DELAY_DURATION_IN_SECONDS, TimeUnit.SECONDS) .zipWith(Observable.range(START_RETRY, MAX_RETRIES), object : Function2 { override fun invoke(throwable: Throwable, attempt: Int): Int { return attempt } }) } } 

(Kotlin) Я немного улучшил код с экспоненциальным отсрочкой и применил защитную защиту Observable.range ():

  fun testOnRetryWithDelayExponentialBackoff() { val interval = 1 val maxCount = 3 val ai = AtomicInteger(1); val source = Observable.create { emitter -> val attempt = ai.getAndIncrement() println("Subscribe ${attempt}") if (attempt >= maxCount) { emitter.onNext(Unit) emitter.onComplete() } emitter.onError(RuntimeException("Test $attempt")) } // Below implementation of "retryWhen" function, remove all "println()" for real code. val sourceWithRetry: Observable = source.retryWhen { throwableRx -> throwableRx.doOnNext({ println("Error: $it") }) .zipWith(Observable.range(1, maxCount) .concatMap { Observable.just(it).delay(0, TimeUnit.MILLISECONDS) }, BiFunction { t1: Throwable, t2: Int -> t1 to t2 } ) .flatMap { pair -> if (pair.second >= maxCount) { Observable.error(pair.first) } else { val delay = interval * 2F.pow(pair.second) println("retry delay: $delay") Observable.timer(delay.toLong(), TimeUnit.SECONDS) } } } //Code to print the result in terminal. sourceWithRetry .doOnComplete { println("Complete") } .doOnError({ println("Final Error: $it") }) .blockingForEach { println("$it") } } 

Просто сделайте это так:

  Observable.just("") .delay(2, TimeUnit.SECONDS) //delay .flatMap(new Func1>() { @Override public Observable call(String s) { L.from(TAG).d("postAvatar="); File file = PhotoPickUtil.getTempFile(); if (file.length() <= 0) { throw new NullPointerException(); } return Observable.just(file); } }) .retry(6) .subscribe(new Action1() { @Override public void call(File file) { postAvatar(file); } }, new Action1() { @Override public void call(Throwable throwable) { } }); 
Давайте будем гением компьютера.