Отказоустойчивость, отзывчивость, ориентированность на события и масштабируемость — четыре принципа нынче популярного реактивного программирования. Именно следуя им создаётся backend больших систем с одновременной поддержкой десятков тысяч соединений.
Отзывчивость, простота, гибкость и расширяемость кода — принципы, которые можно закрепить за реактивным UI.
Наверняка, если совместить реактивные backend и UI, то можно получить качественный продукт. Именно его мы и попытались сделать, разрабатывая 2GIS Dialer — звонилки, которая работает через API и при этом должна оставаться быстрой и удобной.
Зачем нам реактивное программирование
Рассмотрим пример:
requestDataTask = new AsyncTask<Void, Void, JSONObject>() {
@Override
protected JSONObject doInBackground(Void... params) {
final String requestResult = apiService.getData();
final JSONObject json = JsonUtils.parse(requestResult);
lruCache.cacheJson(json);
return json;
}
};
Тут всё просто, мы создаем AsyncTask, в котором:
- Делаем запрос к API 2ГИС.
- Создаем
<code>JSONObject
на основе результата запроса. - Кэшируем
JSONObject
. - Возвращаем
JSONObject
.
Подобный код встречается во многих проектах, он понятен, а миллионы леммингов не могут ошибаться. Но давайте копнём чуть глубже:
- Что делать, если где-то во время выполнения выпал
Exception
? doInBackground(Void...)
выполняется в отдельном потоке, как нам сказать пользователю об ошибке в UI? Заводить поля дляException
?- А что возвращать, если не прошел запрос? null?
- А если json не валидный?
- Что стоит делать, если не удалось кэшировать объект?
И ведь это не самый сложный пример. Представьте, что вам надо сделать ещё один запрос на основе результатов предыдущего. На AsyncTask’ах это будет callback-hell, который, как минимум, будет неустойчив к падениям, ошибкам и т.д.
Вопросов больше, чем ответов. О недостатках AsyncTask’ов можно написать целую статью, серьезно. Но есть ли варианты лучше?
Фреймворк RxJava
Оглядываясь на принципы реактивного программирования мы начали искать решение, которое обеспечит:
- отсутствие зависаний и тормозов;
- масштабируемость на ресурсы смартфона;
- отсутствие крэшей;
- ориентированность на события.
Таковым стала RxJava от ребят из Netflix — reactive extension, идея (но не реализация) которого перекочевала из reactive extension for c#.
В RxJava всё крутится вокруг Observable
. Observable
— это как потоки данных (ещё их можно рассматривать как монады), которые могут каким-либо образом получать и отдавать эти самые данные. Над Observable
’ами можно применять операции, такие как flatmap
, filter
, zip
, merge
, cast
и т.д.
Простой пример:
//Observable, который последовательно будет давать нам элементы из Iterable
final Subscription subscription = Observable.from(Arrays.asList(1, 2, 3, 4, 5))
.subscribeOn(Schedulers.newThread()) //отдаем новый тред для работы в background
.observeOn(AndroidSchedulers.mainThread()) //говорим, что обсервить хотим в main thread
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
//do something with result
}
});
Мы создаем Observable
, который поочередно отдает нам числа из Iterable
. Указываем, что генерация и передача данных будет происходить в отдельном треде, а обработка результата — в main thread. Подписываемся на него, и в методе подписчика производим любые манипуляции с каждым следующим результатом.
Можно сделать этот пример более интересным:
//Observable, который последовательно будет давать нам элементы из Iterable
final Subscription subscription = Observable.from(Arrays.asList(1, 2, 3, 4, 5)).
//оператор фильтрации для отсеивания ненужных результатов
filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0; //выражение верно только для четных чисел
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
//do something with ONLY EVEN result
}
});
Теперь, указав оператор filter
, мы можем обрабатывать только чётные значения.
Как используют RxJava
Вернёмся к нашему первому AsyncTask и посмотрим, как бы мы решили задачу с помощью реактивного программирования.
Для начала создадим Observable с запросом:
//Observable, действия которого основанны на переданной ему Observable.OnSubscribe<String>
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
//сообщить сабскрайберу о том, что есть новые данные
subscriber.onNext(apiService.getData());
//А теперь сообщаем о том, что мы закончили и данных больше нет
subscriber.onCompleted();
}
});
Тут мы создаем Observable
и специфицируем его поведение. Делаем запрос и отдаем результат в onNext(...)
, после чего говорим Subscriber
’у, что мы закончили, вызвав onCompleted()
.
С этим понятно: мы создали Observalble
, который отвечает только за получение объекта String
с API. SRP в чистом виде.
Что, если запрос не прошёл по каким-то причинам? Тогда мы можем позвать у Observable
метод retry(...)
, который будет повторять этот самый Observable
n раз, пока он не завершится успешно (читай, без Exception
). Кроме того, мы можем отдать Observable
’у другой Observable
, если даже retry()
не помог. Если backend написан криво, то лучше бы нам закрывать соединение по таймауту. И у нас есть метод timeout(...)
на этот случай. Всё вместе это выглядело бы так:
final Subscription subscription =
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext(apiService.getData());
subscriber.onCompleted();
}
})
.timeout(5, TimeUnit.SECONDS) //указываем таймаут операции в секундах
.retry(3) // делаем 3 попытки запроса
//назначаем обработчик в случае, если все таки мы не спасли положение
.onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {
@Override
public Observable<? extends String> call(Throwable throwable) {
//return new observable here, that can rescure us from error
}
});
И немного рефакторинга:
final Subscription subscription =
createApiRequestObservable() //создали Observable с запросом
.timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) //поставили таймаут
.retry(RETRY_COUNT_FOR_REQUEST) //поставили кол-во повторов
.onErrorResumeNext(createRequestErrorHandler()); // назначили обработчик ошибки
Теперь займемся созданием json. Для этого результат нашего первого Observable
(а там String
) надо преобразовать. Используем map(...)
, и, если что-то вдруг пойдет не так, вернем другой, нужный нам в случае неудачи, json с помощью onErrorReturn(...)
.
Вот так:
final Subscription subscription =
createApiRequestObservable()
.timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
.retry(RETRY_COUNT_FOR_REQUEST)
.onErrorResumeNext(createRequestErrorHandler())
//модифицируем Observable, чтобы тот преобразовывал String в JSONObject
.map(new Func1<String, JSONObject>() {
@Override
public JSONObject call(String s) {
return JsonUtils.parse(s);
}
})
//снова ставим обработчик ошибки
//и вернем предустановленный "ошибочный" json
.onErrorReturn(new Func1<Throwable, JSONObject>() {
@Override
public JSONObject call(Throwable throwable) {
return jsonObjectForErrors;
}
});
Ок, с json разобрались. Осталось кэширование. Кэширование: это не преобразование результата, а действие над ним. Для этого у Observable
есть методы doOnNext(...)
, doOnEach(...)
и т.д. Получается как-то так:
final Subscription subscription =
createApiRequestObservable()
.timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)
.retry(RETRY_COUNT_FOR_REQUEST)
.onErrorResumeNext(createRequestErrorHandler())
//модифицируем Observable, чтобы тот преобразовывал String в JSONObject
.map(new Func1<String, JSONObject>() {
@Override
public JSONObject call(String s) {
return JsonUtils.parse(s);
}
})
//снова ставим обработчик ошибки
//и вернем предустановленный "ошибочный" json
.onErrorReturn(new Func1<Throwable, JSONObject>() {
@Override
public JSONObject call(Throwable throwable) {
return jsonObjectForErrors;
}
})
//процедура, вызывающаяся при каждом onNext(..) от Observable
.doOnNext(new Action1<JSONObject>() {
@Override
public void call(JSONObject jsonObject) {
lruCache.cacheJson(jsonObject);
}
});
Снова немного отрефакторим код:
final Subscription subscription =
createApiRequestObservable() //создали Observable с запросом
.timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) //поставили таймаут
.retry(RETRY_COUNT_FOR_REQUEST) //поставили кол-во повторов
.onErrorResumeNext(createRequestErrorHandler()) // назначили обработчик ошибки
.map(createJsonMapOperator()) //модифицировали Observable, чтобы получать JSONObject
.onErrorReturn(createJsonErrorHandler()) //возвращаем в случае ошибки то, что ожидаем
.doOnNext(createCacheOperation()); //кэшируем JSONObject
Мы почти закончили. Как в самом первом примере с RxJava, добавим обработчик результата и укажем треды, в которых надо исполняться.
Финальная версия:
final Subscription subscription =
createApiRequestObservable() //создали Observable с запросом
.timeout(TIMEOUT_IN_SECONDS, TimeUnit.SECONDS) //поставили таймаут
.retry(RETRY_COUNT_FOR_REQUEST) //поставили кол-во повторов
.onErrorResumeNext(createRequestErrorHandler()) // назначили обработчик ошибки
.map(createJsonMapOperator()) //модифицировали Observable, чтобы получать JSONObject
.onErrorReturn(createJsonErrorHandler()) //возвращаем в случае ошибки то, что ожидаем
.doOnNext(createCacheOperation()); //кэшируем JSONObject
.subscribeOn(Schedulers.newThread()) //делаем запрос, преобразование, кэширование в отдельном потоке
.observeOn(AndroidSchedulers.mainThread()) // обработка результата - в main thread
.subscribe(subscriber); //обработчик результата
Давайте посмотрим, чего мы тут добились:
- Принцип отказоустойчивости в действии: результат выполнения всех операций всегда предсказуем. Мы знаем обо всех ошибках и потенциально проблемных местах, которые могут возникнуть в коде, и уже обработали их. Никаких исключений не будет.
- Принцип отзывчивости в действии: соединение с базой или сервером не зависнет благодаря таймауту, попытается сам восстановиться при ошибке и, что тоже важно, вернет результат сразу, до кэширования. А кэширование в
doOnNext
выполнится параллельно обработке результата. - Принцип ориентированности на события в действии: по ходу выполнения запроса и парсинга, мы всегда реагируем на события — события успешного/неуспешного завершения запроса, событие окончания парсинга json (2 реакции: обработка в UI и обработка в бэкграунд трэде для кэширования) и т.д. Кроме того, можно несколько раз подписываться на один
Observable
и держать в консистентном состоянии всю систему. - Код легко расширяем и почти не требует изменений. Если нам необходимо сделать логирование ошибки или сохранение стэктрейс, можно добавить метод
doOnError(Throwable thr)
. Хотите отфильтровать результаты — добавьте операторfilter
и реализуйте его поведение.
Как и недостатки AsyncTask’ов, преимущества этого подхода, на мой взгляд, можно перечислять очень долго. Последний из принципов реактивного программирования, принцип масштабируемости, продемонстрируем ниже.
RxJava в 2GIS Dialer
Живой пример:
//создаем новый Observable путем комбинирования четырех других
final Observable<AggregatedContactData> obs = Observable.combineLatest(
createContactsObservable(context), //Observable для получения контактов из базы
createPhonesObservable(context), //Observable для получения всех телефонов контактов
createAccountsObservable(context), //Observable для полуения аккаунтов и контактов по ним
createJobsObservable(context), //Observable для получения мест работы контактов
contactsInfoCombinator() //функция комбинироваия результатов всех Observable выше
).onErrorReturn(createEmptyObservable()).cache() //обработчик ошибки и оператор кэширования
.subscribeOn(Schedulers.executor(ThreadPoolHolder.getGeneralExecutor())) //для выполнения такой задачи потребуется тред пул
.observeOn(AndroidSchedulers.mainThread()); // обработка данных как всегда - в main thread
- Тут происходит сразу много интересного и посложнее описанного выше:
Первое, что бросается в глаза, этоObservable.combineLatest(...)
. Этот оператор ждетonNext(...)
от всех переданных емуObservable
’ов и применяет функцию комбинирования сразу ко всем результатам. Может показаться сложным, но картинка из вики RxJava сделает всё понятнее. Самое важное тут, что каждый изObservable
, переданных вObservable.combineLatest(...)
— этоCursorObservable
, который передает в свойonNext(...)
новый курсор, как только он меняется в базе данных. Таким образом, на любое обновление любого из четырех курсоров выполняется функция комбинирования, что позволяет всегда поставлять самые свежие данные. Это и есть принцип ориентированности на события. - Если что-то пошло не так, то мы исходя из своих нужд возвращаем требуемое. В данном случае
Collections.emptyList();
- Оператор
cache()
очень полезен, если на этотObservable
могут быть подписаны сразу несколькоSubscribers
. Если этот оператор применен кObservable
, то новый его подписчик мгновенно получает данные, при условии, что эти данные уже были посчитаны для подписавшихся ранее. Таким образом, все желающие имеет актуальные одинаковые данные. - А вот тут видно принцип масштабируемости: в
subscribeOn(...)
я отдаю тред пул на 4 треда, чтобы каждый из 4х моихObservable
выполнялся в отдельном треде с целью максимизации скорости, всю остальную заботу берет на себя RxJava. То есть задействованы будут все 4 процессора, при наличие оных.
Как видите, потенциал у реактивного программирования огромный, а фукнционал RxJava реализует его в достаточной мере.
Проблемы, с которыми мы столкнулись
Всё, продемонстрированное выше и намного больше в том или ином виде используется у нас в дайлере. И вот с какими проблемами мы столкнулись:
- Проблема OOM. Наивно полагать, что Android может дать много тредов для многопоточной работы. При количестве тредов больше 15, даже топовые смартфоны начинали “задумываться”, а их мелкие собратья и вовсе падали с
OutOfMemoryError
. Решение было простым. ВвестиCachedThreadPool
для этих дел и проблема решена. - Кэширование запросов. Речь не про оператор
cache()
из примера выше. Хотелось бы, чтобы следующий запрос на тот же самый url сразу брался из кэша. В RxJava такого нет. В принципе это правильно, потому что реактивность и кэш — две разные вещи. Поэтому мы написали свой кэш.
Что еще?
Мы увидели, как классно реактивно работать с многопоточностью и запросами в Android. Но это далеко не всё. Например, можно подписываться на изменение Checkable
или EditText
(это из коробки идет в RxJava для Android). Тут всё просто, но ужасно удобно.
Кстати, одной RxJava реактивное программирование под Java не ограничивается. Существуют и другие библиотеки, например, Bolts-Android.
Кроме этого, сейчас активно разрабатывается Reactive-Streams, который призван унифицировать работу с разными реактивными провайдерами в java.
Вывод
Понравилось ли нам? Однозначно. Реактивные приложения действительно гораздо устойчивее к багам и падениям, код становится понятным и гибким (были бы лямбды — был бы еще и красивым). Много рутинной работы перекладывается на библиотеку, которая выполняет свою работу лучше, чем нативные Android-компоненты. Это позволяет сосредоточиться на реализации вещей, которые действительно стоит обдумать.
Реактивное программирование — это немного другое
Ссылки, которые помогут вам разобраться с реактивным программированием или просто могут оказаться интересными:
- Википедия RxJava.
- Очень толковая статья от ведущего программиста SoundHound и евангелиста RxJava под Android.
- Реактивный манифест.
- Reactive-Streams.
Небольшое отступление. Если вы разделяете наши взгляды на программирование и создание продуктов, то приходите — будем рады вас видеть в команде 2GIS Dialer.
Автор: lNevermore