Я долго боялся использовать RxJava в production. Её назначение и принцип работы оставались для меня загадкой. Чтение исходного кода не добавляло ясности, а статьи только путали. Под катом попытка ответить на вопросы: «Какие задачи эта технология решает лучше аналогов?» и «Как это работает?» с помощью аналогий с классической Java и простых метафор.
Применение
RxJava отлично заменяет Streams API из Java 8 на более ранних версиях Java. Так как Android Java 8 поддерживается далеко не с 4.0, Rx будет оптимальным решением. В статье RxJava рассматривается именно с этого ракурса, так как, по-моему, он наиболее понятный и по-настоящему реактивное приложение под Android с помощью чистой Rx реализовать сложно.
Emitter
Всем нам знаком паттерн Iterator.
interface Iterator<T> {
T next();
boolean hasNext();
}
За интерфейсом скрывается какой-нибудь источник данных, причём совершенно не важно, какой. Iterator полностью скрывает все детали реализации, предоставляя всего два метода:
next — получить следующий элемент
hasNext — узнать, есть ли ещё данные в источнике
У этого паттерна есть одна особенность: потребитель запрашивает данные и ждёт («зависает»), пока источник не выдаст их. Поэтому в качестве источника обычно выступает конечная, часто заранее сформированная коллекция.
Проведём небольшой рефакторинг.
interface Iterator<T> {
T getNext();
boolean isComplete();
}
Думаю, вы уже поняли, к чему я. Интерфейс Emitter из RxJava (для потребителей он дублируется в Observer (Subscriber в RxJava 1)):
interface Emitter<T> {
void onNext(T value);
void onComplete();
void onError(Throwable error);
}
Он похож на Iterator, но работает в обратную сторону: источник сообщает потребителю о том, что появились новые данные.
Это позволяет разрешить все проблемы с многопоточностью на стороне источника и, например, если вы проектируете UI, то вы сможете рассчитывать на то, что весь код, отвечающий за графический интерфейс — последовательный. Невероятно удобно. Прощайте, каллбэки! Скучать не буду.
Аналогия с Iterator взята из [1]
Sources
Теперь немного о самих источниках. Они бывают множества типов: Observable, Single, Maybe… И все они похожи на капусту (и монады, но это не так важно).
Потому что создав один источник, можно заворачивать его в другой источник, который можно ещё раз завернуть в ещё один источник и так до OutOfMemory. (Но так как обычный источник весит меньше 100 байт, скорее, пока заряд не кончится.)
Давайте завернём в источник ответ на тот самый вопрос.
Observable.just(42)
Как мы знаем, получение ответа — довольно долгая операция. Поэтому завернём в источник, который выполнит вычисления в специальном потоке:
Observable.just(42)
.subscribeOn(computation())
А ещё мы хотим, чтобы приложение не упало при ответе. Заворачиваем в источник, который вернёт ответ в главном потоке:
Observable.just(42)
.subscribeOn(computation())
.observeOn(mainThread())
И, наконец, запускаем:
Observable.just(42)
.subscribeOn(computation())
.observeOn(mainThread())
.subscribe(new DisposableObserver<Integer>() {
@Override
public void onNext(Integer answer) {
System.out.print(answer);
}
@Override public void onComplete() {}
@Override public void onError(Throwable e) {}
});
В консоль вывелся ответ, но что же произошло?
Метод subscribe определён в Observable. Он делает проверки и подготовку, а затем вызывает метод subscribeActual, который уже по-разному определён для разных источников.
В нашем случае метод subscribe вызвал метод subscribeActual у ObservableObserveOn, который вызывает метод subscribe завёрнутого в него источника, уточнив, в какой поток нужно вернуть результат.
В ObservableObserveOn лежит ObservableSubscribeOn. Его subscribeActual запускает subscribe завёрнутого в заданном потоке.
И, наконец, в ObservableSubscribeOn завёрнут ObservableJust, который просто выдаёт в onNext своё значение.
Естественно, просто с числом не интересно. Поэтому вот источник, который получает список товаров и узнаёт для них цены. Цены можно получать только по 20 штук (у InAppBilling API такое же ограничение).
→ github.com/a-dminator/rx-products-and-prices
Этот пример создан для демонстрации принципа работы, а не для использования в реальных проектах.
В RxJava огромное количество разных реализаций источников. Все они работают по одному принципу, а детали отлично описаны в документации. Поэтому не буду останавливаться на них.
Операции
Все операции над источниками делятся на 2 типа:
— Не терминальные возвращают новый источник, который завернул исходный
— Терминальные исполняют цепочку и получают данные (subscribe, map...)
И да, ничего не исполнится, пока не будет выполнена терминальная операция. Цепочка может сколько угодно лежать в памяти, не делая вообще ничего. И это хорошо, потому что если мы не получаем данные, то зачем их производить? (Ленивые вычисления без Haskell в комплекте!).
По аналогии со Streams API из [2]
Dispose (Unsubscribe в RxJava 1)
Исполнение цепочки можно прервать. Делается это вызовом dispose() у DisposableObserver (unsubscribe() у Subscriber в RxJava 1).
После этого RxJava прекратит исполнение цепочек, отпишет всех Observer'ов и вызовет iterrupt() у потоков, которые больше не нужны.
Так же можно узнать, не прервано ли исполнение из источников. Для этого у Emitter есть метод isDispose() (isUnsubscribe() для RxJava 1).
У этого есть логичная, но неприятная особенность: так как Observer отвечает за обработку ошибок, теперь все ошибки крашат приложение. Я пока не нашёл решения, о котором готов написать.
Заключение
RxJava:
— Позволяет легко компоновать запросы к сети, базе данных и т.д; организуя их асинхронное выполнение. Это означает, что ваши пользователи получат более быстрое и отзывчивое приложение.
— Не содержит в себе никакой магии. Только составление и исполнение цепочек источников.
— (Для меня) Решает больше проблем, чем создаёт!
Всем спасибо.
Автор: adev_one