В сети очень много русско- и англоязычных статей по Rx операторам retryWhen и repeatWhen.
Несмотря на это, очень часто встречаю нежелание их использовать (ввиду сложного синтаксиса и непонятных диаграмм).
Приведу несколько как можно с их помощью эффективно перезапускать участки цепи и делегировать обработку перезапусков при ошибках и завершениях потока.
В примерах будет Java код с лямбдами (Retrolamda), но переписать его на Kotlin или чистую Java не составит труда.
Императивный способ перезапуска цепи
Предположим, мы используем Retrofit и загрузку начинаем в методе load(). Repository.getSomething() возвращает Single<Something>().
@NonNull
private Subscription loadingSubscription = Subscriptions.unsubscribed();
private void load() {
subscription.unsubscribe();
subscription = repository
.getSomething()
.subscribe(result -> {}, err -> {});
}
private void update() {
load();
}
Из какого-нибудь листенера обновлений (e.g. PullToRefreshView) мы вызываем метод update(), который, в свою очередь, вызовет метод load(), где с нуля будет создана подписка.
Я предпочитаю ко вниманию вариант использования более реактивного, на мой взгляд, способа с вышеупомянутым оператором repeatWhen().
Реактивный способ перезапуска цепи — repeatWhen
Создадим объект PublishSubject updateSubject, и передадим в оператор лямбду
repeatHandler -> repeatHandler.flatMap(nothing -> updateSubject.asObservable())
@NonNull
private final PublishSubject<Void> updateSubject = PublishSubject.create();
private void load() {
repository
.getSomething()
.repeatWhen(repeatHandler ->
repeatHandler.flatMap(nothing -> updateSubject.asObservable()))
.subscribe(result -> {}, err -> {});
}
Теперь для обновления загруженных данных нужно заэмитить null в updateSubject.
private void update() {
updateSubject.onNext(null);
}
Нужно помнить, что работает такой реактивный способ только с Single, который вызывает onComplete() сразу после эмита единственного элемента (будет работать и с Observable, но только после завершения потока).
Реактивный способ обработки ошибок retryWhen
Подобным образом можно обрабатывать и ошибки. Предположим, у пользователя пропала сеть, что приведет к ошибке и вызову onError() внутри Single, который возвращается методом getUser().
В этот момент можно показать пользователю диалог с текстом «Проверьте соединение», а по нажатию кнопки OK вызвать метод retry().
@NonNull
private final PublishSubject<Void> retrySubject = PublishSubject.create();
private void load() {
repository
.getSomething()
.doOnError(err -> showConnectionDialog())
.retryWhen(retryHandler -> retryHandler.flatMap(nothing -> retrySubject.asObservable()))
.subscribe(result -> {}, err -> {});
}
private void retry() {
retrySubject.onNext(null);
}
По вызову retrySubject.onNext(null) вся цепочка выше retryWhen() переподпишется к источнику getUser(), и повторит запрос.
При таком подходе важно помнить, что doOnError() должен находиться выше в цепочке, чем retryWhen(), поскольку последний «поглощает» ошибки до эмита repeatHandler'а.
В данном конкретном случае выигрыша по производительности не будет, а кода стало даже чуть больше, но эти примеры помогут начать мыслить реактивными паттернами.
В следующем, бессовестно притянутом за уши, примере, в методе load() мы объединяем два источника оператором combineLatest.
Первый источник — repository.getSomething() загружает что-то из сети, второй, localStorage.fetchSomethingReallyHuge(), загружает что-то тяжелое из локального хранилища.
public void load() {
Observable.combineLatest(repository.getSomething(),
localStorage.fetchSomethingReallyHuge(),
(something, hugeObject) -> new Stuff(something, hugeObject))
.subscribe(stuff -> {}, err -> {});
}
При обработке ошибки императивным способом, вызывая load() на каждую ошибку, мы будем заново подписываться на оба источника, что, в данном примере, абсолютно ненужно. При сетевой ошибке, второй источник успешно заэмитит данные, ошибка произойдет только в первом. В этом случае императивный способ будет еще и медленней.
Посмотрим, как будет выглядеть реактивный способ.
public void load() {
Observable.combineLatest(
repository.getSomething()
.retryWhen(retryHandler ->
retryHandler.flatMap(
err -> retrySubject.asObservable())),
localStorage.fetchSomethingReallyHuge()
.retryWhen(retryHandler ->
retryHandler.flatMap(
nothing -> retrySubject.asObservable())),
(something, hugeObject) -> new Stuff(something, hugeObject))
.subscribe(stuff -> {}, err -> {});
}
Прелесть такого подхода в том, что лямбда, переданная в оператор retryWhen() исполняется только после ошибки внутри источника, соответственно, если «ошибется» только один из источников, то и переподписка произойдет только на него, а оставшаяся цепочка ниже будет ожидать переисполнения.
А если ошибка произойдет внутри обоих источников, то один и тот же retryHandler сработает в двух местах.
Делегирование обработки ошибок
Следующим шагом можно делегировать обработку повторов некоему RetryManager. Перед этим еще можно немного подготовиться к переезду на Rx2 и убрать из наших потоков null объекты, которые запрещены в Rx2. Для этого можно создать класс:
public class RetryEvent {
}
Без ничего. Позже туда можно будет добавлять разные флаги, но это другая история. Интерфейс RetryManager может выглядеть как-то так:
interface RetryManager {
Observable<RetryEvent> observeRetries(@NonNull Throwable error);
}
Реализация может проверять ошибки, показывать диалоги, снэкбар, устанавливать бесшумный таймаут — всё, что душе угодно. И слушать коллбэки от всех этих UI компонентов, чтобы в последствии заэмитить RetryEvent в наш retryHandler.
Предыдущий пример с использованием такого RetryManager будет выглядеть вот так:
//pass this through constructor, DI or use singleton (but please don't)
private final RetryManager retryManager;
public void load() {
Observable.combineLatest(
repository.getSomething()
.retryWhen(retryHandler ->
retryHandler.flatMap(
err -> retryManager.observeRetries())),
localStorage.fetchSomethingReallyHuge()
.retryWhen(retryHandler ->
retryHandler.flatMap(
nothing -> retryManager.observeRetries())),
(something, hugeObject) -> new Stuff(something, hugeObject))
.subscribe(stuff -> {}, err -> {});
}
Таким нехитрым образом обработка повторов при ошибках делегирована сторонней сущности, которую можно передавать как зависимость.
Надеюсь, эти примеры окажутся кому-то полезны и соблазнят попробовать repeatWhen() и retryWhen() в своих проектах.
Автор: yatsinar