Обработка данных криптовалютного рынка в RavenDB с использованием временных рядов

в 10:00, , рубрики: ravendb, Алгоритмы, Блог компании RUVDS.com, Криптовалюты, разработка, финансы в IT
Что если можно было бы хранить данные временных рядов вместе с «обычными» данными, избавившись от затрат времени, сил и ресурсов, связанных с использованием отдельной СУБД?

RavenDB — это документо-ориентированная NoSQL-база данных, оснащённая стандартной поддержкой работы с временными рядами. То есть — получается нечто вроде MongoDB со встроенной InfluxDB. Это позволяет применять RavenDB для хранения и обработки данных, получаемых с финансовых рынков. В частности — строить графики цены Bitcoin с использованием C# и TypeScript.

Вот 5-минутное видео, в котором приведено сравнение поддержки временных рядов в RavenDB с их поддержкой в других подобных системах.

В этом видео идёт речь об интересных рыночных данных и о построении ценовых графиков по образцу популярного приложения для трейдинга, разработанного компанией Robinhood. Данный материал посвящён разбору демонстрационного приложения. Когда вы его освоите, вы должны получить представление о том, как работать с временными рядами в RavenDB.

Что представляют собой данные временных рядов?

«Временным рядом» (time series) обычно называют набор значений, упорядоченных по времени. Такие данные, по своей природе, обычно являются высокочастотными. Среди типичных источников временных рядов можно отметить IoT-устройства, различные вспомогательные ИТ-системы, системы аналитики, финансовые системы.

Обработка данных криптовалютного рынка в RavenDB с использованием временных рядов - 1

Визуализация временных рядов

Что такого особенного во временных рядах?

У того, кто впервые столкнулся с временными рядами, может появиться такой вопрос: «Можно ли хранить подобные значения в документо-ориентированной базе данных или в виде строк в реляционной БД?».

Обработка данных криптовалютного рынка в RavenDB с использованием временных рядов - 2

Сведения о курсе криптовалюты

Поступить так можно, но подобные решения не отличаются масштабируемостью. При таком подходе к работе с временными рядами хранилище данных очень скоро окажется переполненным, а клиентскому приложению, со временем, будет всё тяжелее выполнять агрегирование данных.

До того, как в RavenDB появилась поддержка временных рядов, для работы с ними требовалось нечто вроде InfluxDB, что усложняло архитектуру соответствующих приложений.

Встроенная поддержка временных рядов в RavenDB решает вышеозначенные задачи благодаря следующим механизмам:

  1. Поддержка координированных, атомарных транзакций в масштабах кластера.
  2. Использование политик хранения данных, основанных на правилах, применение оптимизации хранилищ данных.
  3. Выполнение запросов на стороне сервера и агрегирование данных с применением индексов.

Временные ряды — это «расширения» документов, в результате при их обработке система может пользоваться уже существующей инфраструктурой, применяемой для хранения документов и выполнения запросов к ним. Это упрощает работу с временными рядами и означает наличие встроенных в СУБД механизмов оптимизации такой работы.

Работа с рыночными данными

Я написал приложение, демонстрирующее работу с временными рядами на примере цены Bitcoin. На создание этого проекта меня вдохновила популярная программа для трейдеров, написанная компанией Robinhood.

Обработка данных криптовалютного рынка в RavenDB с использованием временных рядов - 3

Демонстрационное приложение

Его код, и написанный на C#, и рассчитанный на платформу Node.js, можно найти на GitHub. Там же есть и пример базы данных, которую можно импортировать в собственный экземпляр RavenDB (ещё можно бесплатно попробовать эту СУБД, воспользовавшись RavenDB Cloud).

Данные о цене берутся с криптобиржи KuCoin. В архитектуре моего приложения можно выделить три основных части:

  • Система получения данных из внешнего мира: имеется фоновая задача, которая берёт данные из KuCoin и помещает их в RavenDB.
  • Бэкенд: конечная точка HTTP, которая выполняет запрос данных из RavenDB.
  • Фронтенд: приложение, обладающее веб-интерфейсом, которое выводит интерактивный график цены.

Обработка данных криптовалютного рынка в RavenDB с использованием временных рядов - 4

Схема демонстрационного приложения

Рассказывая об этом проекте, я исхожу из предположения о том, что читатель обладает начальными навыками работы с RavenDB. Я продемонстрирую приёмы работы с временными рядами с применением графического инструмента RavenDB Studio и с использованием программного интерфейса — SDK RavenDB. Если вы не работали с RavenDB — вот курс для самостоятельного освоения, который позволит вам узнать всё что нужно для понимания того, о чём я расскажу.

