Введение в RxJava: Создание последовательности

в 11:27, , рубрики: java, reactive programming, rxjava

image

Теперь, когда мы понимаем основные принципы Rx, настало время научиться создавать и управлять последовательностями. Стиль управления последовательностями был позаимствован у оригинального C# LINQ, который в свою очередь был вдохновлен функциональным программироанием. Мы поделим все операции по темам, которые отсортированы в порядке возрастания сложности операций. Большинство операторов Rx управляют уже существующими последовательностями, но для начала мы научимся их создавать.

Содержание

Содержание:

  • Часть первая – Вступление
    1. Почему Rx?
    2. Ключевые типы
    3. Жизненный цикл подписки
  • Часть вторая – Последовательности
    1. Создание последовательности
    2. Фильтрация последовательности
    3. Исследование
    4. Агрегация
    5. Трансформация
  • Часть третья – Управление последовательностями
  • Часть четвертая – Параллельность

Часть 2 — Основы последовательностей

Создание последовательности

Ранее, мы использовали Subject'ы и вручную подавали на них значения чтобы создать последовательность. Мы делали так чтобы продемонстрировать некоторые ключевые моменты, в том числе основной Rx метод subscribe. В большинстве случаев Subject это не лучший способ создать новый Observable. В этом разделе мы рассмотрим более элегантные способы сделать это.

Простые фабричные методы

Observable.just

just создает Observable, который выдаст определенное заранее количество значений, после чего завершится.

Observable<String> values = Observable.just("one", "two", "three");
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);  

Вывод

Received: one
Received: two
Received: three
Completed

Observable.empty

Этот Observable выдаст только событие onCompleted и больше ничего.

Observable<String> values = Observable.empty();
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);

Вывод

Completed

Observable.never

Этот Observable никогда ничего не выдаст.

Observable<String> values = Observable.never();
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);

Код выше ничего не напечатает. Но это не означает что программа блокируется. По-факту она просто мгновенно завершается.

Observable.error

Этот Observable выдаст событие onError и завершится.

Observable<String> values = Observable.error(new Exception("Oops"));
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);

Вывод

Error: java.lang.Exception: Oops

Observable.defer

defer не создает новый Observable, но позволяет определить каким образом Observable будет создан при появлении подписчиков. Подумайте, как бы вы создали Observable который будет выдавать текущее время? Так как значение только одно, похоже, что здесь нам может помочь just.

Observable<Long> now = Observable.just(System.currentTimeMillis());

now.subscribe(System.out::println);
Thread.sleep(1000);
now.subscribe(System.out::println);

Вывод

1431443908375
1431443908375

Обратите внимание как второй подписчик, подписавшись на секунду позже, получил такое же время. Это происходит потому что значение времени было вычислено лишь единажды: когда выполнение доходит до метода just. Однако в нашем случае мы хотим вычислять текущее время при каждой подписке. defer принимает функцию, которая возвращает Observable и будет выполнена для каждого нового подписчика.

Observable<Long> now = Observable.defer(() ->
        Observable.just(System.currentTimeMillis()));

now.subscribe(System.out::println);
Thread.sleep(1000);
now.subscribe(System.out::println);

Вывод

1431444107854
1431444108858

Observable.create

create это очень мощный метод создания Observable.

static <T> Observable<T> create(Observable.OnSubscribe<T> f)

Все намного проще чем выглядит. Внутри всего лишь функция, которая принимает Subscriber для типа T. Внутри нее мы можем вручную определить события, которые будут выдаваться подписчику.

Observable<String> values = Observable.create(o -> {
    o.onNext("Hello");
    o.onCompleted();
});
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);

Вывод

Received: Hello
Completed

Когда кто-нибудь подпишется на наш Observable (в данном случае values), соответствующий экземпляр Subscriber будет передан в функцию create. По мере выполнения кода, значения будут переданы подписчику. Следует обратить внимание, что нужно самостоятельно вызывать метод onCompleted чтобы просигнализировать окончание последовательности.

Данный метод является рекомендуемым способом создания Observable в случае если ни один из других способов не подходит. Это похоже на то, как мы создавали Subject и вручную подавали на него значения, однако есть несколько важных отличий. В первую очередь, источних событий аккуратно инкапсулирован и отделен от другого кода. Во-вторых, Subject'ы имеют неочевидные опасности: любой кто имеет доступ к обьекту сможет изменить последовательность. Мы еще вернемся к этой проблеме позже.

Еще одним ключевым отличием от использования Subject является то, что код выполняется "lazily", только тогда когда прибывает новый подписчик. В примере выше, код выполняется не в момент создания Observable (так как подписчиков еще нет), а в момент вызова метода subscribe. Это означает, что значения будет вычислены заново для каждого подписчика, как в ReplaySubject. Конечный результат похож на ReplaySubject, за исключением кеширования. С помощью create мы также можем легко перенести выполнение в отделный поток, в то время как с ReplaySubject нам приходилось бы вручную создавать потоки для вычисления значений. Мы еще рассмотрим способы сделать выполнение метода onSubscribe параллельным.

