Теперь, когда мы понимаем основные принципы Rx, настало время научиться создавать и управлять последовательностями. Стиль управления последовательностями был позаимствован у оригинального C# LINQ, который в свою очередь был вдохновлен функциональным программироанием. Мы поделим все операции по темам, которые отсортированы в порядке возрастания сложности операций. Большинство операторов Rx управляют уже существующими последовательностями, но для начала мы научимся их создавать.
Содержание:
- Часть первая – Вступление
- Часть вторая – Последовательности
- Создание последовательности
- Фильтрация последовательности
- Исследование
- Агрегация
- Трансформация
- Часть третья – Управление последовательностями
- Часть четвертая – Параллельность
Часть 2 — Основы последовательностей
Создание последовательности
Ранее, мы использовали Subject
'ы и вручную подавали на них значения чтобы создать последовательность. Мы делали так чтобы продемонстрировать некоторые ключевые моменты, в том числе основной Rx метод subscribe
. В большинстве случаев Subject
это не лучший способ создать новый Observable
. В этом разделе мы рассмотрим более элегантные способы сделать это.
Простые фабричные методы
Observable.just
just
создает Observable
, который выдаст определенное заранее количество значений, после чего завершится.
Observable<String> values = Observable.just("one", "two", "three");
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Received: one
Received: two
Received: three
Completed
Observable.empty
Этот Observable
выдаст только событие onCompleted
и больше ничего.
Observable<String> values = Observable.empty();
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Completed
Observable.never
Этот Observable
никогда ничего не выдаст.
Observable<String> values = Observable.never();
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Код выше ничего не напечатает. Но это не означает что программа блокируется. По-факту она просто мгновенно завершается.
Observable.error
Этот Observable
выдаст событие onError и завершится.
Observable<String> values = Observable.error(new Exception("Oops"));
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Error: java.lang.Exception: Oops
Observable.defer
defer
не создает новый Observable
, но позволяет определить каким образом Observable
будет создан при появлении подписчиков. Подумайте, как бы вы создали Observable
который будет выдавать текущее время? Так как значение только одно, похоже, что здесь нам может помочь just
.
Observable<Long> now = Observable.just(System.currentTimeMillis());
now.subscribe(System.out::println);
Thread.sleep(1000);
now.subscribe(System.out::println);
1431443908375
1431443908375
Обратите внимание как второй подписчик, подписавшись на секунду позже, получил такое же время. Это происходит потому что значение времени было вычислено лишь единажды: когда выполнение доходит до метода just
. Однако в нашем случае мы хотим вычислять текущее время при каждой подписке. defer
принимает функцию, которая возвращает Observable
и будет выполнена для каждого нового подписчика.
Observable<Long> now = Observable.defer(() ->
Observable.just(System.currentTimeMillis()));
now.subscribe(System.out::println);
Thread.sleep(1000);
now.subscribe(System.out::println);
1431444107854
1431444108858
Observable.create
create
это очень мощный метод создания Observable
.
static <T> Observable<T> create(Observable.OnSubscribe<T> f)
Все намного проще чем выглядит. Внутри всего лишь функция, которая принимает Subscriber
для типа T
. Внутри нее мы можем вручную определить события, которые будут выдаваться подписчику.
Observable<String> values = Observable.create(o -> {
o.onNext("Hello");
o.onCompleted();
});
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Received: Hello
Completed
Когда кто-нибудь подпишется на наш Observable
(в данном случае values
), соответствующий экземпляр Subscriber
будет передан в функцию create
. По мере выполнения кода, значения будут переданы подписчику. Следует обратить внимание, что нужно самостоятельно вызывать метод onCompleted
чтобы просигнализировать окончание последовательности.
Данный метод является рекомендуемым способом создания Observable
в случае если ни один из других способов не подходит. Это похоже на то, как мы создавали Subject
и вручную подавали на него значения, однако есть несколько важных отличий. В первую очередь, источних событий аккуратно инкапсулирован и отделен от другого кода. Во-вторых, Subject
'ы имеют неочевидные опасности: любой кто имеет доступ к обьекту сможет изменить последовательность. Мы еще вернемся к этой проблеме позже.
Еще одним ключевым отличием от использования Subject
является то, что код выполняется "lazily", только тогда когда прибывает новый подписчик. В примере выше, код выполняется не в момент создания Observable
(так как подписчиков еще нет), а в момент вызова метода subscribe
. Это означает, что значения будет вычислены заново для каждого подписчика, как в ReplaySubject
. Конечный результат похож на ReplaySubject
, за исключением кеширования. С помощью create
мы также можем легко перенести выполнение в отделный поток, в то время как с ReplaySubject
нам приходилось бы вручную создавать потоки для вычисления значений. Мы еще рассмотрим способы сделать выполнение метода onSubscribe
параллельным.
Вы уже могли заметить что любой из предыдущих Observable
можно реализовать при помощи Observable.create
. Наш пример для create
эквивалентен Observable.just("hello")
.
Функциональные методы
В функциональном программировании обычным делом является создание бесконечных последовательностей.
Observable.range
Простой и знакомый функциональным программистам метод. Выдает значения из заданного диапазона.
Observable<Integer> values = Observable.range(10, 15);
Этот пример последовательно выдает значения от 10 до 24.
Observable.interval
Эта функция создаст бесконечную последовательность значений, отделенных заданным интервалом времени.
Observable<Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
System.in.read();
Received: 0
Received: 1
Received: 2
Received: 3
...
Последовательность не завершится до тех пор пока мы не отпишемся.
Следует обратить внимание почему блокирующий ввод в конце примера обязателен. Без него программа завершится ничего не напечатав. Это происходит потому, что все наши операции являются неблокирующими: мы создаем периодически выдающий значения Observable
, затем регистрируем подписчика, который выполняет некоторые действия в момент прибытия этих значений. Ничто из этого не блокирует главный поток от завершения.
Observable.timer
Существует две перегрузки Observable.timer
. Первый вариант создает Observable
выдающий 0L
через заданный промежуток времени.
Observable<Long> values = Observable.timer(1, TimeUnit.SECONDS);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Received: 0
Completed
Второй вариант ожидает заданный промежуток времени, затем начинает выдавать значения так же как interval
с заданной частотой.
Observable<Long> values = Observable.timer(2, 1, TimeUnit.SECONDS);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Received: 0
Received: 1
Received: 2
...
Пример выше ждет 2 секунды, затем начинает считать каждую секунду.
Превращение в Observable
В java существуют инструменты для работы с последовательностями, коллекциями и асинхронными событиями, которые могут не иметь прямой совместимости с Rx. Сейчас мы рассмотрим каким образом можно превратить их во входящие данные вашего Rx кода.
Если вы используете EventHandler'ы, то с помощь Observable.create
из событий можно создать последовательность.
Observable<ActionEvent> events = Observable.create(o -> {
button2.setOnAction(new EventHandler<ActionEvent>() {
@Override public void handle(ActionEvent e) {
o.onNext(e)
}
});
})
В зависимости от конкретного события, его тип (в данном случае ActionEvent
) сам по себе может нести достаточно информации чтобы стать типом вашего Observable
. Однако, очень часто вам может понадобиться что-нибудь другое, например, значение некого поля в момент события. Получить значение такого поля лучше всего внутри хендлера, пока UI поток заблокирован и значения поля актуально. И хотя не существует гарантий, что значение останется неизменным до достижения конечного подписчика, в правильно реализованном Rx коде изменения контролируются на стороне потребителя [1].
Observable.from
Вы можете превратить любые входные данные в Observable
при помощи create
. Однако, для распространенных типов данных, существуют уже готовые методы, призванные облегчить этот процесс.
Future
'ы являются частью Java и вы должно быть сталкивались с ними во время работы с фреймворками использующими многопоточность. Они являются менее мощным многопоточным инструментом чем Rx, так как возвращают только одно значение. Как правило, вы захотите превратить их в Observable
.
FutureTask<Integer> f = new FutureTask<Integer>(() -> {
Thread.sleep(2000);
return 21;
});
new Thread(f).start();
Observable<Integer> values = Observable.from(f);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Received: 21
Completed
Observable
выдает результат FutureTask
по-готовности, после чего завершается. Если задача была отменена, observable выдаст ошибку java.util.concurrent.CancellationException
.
Если вы заинтересованы в результате Future
только ограниченное время, существует возможность задать таймаут в качестве аргумента.
Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS);
Если за это время Future
не завершится, observable проигнорирует результат и выдаст TimeoutException
.
С помощью Observable.from
можно превратить любую коллекцию в последовательность. Будет создан Observable
, выдающий каждый элемент коллекции по-отдельности и onCompleted
в конце.
Integer[] is = {1,2,3};
Observable<Integer> values = Observable.from(is);
Subscription subscription = values.subscribe(
v -> System.out.println("Received: " + v),
e -> System.out.println("Error: " + e),
() -> System.out.println("Completed")
);
Received: 1
Received: 2
Received: 3
Completed
Observable
это не то же что Iterable
или Stream
. Observable
push-ориентированный, в том смысле, что вызов onNext
провоцирует стек обработчиков выполниться вплоть до последнего subscribe
метода. Остальные модели pull-ориентированные — значения в них запрашиваются с другой стороны и выполнение блокируется до возвращения результата.
[1] consumer, тот, кто поглащает значения выданные Observable
Теперь у проекта есть свой публичный репозиторий и любой желающий может присоединится к созданию углубленного русскоязычного туториала по Rx. Перевод этой части уже там, остальные появятся в скором времени, а с вашей помощью, еще быстрее.
Автор: bolein95