Возможности языка Q и KDB+ на примере сервиса реального времени

в 14:05, , рубрики: big data, kdb+, nosql, базы данных, Блог компании Технологический Центр Дойче Банка, хранилища данных

О том, что такое база KDB+, язык программирования Q, какие у них есть сильные и слабые стороны, можно прочитать в моей предыдущей статье и кратко во введении. В статье же мы реализуем на Q сервис, который будет обрабатывать входящий поток данных и высчитывать поминутно различные агрегирующие функции в режиме “реального времени” (т.е. будет успевать все посчитать до следующей порции данных). Главная особенность Q состоит в том, что это векторный язык, позволяющий оперировать не единичными объектами, а их массивами, массивами массивов и другими сложносоставными объектами. Такие языки как Q и родственные ему K, J, APL знамениты своей краткостью. Нередко программу, занимающую несколько экранов кода на привычном языке типа Java, можно записать на них в несколько строк. Именно это я и хочу продемонстрировать в этой статье.

Возможности языка Q и KDB+ на примере сервиса реального времени - 1

Введение

KDB+ — это колоночная база данных, ориентированная на очень большие объемы данных, упорядоченные определенным образом (в первую очередь по времени). Используется она, в первую очередь, в финансовых организациях – банках, инвестиционных фондах, страховых компаниях. Язык Q – это внутренний язык KDB+, позволяющий эффективно работать с этими данными. Идеология Q – это краткость и эффективность, понятность при этом приносится в жертву. Обосновывается это тем, что векторный язык в любом случае будет сложен для восприятия, а краткость и насыщенность записи позволяет увидеть на одном экране гораздо большую часть программы, что в итоге облегчает ее понимание.

В статье мы реализуем полноценную программу на Q и вам, возможно, захочется попробовать ее в деле. Для этого вам понадобится собственно Q. Скачать бесплатную 32-битную версию можно на сайте компании kx – www.kx.com. Там же, если вам интересно, вы найдете справочную информацию по Q, книгу Q For Mortals и разнообразные статьи на эту тему.

Постановка задачи

Есть источник, который присылает каждые 25 миллисекунд таблицу с данными. Поскольку KDB+ применяется в первую очередь в финансах, будем считать, что это таблица сделок (trades), в которой есть следующие колонки: time (время в миллисекундах), sym (обозначение компании на бирже – IBM, AAPL,…), price (цена, по которой куплены акции), size (размер сделки). Интервал 25 миллисекунд выбран произвольно, он не слишком маленький и не слишком большой. Его наличие означает, что данные приходят в сервис уже буферизованные. Можно было бы легко реализовать буферизацию на стороне сервиса, в том числе динамическую, зависящую от текущей нагрузки, но для простоты остановимся на фиксированном интервале.

Сервис должен считать поминутно для каждого входящего символа из колонки sym набор агрегирующих функций – max price, avg price, sum size и т.п. полезную информацию. Для простоты мы положим, что все функции можно вычислять инкрементально, т.е. для получения нового значения достаточно знать два числа – старое и входящее значения. Например, функции max, average, sum обладают этим свойством, а функция медиана нет.

Также мы предположим, что входящий поток данных упорядочен по времени. Это даст нам возможность работать только с последней минутой. На практике достаточно уметь работать с текущей и предыдущей минутами на случай, если какие-то апдейты запоздали. Для простоты мы не будем рассматривать этот случай.

Агрегирующие функции

Ниже перечислены необходимые агрегирующие функции. Я взял их как можно больше, чтобы увеличить нагрузку на сервис:

  • high – max price – максимальная цена за минуту.
  • low – min price – минимальная цена за минуту.
  • firstPrice – first price – первая цена за минуту.
  • lastPrice – last price – последняя цена за минуту.
  • firstSize – first size – первый размер сделки за минуту.
  • lastSize – last size — последний размер сделки за минуту.
  • numTrades – count i – число сделок за минуту.
  • volume – sum size – сумма размеров сделок за минуту.
  • pvolume – sum price – сумма цен за минуту, необходимо для avgPrice.
  • turnover – sum price*size – суммарный объем сделок за минуту.
  • avgPrice – pvolume%numTrades – средняя цена за минуту.
  • avgSize – volume%numTrades – средний размер сделки за минуту.
  • vwap – turnover%volume – взвешенная по размеру сделки средняя цена за минуту.
  • cumVolume – sum volume – накопленный размер сделок за все время.