Вы уже могли заметить что любой из предыдущих Observable можно реализовать при помощи Observable.create. Наш пример для create эквивалентен Observable.just("hello").

Функциональные методы

В функциональном программировании обычным делом является создание бесконечных последовательностей.

Observable.range

Простой и знакомый функциональным программистам метод. Выдает значения из заданного диапазона.

Observable<Integer> values = Observable.range(10, 15);

Этот пример последовательно выдает значения от 10 до 24.

Observable.interval

Эта функция создаст бесконечную последовательность значений, отделенных заданным интервалом времени.

Observable<Long> values = Observable.interval(1000, TimeUnit.MILLISECONDS);
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);
System.in.read();

Вывод

Received: 0
Received: 1
Received: 2
Received: 3
...

Последовательность не завершится до тех пор пока мы не отпишемся.

Следует обратить внимание почему блокирующий ввод в конце примера обязателен. Без него программа завершится ничего не напечатав. Это происходит потому, что все наши операции являются неблокирующими: мы создаем периодически выдающий значения Observable, затем регистрируем подписчика, который выполняет некоторые действия в момент прибытия этих значений. Ничто из этого не блокирует главный поток от завершения.

Observable.timer

Существует две перегрузки Observable.timer. Первый вариант создает Observable выдающий 0L через заданный промежуток времени.

Observable<Long> values = Observable.timer(1, TimeUnit.SECONDS);
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);

Вывод

Received: 0
Completed

Второй вариант ожидает заданный промежуток времени, затем начинает выдавать значения так же как interval с заданной частотой.

Observable<Long> values = Observable.timer(2, 1, TimeUnit.SECONDS);
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);

Вывод

Received: 0
Received: 1
Received: 2
...

Пример выше ждет 2 секунды, затем начинает считать каждую секунду.

Превращение в Observable

В java существуют инструменты для работы с последовательностями, коллекциями и асинхронными событиями, которые могут не иметь прямой совместимости с Rx. Сейчас мы рассмотрим каким образом можно превратить их во входящие данные вашего Rx кода.

Если вы используете EventHandler'ы, то с помощь Observable.create из событий можно создать последовательность.

Observable<ActionEvent> events = Observable.create(o -> {
    button2.setOnAction(new EventHandler<ActionEvent>() {
        @Override public void handle(ActionEvent e) {
            o.onNext(e)
        }
    });
})

В зависимости от конкретного события, его тип (в данном случае ActionEvent) сам по себе может нести достаточно информации чтобы стать типом вашего Observable. Однако, очень часто вам может понадобиться что-нибудь другое, например, значение некого поля в момент события. Получить значение такого поля лучше всего внутри хендлера, пока UI поток заблокирован и значения поля актуально. И хотя не существует гарантий, что значение останется неизменным до достижения конечного подписчика, в правильно реализованном Rx коде изменения контролируются на стороне потребителя [1].

Observable.from

Вы можете превратить любые входные данные в Observable при помощи create. Однако, для распространенных типов данных, существуют уже готовые методы, призванные облегчить этот процесс.

Future'ы являются частью Java и вы должно быть сталкивались с ними во время работы с фреймворками использующими многопоточность. Они являются менее мощным многопоточным инструментом чем Rx, так как возвращают только одно значение. Как правило, вы захотите превратить их в Observable.

FutureTask<Integer> f = new FutureTask<Integer>(() -> {
    Thread.sleep(2000);
    return 21;
});
new Thread(f).start();

Observable<Integer> values = Observable.from(f);

Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);

Вывод

Received: 21
Completed

Observable выдает результат FutureTask по-готовности, после чего завершается. Если задача была отменена, observable выдаст ошибку java.util.concurrent.CancellationException.

Если вы заинтересованы в результате Future только ограниченное время, существует возможность задать таймаут в качестве аргумента.

Observable<Integer> values = Observable.from(f, 1000, TimeUnit.MILLISECONDS);

Если за это время Future не завершится, observable проигнорирует результат и выдаст TimeoutException.

С помощью Observable.from можно превратить любую коллекцию в последовательность. Будет создан Observable, выдающий каждый элемент коллекции по-отдельности и onCompleted в конце.

Integer[] is = {1,2,3};
Observable<Integer> values = Observable.from(is);
Subscription subscription = values.subscribe(
    v -> System.out.println("Received: " + v),
    e -> System.out.println("Error: " + e),
    () -> System.out.println("Completed")
);

Вывод

Received: 1
Received: 2
Received: 3
Completed

Observable это не то же что Iterable или Stream. Observable push-ориентированный, в том смысле, что вызов onNext провоцирует стек обработчиков выполниться вплоть до последнего subscribe метода. Остальные модели pull-ориентированные — значения в них запрашиваются с другой стороны и выполнение блокируется до возвращения результата.

[1] consumer, тот, кто поглащает значения выданные Observable

Теперь у проекта есть свой публичный репозиторий и любой желающий может присоединится к созданию углубленного русскоязычного туториала по Rx. Перевод этой части уже там, остальные появятся в скором времени, а с вашей помощью, еще быстрее.

Автор: bolein95

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js