Управление временными рядами в RavenDB Studio

Начнём с рассказа о том, как работать с временными рядами в RavenDB Studio.

▍Добавление временных рядов в документы

В нашей учебной базе данных имеется документ, представляющий биржевой символ Bitcoin (BTC-USDT).

Обработка данных криптовалютного рынка в RavenDB с использованием временных рядов - 5

Документ, представляющий BTC-USDT

Как видите, в самом документе нет временных рядов.

▍Создание коллекций временных рядов

Временные ряды при работе в RavenDB Studio можно добавить в базу данных с помощью боковой панели документа.

Обработка данных криптовалютного рынка в RavenDB с использованием временных рядов - 6

Панель свойств документа в RavenDB Studio

Не существует пустых временных рядов. Временные ряды «создаются» при присоединении к документу нового значения временного ряда. А после того, как из временного ряда удаляют последнее значение — он исчезает.

Записи временных рядов включают в себя различные данные. В частности — имя ряда, отметку времени и значения, соответствующие этой отметке времени.

Обработка данных криптовалютного рынка в RavenDB с использованием временных рядов - 7

Структура элемента временного ряда

Здесь первые четыре значения являются именованными (Open, Close, High, Low), а последнее значение идентифицируется лишь его номером (Value #4). По умолчанию значения временных рядов являются неименованными. Доступ к ним осуществляется по индексам. Но, если нужно, им можно давать имена.

▍Подготовка к использованию именованных значений временных рядов

В RavenDB Studio, в разделе Settings > Time Series, можно найти интерфейс для управления настройками временных рядов с привязкой к коллекции документов.

Обработка данных криптовалютного рынка в RavenDB с использованием временных рядов - 8

Настройка временных рядов в RavenDB Studio

Каждое имя связано с индексом значения записи и с именем временного ряда.

Работа с временными рядами посредством API

Работа с временными рядами в RavenDB Studio организована очень удобно. Но с данными временных рядов работают, в основном, из кода.

▍Добавление к временным рядам новых данных и обновление существующих значений

В нашем приложении данные, попадающие во временной ряд, загружаются из API KuCoin с помощью Node.js-приложения (фонового задания), написанного на TypeScript и работающего на Azure Functions.

Примеры кода, приведённые в этом разделе, написаны на TypeScript.

После открытия сессии RavenDB работа с API, дающим доступ к данным временных рядов, начинается с обращения к соответствующему документу. В нашем случае сначала, по ID, загружается документ, имеющий отношение к Bitcoin:

let symbolDoc = await session.load(`MarketSymbols/${marketSymbol}`);

if (!symbolDoc) {
  symbolDoc = {
    symbol: marketSymbol,
    "@metadata": {
      "@collection": "MarketSymbols",
    },
  };
  await session.store(symbolDoc, `MarketSymbols/${marketSymbol}`);
}

Так как временной ряд должен быть связан с документом, нам, прежде чем мы сможем пользоваться API timeSeriesFor, нужно будет создать временной ряд, поступив так в том случае, если он пока не создан.

const timeSeries = session.timeSeriesFor(symbolDoc, "history");

API timeSeriesFor принимает сущность (документ) и имя коллекции значений временного ряда. Это (пока) не приводит к загрузке каких-либо данных.

API KuCoin возвращает данные об изменениях цены между двумя отметками времени в виде «японской свечи». Вот как это выглядит.

Обработка данных криптовалютного рынка в RavenDB с использованием временных рядов - 9

Японские свечи (взято с Investopedia)

KuCoin представляет данные свечей в виде кортежа. Каждый его элемент соответствует набору значений, характеризующий одну свечу.

Наш TypeScript-код перебирает элементы, характеризующие свечи, и, применяя деструктурирование массивов, извлекает значения, после чего помещает их во временной ряд RavenDB:

for (const bucket of buckets) {
  const [
    startTime, 
    openPrice, 
    closePrice, 
    highPrice, 
    lowPrice
  ] = bucket;

  const timestamp = dayjs.unix(startTime);

  timeSeries.append(
    timestamp.toDate(),
    [openPrice, closePrice, highPrice, lowPrice]
  );
}

Метод append принимает отметку времени (тут используется вспомогательная библиотека dayjs) и массив значений (сохранённых в порядке, соответствующем их индексам).

await session.store(symbolDoc, `MarketSymbols/${marketSymbol}`);
await session.saveChanges();

Как и при выполнении других операций, основанных на работе с документами, изменения временного ряда не вносятся в базу данных до вызова saveChanges(). Операции внесения изменений во временные ряды, выполняемые различными клиентами, не приводят к конфликтам, так как в RavenDB используется механизм автоматического разрешения конфликтов.

После того, как фоновая задача загрузит данные и поместит их в RavenDB, мы можем обращаться к этим данным, можем передать их веб-фронтенду и построить график.

Загрузка данных временных рядов при загрузке документа

Наш фронтенд обращается к конечной точке REST, используя параметры запроса. В ответ он получает JSON-представление нужных данных, после чего строит график, пользуясь библиотекой Apex Charts. Здесь используется C# и .NET RavenDB SDK.

Здесь и далее приведены примеры кода, написанного на C#.

Когда клиент запрашивает данные по биржевому символу, код загружает документ, пользуясь его ID:

var symbol = await _session.LoadAsync<MarketSymbol>(
  $"MarketSymbols/{marketSymbol}", includes => 
    includes.IncludeTimeSeries("history", 
      from: DateTime.UtcNow.AddDays(-1), to: DateTime.UtcNow));

API IncludeTimeSeries используется для загрузки данных временных рядов вместе с документами (тут используется стратегия «eager fetch»). Благодаря этому уменьшается количество сетевых запросов, отправляемых к базе данных и выполняется кеширование данных временных рядов в сессии. Аргументы from и to позволяют предотвратить загрузку в память всего набора данных.

После того, как документ будет загружен, можно воспользоваться API сессии TimeSeriesFor для обращения к элементам временного ряда:

var historyTimeSeries = _session.TimeSeriesFor<SymbolPrice>(symbol, "history");

Обратите внимание на то, что мы используем тут дженерик-аргумент типа SymbolPrice. Это — структура, которая строго типизирует значения.

▍Организация поддержки именованных значений с помощью классов

Я рассказывал о том, как вручную настраивать имена значений в RavenDB Studio. Но то же самое можно сделать и в коде. Это позволяет рассматривать код приложения как источник истинной информации.

Создадим структуру для именования значений временного ряда:

public struct SymbolPrice
{
    [TimeSeriesValue(0)] public double Open;
    [TimeSeriesValue(1)] public double Close;
    [TimeSeriesValue(2)] public double High;
    [TimeSeriesValue(3)] public double Low;
}

TimeSeriesValue принимает индекс записи и связывает значение со свойством.

Теперь, после инициализации DocumentStore, зарегистрируем временной ряд, указав тип коллекции и имя временного ряда:

store.Initialize();
store.TimeSeries.Register<MarketSymbol, SymbolPrice>("history");

Обратите внимание на то, что подобное возможно лишь в SDK, рассчитанных на платформы и языки, в которых применяется строгая типизация, вроде .NET и Java.

▍Загрузка исходных данных временного ряда

После того, как будет загружен документ, можно получить самые свежие записи временного ряда:

var historyTimeSeries = _session.TimeSeriesFor<SymbolPrice>(symbol, "history");

var latestEntries = await historyTimeSeries.GetAsync(
  from: DateTime.UtcNow.AddDays(-1), to: DateTime.UtcNow);

Если бы мы не воспользовались подсказкой IncludeTimeSeries, то вызов GetAsync привёл бы к выполнению дополнительных обращений к базе данных. А при таком подходе данные загружаются из кеша сессии.

Мы, учитывая возможности RavenDB по поддержанию данных в актуальном состоянии, можем воспользоваться последней записью для получения последней цены,:

var latestEntry = latestEntries.LastOrDefault();

viewModel.LastUpdated = latestEntry?.Timestamp;
viewModel.LastPrice = latestEntry?.Value.Close ?? 0;

▍Выполнение запросов к данным временных рядов и их агрегирование

Нашим следующим шагом на пути построения графика будет агрегирование данных с применением выбранного временного окна — вроде «прошлый день» или «прошлая неделя».

При использовании традиционных баз данных нам пришлось бы загрузить весь набор данных и самостоятельно привести данные к нужному виду. Это привело бы к необходимости поместить в память все данные временного ряда.

В RavenDB данные временных рядов индексированы. Их можно группировать и сортировать на сервере баз данных, а уже потом — возвращать клиенту. В нашем распоряжении оказываются и другие полезные возможности, которые дают разработчикам индексы.

Воспользуемся API сессии Query для выполнения запроса к коллекции:

var aggregatedHistoryQueryResult = await _session.Query<MarketSymbol>()
  .Where(c => c.Id == symbolId)

Потом можно, для конструирования выражения запроса к временному ряду, применить вспомогательную функцию из RavenQuery.TimeSeries:

var aggregatedHistoryQueryResult = await _session.Query<MarketSymbol>()
  .Where(c => c.Id == symbolId)
  .Select(c => RavenQuery.TimeSeries<SymbolPrice>(c, "history")
    .Where(s => s.Timestamp > fromDate)
    .GroupBy(groupingAction)
    .Select(g => new
    {
      First = g.First(),
      Last = g.Last(),
      Min = g.Min(),
      Max = g.Max()
    })
    .ToList()
  ).ToListAsync();

Для построения запроса к временному ряду мы пользуемся двумя переменными: fromDate и groupingAction.

Значение fromDate вычисляется на основании параметров временного окна, нужного для вывода данных во фронтенде:

var marketTime = GetMarketTime();
var fromDate = aggregation switch
{
  AggregationView.OneDay => marketTime.LastTradingOpen,
  AggregationView.OneWeek => DateTime.UtcNow.AddDays(-7),
  AggregationView.OneMonth => DateTime.UtcNow.AddMonths(-1),
  AggregationView.ThreeMonths => DateTime.UtcNow.AddMonths(-3),
  AggregationView.OneYear => DateTime.UtcNow.AddYears(-1),
  AggregationView.FiveYears => DateTime.UtcNow.AddYears(-5)
};

Этот код (основываясь на «наивных» предположениях) определяет моменты начала и окончания торгов, после чего возвращает подходящую дату, которую можно использовать в запросе для фильтрации данных.

Объект groupingAction — это, в конечном счёте, то, что группирует точки данных временного ряда. В его основе лежат сведения том, данные в каком временном окне надо вывести программе. API ITimePeriodBuilder применяется для построения корректного выражения запроса на группировку данных с использованием простых единиц измерения времени:

Action<ITimePeriodBuilder> groupingAction = aggregation switch
{
  AggregationView.OneDay => builder => builder.Minutes(5),
  AggregationView.OneWeek => builder => builder.Minutes(10),
  AggregationView.OneMonth => builder => builder.Hours(1),
  AggregationView.ThreeMonths => builder => builder.Hours(24),
  AggregationView.OneYear => builder => builder.Hours(24),
  AggregationView.FiveYears => builder => builder.Days(7),
};

Объект groupingAction сообщает RavenDB о том, как нужно сгруппировать данные перед возвратом. Делается это за счёт внутренних механизмов перевода соответствующих сведений на язык запросов Raven. Благодаря этому вся нагрузка по обработке данных перекладывается на сервер баз данных.

После того, как в нашем распоряжении оказываются запрошенные данные — мы строим модель и назначаем значения цен каждой группе:

var historyBuckets = new List<MarketSymbolTimeBucket>();
foreach (var seriesAggregation in aggregatedHistory.Results)
{
  historyBuckets.Add(new MarketSymbolTimeBucket()
  {
    Timestamp = seriesAggregation.From,
    OpeningPrice = seriesAggregation.First.Open,
    ClosingPrice = seriesAggregation.Last.Close,
    HighestPrice = seriesAggregation.Max.High,
    LowestPrice = seriesAggregation.Min.Low,
  });
}

Используя структуру, обеспечивающую работу с именованными значениями (SymbolPrice), мы можем обращаться к агрегированным значениям для каждой ценовой группы. Например, цена открытия будет первым значением в группе, Open, а цена закрытия будет последним значением — Close.

Такое представление модели после этого возвращается клиенту в формате JSON, на основании этих данных можно построить график.

Итоги

Поддержка временных рядов в RavenDB соперничает со специализированными продуктами, вроде InfluxDB. В RavenDB, при этом, имеется намного больше возможностей, чем те, о которых мы говорили. Это, например, политики агрегирования данных одних временных рядов и помещения их в другие временные ряды, тегирование, пользовательское индексирование временных рядов, «инкрементирование» временных рядов.

Хотите лучше со всем этим разобраться? Обратитесь к материалам моего проекта на GitHub.

Какими СУБД вы пользуетесь для работы с временными рядами?

Автор:
ru_vds

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js