Сразу обсудим один неочевидный момент – как инициализировать эти колонки в первый раз и для каждой следующей минуты. Некоторые колонки типа firstPrice каждый раз нужно инициализировать значением null, их значение не определено. Другие типа volume нужно устанавливать всегда в 0. Еще есть колонки, которые требуют комбинированного подхода – например, cumVolume необходимо копировать из предыдущей минуты, а для первой установить в 0. Зададим все эти параметры используя тип данных словарь (аналог записи):

// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже

Я добавил sym и time в словарь для удобства, теперь initWith – это готовая строчка из финальной агрегированной таблицы, где осталось задать правильные sym и time. Можно использовать ее для добавления новых строк в таблицу.

aggCols нам понадобится при создании агрегирующей функции. Список необходимо инвертировать из-за особенностей порядка вычислений выражений в Q (справа налево). Цель – обеспечить вычисление в направлении от high к cumVolume, поскольку некоторые колонки зависят от предыдущих.

Колонки, которые нужно скопировать в новую минуту из предыдущей, колонка sym добавлена для удобства:

rollColumns:`sym`cumVolume;

Теперь разделим колонки на группы согласно тому, как их следует обновлять. Можно выделить три типа:

  1. Аккумуляторы (volume, turnover,..) – мы должны прибавить входящее значение к предыдущему.
  2. С особой точкой (high, low, ..) – первое значение в минуте берется из входящих данных, остальные считаются с помощью функции.
  3. Остальные. Всегда считаются с помощью функции.

Определим переменные для этих классов:

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

Порядок вычислений

Обновлять агрегированную таблицу мы будем в два этапа. Для эффективности мы сначала ужмем входящую таблицу так, чтобы там осталась одна строка для каждого символа и минуты. То, что все наши функции инкрементальные и ассоциативные, гарантирует нам, что результат от этого дополнительного шага не изменится. Ужать таблицу можно было бы с помощью селекта:

select high:max price, low:min price … by sym,time.minute from table

У этого способа есть минус – набор вычисляемых колонок задан заранее. К счастью, в Q селект реализован и как функция, куда можно подставить динамически созданные аргументы:

?[table;whereClause;byClause;selectClause]

Я не буду подробно описывать формат аргументов, в нашем случае нетривиальными будут только by и select выражения и они должны быть словарями вида columns!expressions. Таким образом, ужимающую функцию можно задать так:

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

Для понятности я использовал функцию parse, которая превращает строку с Q выражением в значение, которое может быть передано в функцию eval и которое требуется в функциональном селекте. Также отметим, что preprocess задана как проекция (т.е. функция с частично определенными аргументами) функции селект, один аргумент (таблица) отсутствует. Если мы применим preprocess к таблице, то получим ужатую таблицу.

Второй этап – это обновление агрегированной таблицы. Напишем сначала алгоритм в псевдокоде:

for each sym in inputTable
  idx: row index in agg table for sym+currentTime;
  aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
  aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
  …

В Q вместо циклов принято использовать функции map/reduce. Но поскольку Q – векторный язык и все операции мы можем спокойно применять ко всем символам сразу, то в первом приближении мы можем обойтись вообще без цикла, проделывая операции со всеми символами сразу:

idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…

Но мы можем пойти и дальше, в Q есть уникальный и исключительно мощный оператор – оператор обобщенного присваивания. Он позволяет изменить набор значений в сложной структуре данных используя список индексов, функций и аргументов. В нашем случае он выглядит так:

idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

К сожалению, для присвоения в таблицу нужен список строк, а не колонок, и приходится транспонировать матрицу (список колонок в список строк) с помощью функции flip. Для большой таблицы это накладно, поэтому вместо этого применим обобщенное присваивание к каждой колонке отдельно, используя функцию map (которая выглядит как апостроф):

