Rx. Постигаем retryWhen и repeatWhen на примерах из Android разработки

в 14:47, , рубрики: android, rxandroid, Разработка под android

В сети очень много русско- и англоязычных статей по Rx операторам retryWhen и repeatWhen.
Несмотря на это, очень часто встречаю нежелание их использовать (ввиду сложного синтаксиса и непонятных диаграмм).

Приведу несколько как можно с их помощью эффективно перезапускать участки цепи и делегировать обработку перезапусков при ошибках и завершениях потока.

В примерах будет Java код с лямбдами (Retrolamda), но переписать его на Kotlin или чистую Java не составит труда.

Императивный способ перезапуска цепи

Предположим, мы используем Retrofit и загрузку начинаем в методе load(). Repository.getSomething() возвращает Single<Something>().

@NonNull
private Subscription loadingSubscription = Subscriptions.unsubscribed();

private void load() {
    subscription.unsubscribe();
    subscription = repository
             .getSomething()
             .subscribe(result -> {}, err -> {});
}

private void update() {
    load();
}

Из какого-нибудь листенера обновлений (e.g. PullToRefreshView) мы вызываем метод update(), который, в свою очередь, вызовет метод load(), где с нуля будет создана подписка.

Я предпочитаю ко вниманию вариант использования более реактивного, на мой взгляд, способа с вышеупомянутым оператором repeatWhen().

Реактивный способ перезапуска цепи — repeatWhen

Создадим объект PublishSubject updateSubject, и передадим в оператор лямбду
repeatHandler -> repeatHandler.flatMap(nothing -> updateSubject.asObservable())

@NonNull
private final PublishSubject<Void> updateSubject = PublishSubject.create();

private void load() {
    repository
            .getSomething()
            .repeatWhen(repeatHandler ->
                                repeatHandler.flatMap(nothing -> updateSubject.asObservable()))
            .subscribe(result -> {}, err -> {});
}

Теперь для обновления загруженных данных нужно заэмитить null в updateSubject.

private void update() {
    updateSubject.onNext(null);
}

Нужно помнить, что работает такой реактивный способ только с Single, который вызывает onComplete() сразу после эмита единственного элемента (будет работать и с Observable, но только после завершения потока).

Реактивный способ обработки ошибок retryWhen

Подобным образом можно обрабатывать и ошибки. Предположим, у пользователя пропала сеть, что приведет к ошибке и вызову onError() внутри Single, который возвращается методом getUser().

В этот момент можно показать пользователю диалог с текстом «Проверьте соединение», а по нажатию кнопки OK вызвать метод retry().

@NonNull
private final PublishSubject<Void> retrySubject = PublishSubject.create();

private void load() {
    repository
            .getSomething()
            .doOnError(err -> showConnectionDialog())
            .retryWhen(retryHandler -> retryHandler.flatMap(nothing -> retrySubject.asObservable()))
            .subscribe(result -> {}, err -> {});
}

private void retry() {
    retrySubject.onNext(null);
}

По вызову retrySubject.onNext(null) вся цепочка выше retryWhen() переподпишется к источнику getUser(), и повторит запрос.

При таком подходе важно помнить, что doOnError() должен находиться выше в цепочке, чем retryWhen(), поскольку последний «поглощает» ошибки до эмита repeatHandler'а.

В данном конкретном случае выигрыша по производительности не будет, а кода стало даже чуть больше, но эти примеры помогут начать мыслить реактивными паттернами.

В следующем, бессовестно притянутом за уши, примере, в методе load() мы объединяем два источника оператором combineLatest.

Первый источник — repository.getSomething() загружает что-то из сети, второй, localStorage.fetchSomethingReallyHuge(), загружает что-то тяжелое из локального хранилища.

public void load() {
    Observable.combineLatest(repository.getSomething(),
                             localStorage.fetchSomethingReallyHuge(),
                             (something, hugeObject) -> new Stuff(something, hugeObject))
            .subscribe(stuff -> {}, err -> {});
}

При обработке ошибки императивным способом, вызывая load() на каждую ошибку, мы будем заново подписываться на оба источника, что, в данном примере, абсолютно ненужно. При сетевой ошибке, второй источник успешно заэмитит данные, ошибка произойдет только в первом. В этом случае императивный способ будет еще и медленней.

Посмотрим, как будет выглядеть реактивный способ.


public void load() {
    Observable.combineLatest(
            repository.getSomething()
                    .retryWhen(retryHandler ->
                                       retryHandler.flatMap(
                                               err -> retrySubject.asObservable())),
            localStorage.fetchSomethingReallyHuge()
                    .retryWhen(retryHandler ->
                                       retryHandler.flatMap(
                                               nothing -> retrySubject.asObservable())),
            (something, hugeObject) -> new Stuff(something, hugeObject))
            .subscribe(stuff -> {}, err -> {});
}

Прелесть такого подхода в том, что лямбда, переданная в оператор retryWhen() исполняется только после ошибки внутри источника, соответственно, если «ошибется» только один из источников, то и переподписка произойдет только на него, а оставшаяся цепочка ниже будет ожидать переисполнения.

А если ошибка произойдет внутри обоих источников, то один и тот же retryHandler сработает в двух местах.

Делегирование обработки ошибок

Следующим шагом можно делегировать обработку повторов некоему RetryManager. Перед этим еще можно немного подготовиться к переезду на Rx2 и убрать из наших потоков null объекты, которые запрещены в Rx2. Для этого можно создать класс:

public class RetryEvent {
}

Без ничего. Позже туда можно будет добавлять разные флаги, но это другая история. Интерфейс RetryManager может выглядеть как-то так:

interface RetryManager {

    Observable<RetryEvent> observeRetries(@NonNull Throwable error);

}

Реализация может проверять ошибки, показывать диалоги, снэкбар, устанавливать бесшумный таймаут — всё, что душе угодно. И слушать коллбэки от всех этих UI компонентов, чтобы в последствии заэмитить RetryEvent в наш retryHandler.

Предыдущий пример с использованием такого RetryManager будет выглядеть вот так:

//pass this through constructor, DI or use singleton (but please don't)
private final RetryManager retryManager;

public void load() {
    Observable.combineLatest(
            repository.getSomething()
                    .retryWhen(retryHandler ->
                                       retryHandler.flatMap(
                                               err -> retryManager.observeRetries())),
            localStorage.fetchSomethingReallyHuge()
                    .retryWhen(retryHandler ->
                                       retryHandler.flatMap(
                                               nothing -> retryManager.observeRetries())),
            (something, hugeObject) -> new Stuff(something, hugeObject))
            .subscribe(stuff -> {}, err -> {});
}

Таким нехитрым образом обработка повторов при ошибках делегирована сторонней сущности, которую можно передавать как зависимость.

Надеюсь, эти примеры окажутся кому-то полезны и соблазнят попробовать repeatWhen() и retryWhen() в своих проектах.

Автор: yatsinar

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js