Основы реактивного программирования с использованием RxJS

в 14:46, , рубрики: javascript, rx, rxjs, реактивное программирование, реактивность

Часть 1. Реактивность и потоки

Данная серия статей посвящена реактивности и ее применению в JS с использованием такой замечательной библиотеки как RxJS.

Для кого эта статья: в основном, здесь я буду объяснять основы, поэтому в первую очередь статья рассчитана на новичков в данной технологии. Вместе с тем надеюсь, что и опытные разработчики смогут почерпнуть для себя что-то новое. Для понимания потребуются знания js(es5/es6).

Мотивация: впервые я столкнулся с RxJS, когда начал работать с angular. Именно тогда у меня возникли сложности с пониманием механизма реактивности. Сложности прибавлял еще тот факт, что на момент начала моей работы большинство статей было посвящено старой версии библиотеки. Пришлось читать много документации, различных мануалов, чтобы хоть что-то понять. И только спустя некоторое время я начал осознавать, как “все устроено”. Чтобы упростить жизнь другим, я решил разложить все по полочкам.

Что такое реактивность?

Сложно найти ответ на, казалось бы, такой распространенный термин. Если кратко: реактивность — это способность реагировать на какие-либо изменения. Но о каких изменениях идет речь? В первую очередь, об изменениях данных. Реактивный подход строится на push стратегии распространения изменений. Чтобы понять, что это такое, сначала придется разобраться, какие стратегии распространения изменений существуют. Рассмотрим пример:

let a = 2;

let b = 3;

let sum = a + b;

console.log(sum); // 5

a = 3;

console.log(sum); // 5 - данные нужно пересчитать

Данный пример демонстрирует привычную нам императивную парадигму программирования. Императивный подход основывается на стратегии pull. Pull стратегия подразумевает, что мы сами должны “стянуть” изменения. Т.е., чтобы узнать значение суммы a и b, мы должны заново его посчитать.

В отличие от pull стратегии, push стратегия подразумевает, что в случае изменения данных эти самые изменения будут “проталкиваться”, и зависимые от них данные будут автоматически обновляться. Вот как бы вел себя наш пример, если бы применялась push стратегия:

let a = 2;

let b = 3;

let sum = a + b;

console.log(sum); // 5

a = 3;

console.log(sum); // 6 - значение переменной sum автоматически пересчиталось

Данный пример показывает реактивный подход. Стоит отметить, что этот пример не имеет ничего общего с реальностью, я его привел лишь с целью показать разницу в подходах. Реактивный код в реальных приложениях будет выглядеть совсем иначе, и прежде чем перейти к практике, нам стоит поговорить еще об одной важной составляющей реактивности.

Поток данных

Если поискать в Википедии термин “реактивное программирование”, то сайт нам выдаст следующее определение: “Реактивное программирование — парадигма программирования, ориентированная на потоки данных и распространение изменений”. Из этого определения можно сделать вывод, что реактивность базируется на двух основных “китах”. Про распространение изменений я упоминал выше, поэтому дальше мы на этом останавливаться не будем. А вот про потоки данных следует поговорить подробнее. Посмотрим на следующий пример:

const input = document.querySelector('input'); // получаем ссылку на элемент

const eventsArray = [];
input.addEventListener('keyup',
   event => eventsArray.push(event)
); // пушим каждое событие в массив eventsArray

Мы слушаем событие keyup и кладем объект события в наш массив. Со временем наш массив может содержать тысячи объектов KeyboardEvent. При этом стоит отметить, что наш массив отсортирован по времени — индекс более поздних событий больше, чем индекс более ранних. Такой массив представляет собой упрощенную модель потока данных. Почему упрощенную? Потому что массив умеет только хранить данные. Еще мы можем проитерировать массив и как-то обработать его элементы. Но массив не может сообщить нам о том, что в него был добавлен новый элемент. Для того, чтобы узнать, были ли добавлены новые данные в массив, нам придется снова проитерировать его.

Но что, если бы наш массив умел сообщать нам о том, что в него поступили новые данные? Такой массив можно было бы с полной уверенностью назвать потоком. Итак, мы подошли к определению потока. Поток — это массив данных, отсортированных по времени, который может сообщать о том, что данные изменились.

Observable

Теперь, когда мы знаем, что такое потоки, давайте поработаем с ними. В RxJS потоки представлены классом Observable. Чтобы создать свой поток, достаточно вызвать конструктор данного класса и передать ему в качестве аргумента функцию подписки:

const observable = new Observable(observer => {
  observer.next(1);
  observer.next(2);
  observer.complete();
})