.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];

Мы снова используем проекцию функции. Также заметьте, что в Q создание списка – это тоже функция и мы можем вызвать ее с помощью функции each(map), чтобы получить список списков.

Чтобы набор вычисляемых колонок не был фиксирован, создадим выражение выше динамически. Определим сначала функции для вычисления каждой колонки, используя переменные row и inp для ссылки на агрегированные и входные данные:

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
 ("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");

Некоторые колонки особые, их первое значение не должно вычисляться функцией. Мы можем определить, что оно первое по колонке row[`numTrades] – если в ней 0, то значение первое. В Q есть функция выбора — ?[Boolean list;list1;list2] – которая выбирает значение из списка 1 или 2 в зависимости от условия в первом аргументе:

// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];

Тут я вызвал обобщенное присваивание с моей функцией (выражение в фигурных скобках). В нее передается текущее значение (первый аргумент) и дополнительный аргумент, который я передаю в 4-м параметре.

Отдельно добавим аккумуляторные колонки, поскольку для них функция одна и та же:

// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;

Это обычное по меркам Q присваивание, только присваиваю я сразу список значений. Наконец, создадим главную функцию:

// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";

Этим выражением я динамически создаю функцию из строки, которая содержит выражение, которое я приводил выше. Результат будет выглядеть так:

{[aggTable;idx;inp] rows:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}

Порядок вычисления колонок инвертирован, поскольку в Q порядок вычисления справа налево.

Теперь у нас есть две основные функции, необходимые для вычислений, осталось добавить немного инфраструктуры и сервис готов.

Финальные шаги

У нас есть функции preprocess и updateAgg, которые делают всю работу. Но необходимо еще обеспечить правильный переход через минуты и вычислить индексы для агрегации. В первую очередь определим функцию init:

init:{
  tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
  currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
  currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
  offset::0; // индекс в tradeAgg, где начинается текущая минута 
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
 }

Также определим функцию roll, которая будет менять текущую минуту:

roll:{[tm]
  if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
  rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
  offset::count tradeAgg;
  currSyms::`u#`$();
 }

Нам понадобится функция для добавления новых символов:

addSyms:{[syms]
  currSyms,::syms; // добавим в список известных
  // добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
  // Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 }

И, наконец, функция upd (традиционное название этой функции для Q сервисов), которая вызывается клиентом, для добавления данных:

upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц 
  tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
  updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
  data:select from data where time=tm; // фильтрация
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
  updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
 };

Вот и все. Вот полный код нашего сервиса, как и обещалось, всего несколько строк:

initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;

accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;

selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];

aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '

init:{
  tradeAgg::0#enlist[initWith];
  currTime::00:00;
  currSyms::`u#`symbol$();
  offset::0;
  rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
 };
roll:{[tm]
  if[currTime>tm; :init[]];
  rollCache,::offset _ rollColumns#tradeAgg;
  offset::count tradeAgg;
  currSyms::`u#`$();
 };
addSyms:{[syms]
  currSyms,::syms;
  `tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
 };

upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
  if[tm<>currTime; roll tm; currTime::tm];
  data:select from data where time=tm;
  if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
  updateAgg[`tradeAgg;offset+currSyms?syms;data];
 };

Тестирование.

Проверим производительность сервиса. Для этого запустим его в отдельном процессе (поместите код в файл service.q) и вызовите функцию init:

q service.q –p 5566

q)init[]

В другой консоли запустите второй Q процесс и подсоединитесь к первому:

h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте

Сначала создадим список символов – 10000 штук и добавим функцию для создания случайной таблицы. Во второй консоли:

syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}

Я добавил в список символов три настоящих, чтобы было удобнее искать их в таблице. Функция rnd создает случайную таблицу с n строками, где время меняется от t до t+25 миллисекунд.

Теперь можно попробовать послать данные в сервис (добавим первые десять часов):

