RxJava — это реализация ReactiveX для Java — библиотеки для асинхронной обработки потоков данных. Паттерн observable на стероидах, как они сами пишут. В интернете, в том числе на Хабре, есть много «введений в RxJava». Я хочу привести несколько примеров реальных задач. Они не очень сложные, но возможно кто-то увидит какие-то сходства со своими и задумается.
Собственно, задачи:
1. Простое клиентское TCP-соединение. Есть протокол поверх TCP/IP, нужно сформировать сообщение, подключиться к удаленному узлу, если еще не подключился, передать сообщение и прочитать ответ. Плюс обработка ошибок, проверка таймаутов, повтор отправки в случае неудачи. Жестких требований к производительности нет, трафик не большой.
2. Есть двигатель и некоторый датчик. Нужно произвести сканирование — пройтись двигателем по заданной траектории: послать двигатель к точке, дождаться, когда он к ней приедет, снять показания датчика, отобразить точку на графике (в GUI потоке), поехать к следующей точке…
3. Полученные после сканирования данные нужно обработать (условно длительный вычислительный процесс) и засунуть в pdf-отчет (условно длительный процесс ввода-вывода) вместе с изображением графика и данными введенными пользователем (GUI поток).
1. Простое клиентское TCP-соединение
Предположим, есть некоторый протокол обмена сообщениями. Сообщения могут содержать заголовок, контрольную сумму или что-нибудь еще. На каждое сообщение должен быть ответ от сервера. В простейшем виде решение может выглядеть как-то так:
public String send(String command) {
try {
if (!isConnected()) {
connect();
}
byte[] bytes = command.getBytes();
bytes = addHeader(bytes);
sendBytes(bytes);
return readAnswer();
} catch (IOException e) {
// паника
}
}
Детали реализации я не описываю, но вкратце: connect() создает java.net.Socket и подключается к серверу, sendBytes() пишет в output-поток сокета, readAnswer() читает из input-потока сокета. Помимо addHeader() могут быть еще методы, добавляющие контрольную сумму, кодирование и прочее.
Проблемы этого кода: блокирующие запись/чтение и неудобная обработка ошибок — не понятно, что делать с исключением: то ли наверх пробрасывать, то ли тут что-то сделать (рекурсивно повторить отправку?). Как раз эти две проблемы и решает RxJava. Перепишем:
public Observable<String> send(String command) {
return Observable.just(command)
.doOnNext(cmd -> checkConnection())
.map(cmd -> cmd.getBytes())
.map(bytes -> addHeader(bytes))
.map(bytes -> sendBytes(bytes))
.map(result -> readAnswer());
}
Применение:
connection.send("echo 123")
.subscribe(
answer -> { /*обработать ответ*/ },
throwable -> { /*обработать ошибку*/ }
);
В общем-то, получилось то же самое, только в виде монады, набора операторов и с рядом нюансов.
Во-первых, метод sendBytes() теперь возвращает boolean. RxJava работает с потоками данных, а если кто-то возвращает void вместо данных, то потока как-бы уже и нет. Поэтому нужно либо добавить возвращаемый результат в метод (хотя бы return true), либо вместо map использовать doOnNext — этот оператор возвращает то же, что и получил.
Во-вторых, метод send() теперь возвращает Observable, а не сам String. Поэтому нужен отдельный обработчик ответа (или лямбда, как в примере). С исключением то же самое. Тут нужно, как говорится, начать мыслить асинхронно. Вместо самого результата, мы получаем объект, который потом когда-нибудь предоставит нам результат, а мы должны предоставить ему того, что этот результат получит. Вот только этот код все еще блокирующий, поэтому это асинхронное
Улучшим этот код. Начнем с обработки ошибок. RxJava отлавливает исключения, возникающие в операторах, и передает их подписчику. Второй аргумент метода subscribe() — это функциональный интерфейс Action1 — он как раз и отвечает за обработку исключения. Если какой-то из методов раньше мог кидать IOException или какое-то еще checked исключение, то теперь больше нельзя. Такие исключения нужно ловить руками и что-то с ними делать. Например, оборачивать в RuntimeException, чтобы предоставить дальнейшие решения RxJava. Но Action1 не сильно отличается от обычного try-catch подхода. У RxJava есть операторы для работы с ошибками: doOnError(), onErrorReturn(), onErrorResumeNext() и onExceptionResumeNext(). А еще есть банальный retry(), который тут как раз и нужен. Если возникла какая-то ошибка с подключением, то можно просто повторить отправку n-раз.
public Observable<String> send(String command) {
return Observable.just(command)
.doOnNext(cmd -> checkConnection())
.map(cmd -> cmd.getBytes())
.map(bytes -> addHeader(bytes))
.map(bytes -> sendBytes(bytes))
.map(result -> readAnswer())
.doOnError(throwable -> disconnect())
.retry(MAX_RETRY_COUNT);
}
Обработчик исключения, передаваемый в subscribe() будет вызван только в том случае, если все повторы закончатся с ошибкой. Для надежности еще вызываем disconnect() перед повторной попыткой, чтобы закрыть и обнулить сокет. Иначе в checkConnection() внутри при вызове isConnected() можем получить ложно положительное срабатывание, и все повторные попытки опять приведут к ошибке. Например, если сервер убил подключение по таймауту, то метод Socket.isConnected() на стороне клиента все еще будет возвращать true — со стороны клиента сокет же подключен, все норм.
Можно еще добавить таймаут на случай, если серверу поплохело, и клиент заблокировался на записи в сокет:
public Observable<String> send(String command) {
return Observable.just(command)
.doOnNext(cmd -> checkConnection())
.map(cmd -> cmd.getBytes())
.map(bytes -> addHeader(bytes))
.map(bytes -> sendBytes(bytes))
.timeout(MAX_SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.map(result -> readAnswer())
.doOnError(throwable -> disconnect())
.retry(MAX_RETRY_COUNT);
}
Оператор timeout кидает исключение, если в течение заданного времени от Observable не поступило ни одного элемента. А исключения мы уже умеем обрабатывать.
Теперь вторая проблема — у нас все еще блокирующие операции, поэтому если вызывать send() из потока GUI, то можно получить подвисания интерфейса. Нужно просто сказать RxJava, чтобы все эти действия выполнялись в другом потоке.
Для этого есть операторы observeOn() и subscribeOn(). У многих людей возникают проблемы с пониманием, чем отличаются эти операторы — есть куча статей на эту тему и вопросов на stackoverflow. Давайте вновь поднимем эту тему и вместе подумаем, что нам нужно сейчас использовать. Вот что пишут в официальной документации:
SubscribeOn — specify the Scheduler on which an Observable will operate.
ObserveOn — specify the Scheduler on which an observer will observe this Observable.
Observable — это тот, кто поставляет данные. Observer — это тот, кто получает данные и что-то с ними делает. Нам нужно, чтобы все выполнялось в другом потоке. Вернее, нам нужно, чтобы наш Observable поставлял данные изначально в другом потоке. А раз данные поставляются в другом потоке, то и все observer'ы будут их обрабатывать в другом потоке. Это по определению subscribeOn() — он определяет планировщика для Observable, которого мы создали в самом начале:
public Observable<String> send(String command) {
return Observable.just(command)
.doOnNext(cmd -> checkConnection())
.map(cmd -> cmd.getBytes())
.map(bytes -> addHeader(bytes))
.map(bytes -> sendBytes(bytes))
.timeout(MAX_SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS)
.map(result -> readAnswer())
.doOnError(throwable -> disconnect())
.retry(MAX_RETRY_COUNT)
.subscribeOn(Schedulers.io());
}
Стоит иметь в виду, что тогда и обработка ответа подписчиком (он тоже observer) будет осуществляться в другом потоке, а это не всегда хорошо. Например, если мы хотим отобразить ответ в графическом интерфейсе, то скорее всего получим исключение, что мы это делаем не в главном потоке. Для этого нужно поместить обработчик в очередь событий фреймворка, отвечающего за графический интерфейс. В разных фреймворках это делается по-разному. В JavaFX для этого есть метод Platform.runLater(runnable). Можно вызывать его напрямую в обработчике ответа, а можно написать свой планировщик:
public final class FxScheduler extends Scheduler {
private final static FxScheduler m_instance = new FxScheduler();
private FxScheduler() {}
public static FxScheduler getInstance() {
return m_instance;
}
@Override
public Worker createWorker() {
return new Worker() {
private final CompositeSubscription m_subscription = new CompositeSubscription();
@Override
public Subscription schedule(Action0 action0) {
Platform.runLater(action0::call);
return m_subscription;
}
@Override
public Subscription schedule(Action0 action0, long delay, TimeUnit timeUnit) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
Platform.runLater(action0::call);
}
}, timeUnit.toMillis(delay));
return m_subscription;
}
@Override
public void unsubscribe() {
m_subscription.unsubscribe();
}
@Override
public boolean isUnsubscribed() {
return m_subscription.isUnsubscribed();
}
};
}
}
К слову, для Android существует AndroidSchedulers.mainThread() в RxAndroid — дополнении для RxJava. Пример отправки команды тогда имеет вид:
send("echo 123")
.observeOn(FxScheduler.getInstance())
.subscribe(
answer -> { /*обработать ответ*/ },
throwable -> { /*обработать ошибку*/ }
);
Здесь мы используем уже observeOn() — нам нужно сообщить RxJava, что «следующий observer должен выполняться через такой-то планировщик».
RxJava предоставляет удобное управление конвейером операторов. Рядом с .map(bytes -> sendBytes(bytes)) можно добавить расчет контрольной суммы, потом прогнать байты через кодирование. Можно добавить вначале логгирование исходящей команды, а в конце — полученного ответа. В общем, идею вы поняли.
2. Сканирование с помощью двигателя и датчика
Есть набор точек — это могут быть углы поворота двигателя в градусах или позиции устройства, которое приводится в движение двигателем. В общем, есть какой-то актуатор. А еще есть внешний датчик, с которого можно получать значения. Нужно проехаться двигателем по набору точек, в каждой точке получить значение с датчика, построить кривую на графике. Повторить процедуру n раз (n кривых на графике). При этом двигатель работает не мгновенно, нужно ждать, когда он выйдет на позицию.
Итак, мы имеем набор точек, для каждой нужно что-то сделать (желательно в другом потоке), а результат обрабатывается в GUI потоке (добавить точку на LineChart, например). Похоже на типичную задачу для RxJava.
public Observable<Point> startScan(List<Double> trajectory, int iterationCount) {
return Observable.from(trajectory)
.subscribeOn(Schedulers.io())
.doOnNext(this::moveMotor)
.doOnNext(this::blockUntilTargetReached)
.map(this::createResultPoint)
.repeat(iterationCount);
}
Используем Schedulers.io(): управление двигателем и датчиком — это все-таки операции ввода-вывода. moveMotor() посылает команду двигателю (через написанный ранее Connection, например).
blockUntilTargetReached() запрашивает у двигателя его позицию, сравнивает с целевой и усыпляет поток на сколько-то миллисекунд, если двигатель еще не доехал. createResultPoint() запрашивает у датчика значение в возвращает объект класса Point, содержащий пару чисел — целевую позицию и значение с датчика. repeat() работает почти как retry() — он повторяет весь поток с самого начала каждый раз, а retry() только после ошибки.
Исходный Observable будет выдавать точки по одной. Следующую точку он выдаст только когда предыдущая пройдет все операторы вплоть до подписчика. Это соответствует функциональному подходу с его ленивыми вычислениями и потоковой обработкой. Таким же образом работают StreamAPI и LINQ. За счет этого сканирование будет идти по точкам по очереди, а не forEach(this::moveMotor), затем forEach(this::blockUntilTargetReached) и так далее.
Применение:
final List<Double> trajectory = ...;
final int n = ...;
startScan(trajectory, n)
.observeOn(FxScheduler.getInstance())
.subscribe(
point -> processPoint(point),
throwable -> processError(throwable),
() -> processData()
);
Проблема в том, что подписчик не отличает на каком именно повторе была получена точка. То есть вместо n кривых, мы получим одну кривую в n раз длиннее. Нужно как-то вручную отслеживать, что началось новое сканирование. Например, считать количество точек и начинать новую кривую, если значение счетчика превысило количество точек в траектории. Или сравнивать пришедшую точку с первой точкой траектории.
В subscribe() появился третий аргумент — это обработчик onComplete(), который вызывается, когда в Observable закончились элементы.
subscribe() возвращает объект, имеющий интерфейс Subscription. Если вызвать у него метод unsubscribe(), то у Observable больше не будет подписчика, принимающего данные, поэтому он просто перестанет их выдавать. Принцип ленивых вычислений — если данные никому не нужны, то не нужно их передавать. Побочных эффектов у операторов все равно не должно быть в соответствии с парадигмой функционального программирования, поэтому просто выполнять операторы без подписчика у Observable смысла нет. С помощью unsubscribe() можно реализовать отмену сканирования. Разве что еще двигателю нужно послать команду на останов движения — за это unsubscribe() не отвечает.
3. Обработка данных и отчет
Мы получили после сканирования много полезных данных, теперь их нужно обработать, рассчитать нужные значения и сгенерировать pdf-отчет.
В отчете так же должны быть значения некоторых полей из интерфейса (например, ФИО пользователя) и рисунок полученных графиков. В случае JavaFX рисунок можно получить методом snapshot(), который есть у каждого графического объекта. Так как это действия с объектами JavaFX, выполняться они должны в GUI-потоке. Для этого у нас уже есть FxScheduler.
class ReportMetaInfo {
private String fileName;
private String name;
private WritableImage image;
}
final Observable<ReportMetaInfo> reportGuiData = Observable.just(m_reportInfoProvider)
.subscribeOn(FxScheduler.getInstance())
.map(provider -> {
final ReportMetaInfo info = new ReportMetaInfo();
info.fileName = m_reportInfoProvider.getFileName();
info.name = m_reportInfoProvider.getName();
info.image = m_reportInfoProvider.getChartSnapshot();
return info;
});
m_reportInfoProvider — это реализация интерфейса ReportInfoProvider — прослойки между моделью и представлением. По сути это вызов геттеров из TextView, но модели все равно — у нее просто интерфейс.
Для расчетов есть Schedulers.computation().
final Observable<ScanResult> reportComputationalData = Observable.just(scanData)
.subscribeOn(Schedulers.computation())
.map(data -> new ResultProcessor(data).calculateAll());
Теперь мы хотим объединить данные из формы и данные из расчетов и поместить все это в тяжелый pdf-файл. Для этого есть оператор zip() и Schedulers.io():
class ReportData {
ReportMetaInfo metaInfo;
ScanResult result;
ReportData(ReportMetaInfo metaInfo, ScanResult result) {
this.metaInfo = metaInfo;
this.result = result;
}
}
Observable.zip(
reportGuiData,
reportComputationalData,
(reportInfo, scanResult) -> new ReportData(reportInfo, scanResult)
)
.observeOn(Schedulers.io())
.map(reportData -> ReportGenerator.createPdf(
reportData.metaInfo.fileName,
reportData.metaInfo.name,
reportData.metaInfo.image,
reportData.result
)).subscribe(
isOk -> { /* здесь, в общем-то, делать нечего */ },
throwable -> { /* что-то пошло не так */ },
() -> { /* здесь мы окажемся, если все прошло успешно */ }
);
zip() принимает до девяти разных Observable и соединяет элементы из них в кортежи. Функцию для соединения нужно предоставить самому как и результирующий тип для кортежей. В итоге получение данных из интерфейса (включая изображение графика) и обработка результатов сканирования проходят параллельно. Нужно ли распараллеливание таких действий, зависит от конкретных задач и объемов данных — я привел несколько упрощенный пример.
Стоит иметь в виду, что когда у нас несколько потоков данных, может проявляться backpressure. Это различные проблемы связанные с разной производительностью потоков и разной производительностью Observable и Observer. В общем, это ситуации, когда кто-то простаивает, а у кого-то уже через буфер переливается. Так что нужно быть аккуратным.
Заключение
Скорее всего для этих задач есть другие решения (и более эффективные) — если кто-то мне их укажет, я с радостью приму это к сведению и учту в работе. На примере этих задач я постарался показать некоторые особенности RxJava: обработка ошибок, отличие subscribeOn() и observeOn(), кастомные планировщики и получение результата в GUI-потоке, принцип ленивых вычислений и его применение для управления внешними устройствами, прерывание работы Observable, параллельная работа нескольких Observable и их объединение. Так что даже если эти задачи не совсем удачные для RxJava, сами рассмотренные принципы могут быть полезны для других.
Автор: fck_r_sns