В последние годы реактивное программирование в целом, а технология ReactiveX в частности, обретает всё большую популярность среди разработчиков. Одни уже активно используют все преимущества этого подхода, а другие только “что-то слышали”. Со своей стороны я постараюсь помочь вам представить, насколько некоторые концепции реактивного программирования способны изменить взгляд на привычные, казалось бы, вещи.
Существует два принципиально различных способа организации больших систем: в соответствии с объектами и состояниями, которые живут в системе, и в соответствии с потоками данных, которые проходят через неё. Парадигма реактивного программирования предполагает легкость выражения потоков данных, а также распространение изменений благодаря этим потокам. Например, в императивном программировании операция присваивания означает конечность результата, тогда как в реактивном значение будет пересчитано при получении новых входных данных. Поток значений проходит в системе ряд трансформаций, которые необходимы для решения определенной задачи. Оперирование потоками позволяет системе быть расширяемой и асинхронной, а правильная реакция на возникающие ошибки – отказоустойчивой.
ReactiveX – библиотека, позволяющая создавать асинхронные и событийно-ориентированные программы, использующие наблюдаемые последовательности. Она расширяет шаблон Наблюдателя для поддержки последовательностей данных, добавляет операторы для их декларативного соединения, избавляя от необходимости заботиться о синхронизации и безопасности потоков, разделяемых структурах данных и неблокирующего I/O.
Одним из основных отличий библиотеки ReactiveX от функционального реактивного программирования является то, что она оперирует не непрерывно изменяющимися, а дискретными значениями, которые “испускаются” в течении длительного времени.
Стоит немного рассказать о том, что такое Observer, Observable, Subject. Модель Observable является источником данных и позволяет обрабатывать потоки асинхронных событий похожим образом с тем, который вы используете для коллекций данных, таких как массивы. И всё это вместо колбэков, а значит, код является более читабельным и менее склонным к ошибкам.
В ReactiveX наблюдатель (Observer) подписывается на Observable и впоследствии реагирует на элемент или последовательность элементов, которые тот отправляет. У каждого Observer, подписанного на Observable, вызывается метод Observer.on_next() на каждый элемент потока данных, после которого может быть вызван как Observer.on_complete(), так и Observer.on_error(). Часто Observable применяется таким образом, что он не начинает отдавать данные до тех пор, пока кто-нибудь не подписывается на него. Это так называемые “ленивые вычисления” – значения вычисляются только тогда, когда в них возникает потребность.
Бывают задачи, для решения которых нужно соединить Observer и Observable, чтобы принимать сообщения о событиях и сообщать о них своим подписчикам. Для этого существует Subject, имеющий, кроме стандартной, ещё несколько реализаций:
- ReplaySubject имеет возможность кэшировать все поступившие в него данные, а при появлении нового подписчика – отдавать всю эту последовательность сначала, работая далее в обычном режиме.
- BehaviorSubject хранит последнее значение, по аналогии с ReplaySubject отдавая его появившемуся подписчику. При создании он получает значение по умолчанию, которое будет получать каждый новый подписчик, если последнего значения еще не было.
- AsyncSubject также хранит последнее значение, но не отдает данные, пока не завершится вся последовательность.
Observable и Observer – только начало ReactiveX. Они не несут в себе всю мощь, которую являют собой операторы, позволяющие трансформировать, объединять, манипулировать последовательностями элементов, которые отдают Observable.
В документации ReactiveX описание операторов включает в себя использование Marble Diagram. К примеру, вот как эти диаграммы представляют Observable и их трансформации:
Глядя на диаграмму ниже, легко понять, что оператор map трансформирует элементы, отдаваемые Observable, путем применения функции к каждому из них.
Хорошей иллюстрацией возможностей ReactiveX является приложение RSS-агрегатора. Здесь возникает необходимость асинхронной загрузки данных, фильтрации и трансформации значений, поддержания актуального состояния путем периодического обновления.
В этой статье примеры для представления основных принципов ReactiveX написаны с использованием библиотеки rx для языка программирования Python. Вот так, например, выглядит абстрактная реализация наблюдателя:
class Observer(metaclass=ABCMeta):
@abstractmethod
def on_next(self, value):
return NotImplemented
@abstractmethod
def on_error(self, error):
return NotImplemented
@abstractmethod
def on_completed(self):
return NotImplemented
Наше приложение в режиме реального времени будет обмениваться сообщениями с браузером посредством веб-сокетов. Возможность легко реализовать это предоставляет Tornado.
Работа программы начинается с запуска сервера. При обращении браузера к серверу открывается веб-сокет.
import json
import os
import feedparser
from rx import config, Observable
from rx.subjects import Subject
from tornado.escape import json_decode
from tornado.httpclient import AsyncHTTPClient
from tornado.platform.asyncio import AsyncIOMainLoop
from tornado.web import Application, RequestHandler, StaticFileHandler, url
from tornado.websocket import WebSocketHandler
asyncio = config['asyncio']
class WSHandler(WebSocketHandler):
urls = ['https://lenta.ru/rss/top7',
'http://wsrss.bbc.co.uk/russian/index.xml']
def open(self):
print("WebSocket opened")
# здесь будет основная логика нашего приложения
def on_message(self, message):
obj = json_decode(message)
# Отправляет сообщение, которое получает user_input
self.subject.on_next(obj['term'])
def on_close(self):
# Отписаться от Observable; по цепочке остановит работу всех observable
self.combine_latest_sbs.dispose()
print("WebSocket closed")
class MainHandler(RequestHandler):
def get(self):
self.render("index.html")
def main():
AsyncIOMainLoop().install()
port = os.environ.get("PORT", 8080)
app = Application([
url(r"/", MainHandler),
(r'/ws', WSHandler),
(r'/static/(.*)', StaticFileHandler, {'path': "."})
])
print("Starting server at port: %s" % port)
app.listen(port)
asyncio.get_event_loop().run_forever()
Для обработки введенного пользователем запроса создается Subject, при подписке на который он отправляет значение по умолчанию (в нашем случае — пустую строку), а затем раз в секунду отправляет то, что введено пользователем и удовлетворяет условиям: длина 0 или больше 2, значение изменилось.
# Subject одновременно и Observable, и Observer
self.subject = Subject()
user_input = self.subject.throttle_last(
1000 # На заданном временном промежутке получать последнее значение
).start_with(
'' # Сразу же после подписки отправляет значение по умолчанию
).filter(
lambda text: len(text) == 0 or len(text) > 2
).distinct_until_changed() # Только если значение изменилось
Также для периодического обновления новостей предусмотрен Observable, который раз в 60с отдает значение.
interval_obs = Observable.interval(
60000 # Отдает значение раз в 60с (для периодического обновления)
).start_with(0)
Два этих потока соединяются оператором combine_latest, в цепочку встраивается Observable для получения списка новостей. После чего на этот Observable создается подписка, вся цепочка начинает работать только в этот момент.
# combine_latest собирает 2 потока из запросов пользователя и временных
# интервалов, срабатывает на любое сообщение из каждого потока
self.combine_latest_sbs = user_input.combine_latest(
interval_obs, lambda input_val, i: input_val
).do_action( # Срабатывает на каждый выпущенный элемент
# Отправляет сообщение для очистки списка на фронтенд
lambda x: send_response('clear')
).flat_map(
# В цепочку встраивается Observable для получения списка
self.get_data
).subscribe(send_response, on_error)
# Создается подписка; вся цепочка начинает работать только в этот момент
Следует подробнее остановиться на том, что такое “Observable для получения списка новостей”. Из списка url для получения новостей мы создаем поток данных, элементы которого приходят в функцию, где при помощи HTTP-клиента Tornado AsyncHTTPClient происходит асинхронная загрузка данных для каждого элемента списка urls. Из них также создается поток данных, который фильтруется по запросу, введенному пользователем. Из каждого потока мы берем по 5 новостей, которые приводим к нужному формату для отправки на фронтенд.
def get_rss(self, rss_url):
http_client = AsyncHTTPClient()
return http_client.fetch(rss_url, method='GET')
def get_data(self, query):
# Observable создается из списка url
return Observable.from_list(
self.urls
).flat_map(
# Для каждого url создается Observable, который загружает данные
lambda url: Observable.from_future(self.get_rss(url))
).flat_map(
# Полученные данные парсятся, из них создается Observable
lambda x: Observable.from_list(
feedparser.parse(x.body)['entries']
).filter(
# Фильтрует по вхождению запроса в заголовок или текст новости
lambda val, i: query in val.title or query in val.summary
).take(5) # Берем только по 5 новостей по каждому url
).map(lambda x: {'title': x.title, 'link': x.link,
'published': x.published, 'summary': x.summary})
# Преобразует данные для отправки на фронтенд
После того, как поток выходных данных сформирован, его подписчик начинает поэлементно получать данные. Функция send_response отправляет полученные значения во фронтенд, который добавляет новость в список.
def send_response(x):
self.write_message(json.dumps(x))
def on_error(ex):
print(ex)
В файле feeder.js
ws.onmessage = function(msg) {
var value = JSON.parse(msg.data);
if (value === "clear") {$results.empty(); return;}
// Append the results
$('<li><a tabindex="-1" href="' + value.link +
'">' + value.title +'</a> <p>' + value.published +
'</p><p>' + value.summary + '</p></li>'
).appendTo($results);
$results.show();
}
Таким образом, реализуется push-технология, в которой данные поступают от сервера к фронтенду, который лишь отправляет введенный пользователем запрос для поиска по новостям.
В качестве заключения предлагаю задуматься о том, какая реализация получилась бы при привычном подходе с использованием колбэков вместо Observable, без возможности легко объединить потоки данных, без возможности мгновенной отправки данных потребителю-фронтенду и с необходимостью отслеживать изменения в строке запроса. Среди Python-разработчиков технология пока что практически не распространена, однако я вижу уже несколько возможностей её применить на текущих проектах.
Пример использования ReactiveX для Python вы можете найти в github репозитории с демо-проектом RSS-агрегатора.
Автор: frauwntr