Через вызов конструктора класса Observable мы создаем новый поток. В качестве аргумента в конструктор мы передали функцию подписки. Функция подписки — это обычная функция, которая в качестве параметра принимает наблюдателя(observer). Сам наблюдатель представляет собой объект, у которого есть три метода:

  • next — выбрасывает новое значение в поток
  • error — выбрасывает в поток ошибку, после чего поток завершается
  • complete — завершает поток

Таким образом, мы создали поток, который испускает два значения и завершается.

Subscription

Если мы запустим предыдущий код, то ничего не произойдет. Мы лишь создадим новый поток и сохраним ссылку на него в переменную observable, но сам поток так никогда и не испустит ни одного значения. Это происходит потому, что потоки являются “ленивыми” объектами и ничего сами по себе не делают. Для того, чтобы наш поток начал испускать значения и мы могли бы эти значения обрабатывать, нам необходимо начать “слушать” поток. Сделать это можно, вызвав метод subscribe у объекта observable.

const observer = {
  next: value => console.log(value), // 1, 2
  error: error => console.error(error), //
  complete: () => console.log("completed") // completed
};

observable.subscribe(observer);

Мы определили нашего наблюдателя и описали у него три метода: next, error, complete. Методы просто логируют данные, которые передаются в качестве параметров. Затем мы вызываем метод subscribe и передаем в него нашего наблюдателя. В момент вызова subscribe происходит вызов функции подписки, той самой, которую мы передали в конструктор на этапе объявления нашего потока. Дальше будет выполняться код функции-подписки, которая передает нашему наблюдателю два значения, а затем завершает поток.

Наверняка, у многих возник вопрос, что будет, если мы подпишемся на поток еще раз? Будет все то же самое: поток снова передаст два значения наблюдателю и завершится. При каждом вызове метода subscribe будет происходить обращение к функции-подписке, и весь ее код будет выполняться заново. Отсюда можно сделать вывод: сколько бы раз мы не подписывались на поток, наши наблюдатели получат одни и те же данные.

Unsubscribe

Теперь попробуем реализовать более сложный пример. Напишем таймер, который будет отсчитывать секунды с момента подписки, и передавать их наблюдателям.

const timer = new Observable(observer => {
  let counter = 0; //объявляем счетчик
  setInterval(() => {
    observer.next(counter++); // передаем значение счетчика наблюдателю и увеличиваем его на единицу
  }, 1000);
});

timer.subscribe({
  next: console.log //просто логируем каждое значение
});

Код получился достаточно простой. Внутри функции-подписки мы объявляем переменную счетчик(counter). Затем, используя замыкание, получаем доступ к переменной из стрелочной функции в setInterval. И каждую секунду передаем переменную наблюдателю, после чего инкрементируем ее. Дальше подписываемся на поток, указываем только один метод — next. Не стоит переживать, что другие методы мы не объявили. Ни один из методов наблюдателя не является обязательным. Мы даже можем передать пустой объект, но в этом случае поток будет работать впустую.

После запуска мы увидим заветные логи, которые будут появляться каждую секунду. Если хотите, можете поэкспериментировать и подписаться на поток несколько раз. Вы увидите, что каждый из потоков будет выполняться независимо от остальных.

Если подумать, то наш поток будет выполняться в течение жизни всего приложения, ведь никакой логики отмены setInterval у нас нет, а в функции-подписке нет вызова метода complete. Но что, если нам нужно, чтобы поток завершился?

На самом деле все очень просто. Если посмотреть в документацию, то можно увидеть, что метод subscribe возвращает объект подписки. У данного объекта есть метод unsubscribe. Вызовем его, и наш наблюдатель перестанет получать значения из потока.

const subscription = timer.subscribe({next: console.log});
setTimeout(() => subscription.unsubscribe(), 5000); //поток завершиться через 5 секунд

После запуска мы увидим, что счетчик остановится на цифре 4. Но, хоть мы и отписались от потока, наша функция setInterval продолжает работать. Она каждую секунду инкрементирует наш счетчик и передает его наблюдателю-пустышке. Чтобы такого не происходило, надо написать логику отмены интервала. Для этого нужно вернуть из функции-подписки новую функцию, в которой будет реализована логика отмены.

const timer = new Observable(observer => {
  let counter = 0;
  const intervalId = setInterval(() => {
    observer.next(counter++);
  }, 1000);

  return () => {
    clearInterval(intervalId);
  }
});

Теперь мы можем вздохнуть с облегчением. После вызова метода unsubscribe произойдет вызов нашей функции отписки, которая очистит интервал.

Заключение

В этой статье показаны отличия императивного подхода от реактивного, а также приведены примеры создания собственных потоков. В следующей части я расскажу о том, какие еще методы для создания потоков существуют и как их применять.

Автор: Денис Макаров

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js