{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10

Можно проверить в сервисе, что таблица обновилась:

c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL

Результат:

sym time high low firstPrice lastPrice firstSize lastSize numTrades volume pvolume turnover avgPrice avgSize vwap cumVolume
AAPL 09:27 9.258904 9.258904 9.258904 9.258904 8 8 1 8 9.258904 74.07123 9.258904 8 9.258904 2888
AAPL 09:28 9.068162 9.068162 9.068162 9.068162 7 7 1 7 9.068162 63.47713 9.068162 7 9.068162 2895
AAPL 09:31 4.680449 0.2011121 1.620827 0.2011121 1 5 4 14 9.569556 36.84342 2.392389 3.5 2.631673 2909
AAPL 09:33 2.812535 2.812535 2.812535 2.812535 6 6 1 6 2.812535 16.87521 2.812535 6 2.812535 2915
AAPL 09:34 5.099025 5.099025 5.099025 5.099025 4 4 1 4 5.099025 20.3961 5.099025 4 5.099025 2919

Проведем теперь нагрузочное тестирование, чтобы выяснить сколько данных сервис может обрабатывать в минуту. Напомню, что мы установили интервал для апдейтов в 25 миллисекунд. Соответственно, сервис должен (в среднем) укладываться хотя бы в 20 миллисекунд на апдейт, чтобы дать время пользователям запросить данные. Введите следующее во втором процессе:

tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}

4800 – это две минуты. Можно попробовать запустить сначала для 1000 строк каждые 25 миллисекунд:

start 1000

В моем случае результат получается в районе пары миллисекунд на апдейт. Так что я сразу увеличу количество строк до 10.000:

start 10000

Результат:

min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030

Снова ничего особенного, а ведь это 24 миллиона строк в минуту, 400 тысяч в секунду. Больше 25 миллисекунд апдейт тормозил только 5 раз, видимо при смене минуты. Увеличим до 100.000:

start 100000

Результат:

min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532

Как видим, сервис едва справляется, но тем не менее ему удается удержаться на плаву. Такой объем данных (240 миллионов строк в минуту) чрезвычайно велик, в таких случаях принято запускать несколько клонов (или даже десятков клонов) сервиса, каждый из которых обрабатывает только часть символов. Тем не менее, результат впечатляющий для интерпретируемого языка, который ориентирован в первую очередь на хранение данных.

Может возникнуть вопрос, почему время растет нелинейно вместе с размером каждого апдейта. Причина в том, что ужимающая функция – это фактически С функция, которая работает гораздо эффективнее updateAgg. Начиная с какого-то размера апдейта (в районе 10.000), updateAgg достигает своего потолка и дальше ее время выполнения не зависит от размера апдейта. Именно за счет предварительного шага Q сервис в состоянии переваривать такие объемы данных. Это подчеркивает, насколько важно, работая с большими данными, выбирать правильный алгоритм. Еще один момент – правильное хранение данных в памяти. Если бы данные хранились не по-колоночно или не были упорядочены по времени, то мы бы познакомились с такой вещью, как TLB cache miss – отсутствие адреса страницы памяти в кэше адресов процессора. Поиск адреса занимает где-то в 30 раз больше времени в случае неудачи и в случае рассеянных данных может замедлить сервис в несколько раз.

Заключение

В этой статье я показал, что база KDB+ и Q пригодны не только для хранения больших данных и простого доступа к ним через селект, но и для создания сервисов обработки данных, которые способны переваривать сотни миллионов строк/гигабайты данных даже в одном отдельно взятом Q процессе. Сам язык Q позволяет исключительно кратко и эффективно реализовывать алгоритмы, связанные с обработкой данных за счет своей векторной природы, встроенного интерпретатора диалекта SQL и очень удачного набора библиотечных функций.

Я замечу, что изложенное выше, это лишь часть возможностей Q, у него есть и другие уникальные особенности. Например, чрезвычайно простой IPC протокол, который стирает границу между отдельными Q процессами и позволяет объединять сотни этих процессов в единую сеть, которая может располагаться на десятках серверов в разных концах света.

Автор: Quintanar

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js