О том, что такое база KDB+, язык программирования Q, какие у них есть сильные и слабые стороны, можно прочитать в моей предыдущей статье и кратко во введении. В статье же мы реализуем на Q сервис, который будет обрабатывать входящий поток данных и высчитывать поминутно различные агрегирующие функции в режиме “реального времени” (т.е. будет успевать все посчитать до следующей порции данных). Главная особенность Q состоит в том, что это векторный язык, позволяющий оперировать не единичными объектами, а их массивами, массивами массивов и другими сложносоставными объектами. Такие языки как Q и родственные ему K, J, APL знамениты своей краткостью. Нередко программу, занимающую несколько экранов кода на привычном языке типа Java, можно записать на них в несколько строк. Именно это я и хочу продемонстрировать в этой статье.
Введение
KDB+ — это колоночная база данных, ориентированная на очень большие объемы данных, упорядоченные определенным образом (в первую очередь по времени). Используется она, в первую очередь, в финансовых организациях – банках, инвестиционных фондах, страховых компаниях. Язык Q – это внутренний язык KDB+, позволяющий эффективно работать с этими данными. Идеология Q – это краткость и эффективность, понятность при этом приносится в жертву. Обосновывается это тем, что векторный язык в любом случае будет сложен для восприятия, а краткость и насыщенность записи позволяет увидеть на одном экране гораздо большую часть программы, что в итоге облегчает ее понимание.
В статье мы реализуем полноценную программу на Q и вам, возможно, захочется попробовать ее в деле. Для этого вам понадобится собственно Q. Скачать бесплатную 32-битную версию можно на сайте компании kx – www.kx.com. Там же, если вам интересно, вы найдете справочную информацию по Q, книгу Q For Mortals и разнообразные статьи на эту тему.
Постановка задачи
Есть источник, который присылает каждые 25 миллисекунд таблицу с данными. Поскольку KDB+ применяется в первую очередь в финансах, будем считать, что это таблица сделок (trades), в которой есть следующие колонки: time (время в миллисекундах), sym (обозначение компании на бирже – IBM, AAPL,…), price (цена, по которой куплены акции), size (размер сделки). Интервал 25 миллисекунд выбран произвольно, он не слишком маленький и не слишком большой. Его наличие означает, что данные приходят в сервис уже буферизованные. Можно было бы легко реализовать буферизацию на стороне сервиса, в том числе динамическую, зависящую от текущей нагрузки, но для простоты остановимся на фиксированном интервале.
Сервис должен считать поминутно для каждого входящего символа из колонки sym набор агрегирующих функций – max price, avg price, sum size и т.п. полезную информацию. Для простоты мы положим, что все функции можно вычислять инкрементально, т.е. для получения нового значения достаточно знать два числа – старое и входящее значения. Например, функции max, average, sum обладают этим свойством, а функция медиана нет.
Также мы предположим, что входящий поток данных упорядочен по времени. Это даст нам возможность работать только с последней минутой. На практике достаточно уметь работать с текущей и предыдущей минутами на случай, если какие-то апдейты запоздали. Для простоты мы не будем рассматривать этот случай.
Агрегирующие функции
Ниже перечислены необходимые агрегирующие функции. Я взял их как можно больше, чтобы увеличить нагрузку на сервис:
- high – max price – максимальная цена за минуту.
- low – min price – минимальная цена за минуту.
- firstPrice – first price – первая цена за минуту.
- lastPrice – last price – последняя цена за минуту.
- firstSize – first size – первый размер сделки за минуту.
- lastSize – last size — последний размер сделки за минуту.
- numTrades – count i – число сделок за минуту.
- volume – sum size – сумма размеров сделок за минуту.
- pvolume – sum price – сумма цен за минуту, необходимо для avgPrice.
- turnover – sum price*size – суммарный объем сделок за минуту.
- avgPrice – pvolume%numTrades – средняя цена за минуту.
- avgSize – volume%numTrades – средний размер сделки за минуту.
- vwap – turnover%volume – взвешенная по размеру сделки средняя цена за минуту.
- cumVolume – sum volume – накопленный размер сделок за все время.
Сразу обсудим один неочевидный момент – как инициализировать эти колонки в первый раз и для каждой следующей минуты. Некоторые колонки типа firstPrice каждый раз нужно инициализировать значением null, их значение не определено. Другие типа volume нужно устанавливать всегда в 0. Еще есть колонки, которые требуют комбинированного подхода – например, cumVolume необходимо копировать из предыдущей минуты, а для первой установить в 0. Зададим все эти параметры используя тип данных словарь (аналог записи):
// list ! list – создать словарь, 0n – float null, 0N – long null, `sym – тип символ, `sym1`sym2 – список символов
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time; // список всех вычисляемых колонок, reverse объяснен ниже
Я добавил sym и time в словарь для удобства, теперь initWith – это готовая строчка из финальной агрегированной таблицы, где осталось задать правильные sym и time. Можно использовать ее для добавления новых строк в таблицу.
aggCols нам понадобится при создании агрегирующей функции. Список необходимо инвертировать из-за особенностей порядка вычислений выражений в Q (справа налево). Цель – обеспечить вычисление в направлении от high к cumVolume, поскольку некоторые колонки зависят от предыдущих.
Колонки, которые нужно скопировать в новую минуту из предыдущей, колонка sym добавлена для удобства:
rollColumns:`sym`cumVolume;
Теперь разделим колонки на группы согласно тому, как их следует обновлять. Можно выделить три типа:
- Аккумуляторы (volume, turnover,..) – мы должны прибавить входящее значение к предыдущему.
- С особой точкой (high, low, ..) – первое значение в минуте берется из входящих данных, остальные считаются с помощью функции.
- Остальные. Всегда считаются с помощью функции.
Определим переменные для этих классов:
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
Порядок вычислений
Обновлять агрегированную таблицу мы будем в два этапа. Для эффективности мы сначала ужмем входящую таблицу так, чтобы там осталась одна строка для каждого символа и минуты. То, что все наши функции инкрементальные и ассоциативные, гарантирует нам, что результат от этого дополнительного шага не изменится. Ужать таблицу можно было бы с помощью селекта:
select high:max price, low:min price … by sym,time.minute from table
У этого способа есть минус – набор вычисляемых колонок задан заранее. К счастью, в Q селект реализован и как функция, куда можно подставить динамически созданные аргументы:
?[table;whereClause;byClause;selectClause]
Я не буду подробно описывать формат аргументов, в нашем случае нетривиальными будут только by и select выражения и они должны быть словарями вида columns!expressions. Таким образом, ужимающую функцию можно задать так:
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size"); // each это функция map в Q для одного списка
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
Для понятности я использовал функцию parse, которая превращает строку с Q выражением в значение, которое может быть передано в функцию eval и которое требуется в функциональном селекте. Также отметим, что preprocess задана как проекция (т.е. функция с частично определенными аргументами) функции селект, один аргумент (таблица) отсутствует. Если мы применим preprocess к таблице, то получим ужатую таблицу.
Второй этап – это обновление агрегированной таблицы. Напишем сначала алгоритм в псевдокоде:
for each sym in inputTable
idx: row index in agg table for sym+currentTime;
aggTable[idx;`high]: aggTable[idx;`high] | inputTable[sym;`high];
aggTable[idx;`volume]: aggTable[idx;`volume] + inputTable[sym;`volume];
…
В Q вместо циклов принято использовать функции map/reduce. Но поскольку Q – векторный язык и все операции мы можем спокойно применять ко всем символам сразу, то в первом приближении мы можем обойтись вообще без цикла, проделывая операции со всеми символами сразу:
idx:calcIdx inputTable;
row:aggTable idx;
aggTable[idx;`high]: row[`high] | inputTable`high;
aggTable[idx;`volume]: row[`volume] + inputTable`volume;
…
Но мы можем пойти и дальше, в Q есть уникальный и исключительно мощный оператор – оператор обобщенного присваивания. Он позволяет изменить набор значений в сложной структуре данных используя список индексов, функций и аргументов. В нашем случае он выглядит так:
idx:calcIdx inputTable;
rows:aggTable idx;
// .[target;(idx0;idx1;..);function;argument] ~ target[idx 0;idx 1;…]: function[target[idx 0;idx 1;…];argument], в нашем случае функция – это присваивание
.[aggTable;(idx;aggCols);:;flip (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];
К сожалению, для присвоения в таблицу нужен список строк, а не колонок, и приходится транспонировать матрицу (список колонок в список строк) с помощью функции flip. Для большой таблицы это накладно, поэтому вместо этого применим обобщенное присваивание к каждой колонке отдельно, используя функцию map (которая выглядит как апостроф):
.[aggTable;;:;]'[(idx;)each aggCols; (row[`high] | inputTable`high;row[`volume] + inputTable`volume;…)];
Мы снова используем проекцию функции. Также заметьте, что в Q создание списка – это тоже функция и мы можем вызвать ее с помощью функции each(map), чтобы получить список списков.
Чтобы набор вычисляемых колонок не был фиксирован, создадим выражение выше динамически. Определим сначала функции для вычисления каждой колонки, используя переменные row и inp для ссылки на агрегированные и входные данные:
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!
("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
Некоторые колонки особые, их первое значение не должно вычисляться функцией. Мы можем определить, что оно первое по колонке row[`numTrades] – если в ней 0, то значение первое. В Q есть функция выбора — ?[Boolean list;list1;list2] – которая выбирает значение из списка 1 или 2 в зависимости от условия в первом аргументе:
// high -> ?[isFirst;inp`high;row[`high]|inp`high]
// @ - тоже обобщенное присваивание для случая когда индекс неглубокий
@[`aggExpression;specialCols;{[x;y]"?[isFirst;inp`",y,";",x,"]"};string specialCols];
Тут я вызвал обобщенное присваивание с моей функцией (выражение в фигурных скобках). В нее передается текущее значение (первый аргумент) и дополнительный аргумент, который я передаю в 4-м параметре.
Отдельно добавим аккумуляторные колонки, поскольку для них функция одна и та же:
// volume -> row[`volume]+inp`volume
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
Это обычное по меркам Q присваивание, только присваиваю я сразу список значений. Наконец, создадим главную функцию:
// ":",/:aggExprs ~ map[{":",x};aggExpr] => ":row[`high]|inp`high" присвоим вычисленное значение переменной, потому что некоторые колонки зависят от уже вычисленных значений
// string[cols],'exprs ~ map[,;string[cols];exprs] => "high:row[`high]|inp`high" завершим создание присваивания. ,’ расшифровывается как map[concat]
// ";" sv exprs – String from Vector (sv), соединяет список строк вставляя “;” посредине
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}";
Этим выражением я динамически создаю функцию из строки, которая содержит выражение, которое я приводил выше. Результат будет выглядеть так:
{[aggTable;idx;inp] rows:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols ;(cumVolume:row[`cumVolume]+inp`cumVolume;… ; high:?[isFirst;inp`high;row[`high]|inp`high])]}
Порядок вычисления колонок инвертирован, поскольку в Q порядок вычисления справа налево.
Теперь у нас есть две основные функции, необходимые для вычислений, осталось добавить немного инфраструктуры и сервис готов.
Финальные шаги
У нас есть функции preprocess и updateAgg, которые делают всю работу. Но необходимо еще обеспечить правильный переход через минуты и вычислить индексы для агрегации. В первую очередь определим функцию init:
init:{
tradeAgg:: 0#enlist[initWith]; // создаем пустую типизированную таблицу, enlist превращает словарь в таблицу, а 0# означает взять 0 элементов из нее
currTime::00:00; // начнем с 0, :: означает, что присваивание в глобальную переменную
currSyms::`u#`symbol$(); // `u# - превращает список в дерево, для ускорения поиска элементов
offset::0; // индекс в tradeAgg, где начинается текущая минута
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg; // кэш для последних значений roll колонок, таблица с ключом sym
}
Также определим функцию roll, которая будет менять текущую минуту:
roll:{[tm]
if[currTime>tm; :init[]]; // если перевалили за полночь, то просто вызовем init
rollCache,::offset _ rollColumns#tradeAgg; // обновим кэш – взять roll колонки из aggTable, обрезать, вставить в rollCache
offset::count tradeAgg;
currSyms::`u#`$();
}
Нам понадобится функция для добавления новых символов:
addSyms:{[syms]
currSyms,::syms; // добавим в список известных
// добавим в таблицу sym, time и rollColumns воспользовавшись обобщенным присваиванием.
// Функция ^ подставляет значения по умолчанию для roll колонок, если символа нет в кэше. value flip table возвращает список колонок в таблице.
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime), (initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
}
И, наконец, функция upd (традиционное название этой функции для Q сервисов), которая вызывается клиентом, для добавления данных:
upd:{[tblName;data] // tblName нам не нужно, но обычно сервис обрабатывает несколько таблиц
tm:exec distinct time from data:() xkey preprocess data; // preprocess & calc time
updMinute[data] each tm; // добавим данные для каждой минуты
};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm]; // поменяем минуту, если необходимо
data:select from data where time=tm; // фильтрация
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms]; // новые символы
updateAgg[`tradeAgg;offset+currSyms?syms;data]; // обновим агрегированную таблицу. Функция ? ищет индекс элементов списка справа в списке слева.
};
Вот и все. Вот полный код нашего сервиса, как и обещалось, всего несколько строк:
initWith:`sym`time`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover`avgPrice`avgSize`vwap`cumVolume!(`;00:00;0n;0n;0n;0n;0N;0N;0;0;0.0;0.0;0n;0n;0n;0);
aggCols:reverse key[initWith] except `sym`time;
rollColumns:`sym`cumVolume;
accumulatorCols:`numTrades`volume`pvolume`turnover;
specialCols:`high`low`firstPrice`firstSize;
selExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`numTrades`volume`pvolume`turnover!parse each ("max price";"min price";"first price";"last price";"first size";"last size";"count i";"sum size";"sum price";"sum price*size");
preprocess:?[;();`sym`time!`sym`time.minute;selExpression];
aggExpression:`high`low`firstPrice`lastPrice`firstSize`lastSize`avgPrice`avgSize`vwap`cumVolume!("row[`high]|inp`high";"row[`low]&inp`low";"row`firstPrice";"inp`lastPrice";"row`firstSize";"inp`lastSize";"pvolume%numTrades";"volume%numTrades";"turnover%volume";"row[`cumVolume]+inp`volume");
@[`aggExpression;specialCols;{"?[isFirst;inp`",y,";",x,"]"};string specialCols];
aggExpression[accumulatorCols]:{"row[`",x,"]+inp`",x } each string accumulatorCols;
updateAgg:value "{[aggTable;idx;inp] row:aggTable idx; isFirst:0=row`numTrades; .[aggTable;;:;]'[(idx;)each aggCols;(",(";"sv string[aggCols],'":",/:aggExpression aggCols),")]}"; / '
init:{
tradeAgg::0#enlist[initWith];
currTime::00:00;
currSyms::`u#`symbol$();
offset::0;
rollCache:: `sym xkey update `u#sym from rollColumns#tradeAgg;
};
roll:{[tm]
if[currTime>tm; :init[]];
rollCache,::offset _ rollColumns#tradeAgg;
offset::count tradeAgg;
currSyms::`u#`$();
};
addSyms:{[syms]
currSyms,::syms;
`tradeAgg upsert @[count[syms]#enlist initWith;`sym`time,cols rc;:;(syms;currTime),(initWith cols rc)^value flip rc:rollCache ([] sym: syms)];
};
upd:{[tblName;data] updMinute[data] each exec distinct time from data:() xkey preprocess data};
updMinute:{[data;tm]
if[tm<>currTime; roll tm; currTime::tm];
data:select from data where time=tm;
if[count msyms:syms where not (syms:data`sym)in currSyms; addSyms msyms];
updateAgg[`tradeAgg;offset+currSyms?syms;data];
};
Тестирование.
Проверим производительность сервиса. Для этого запустим его в отдельном процессе (поместите код в файл service.q) и вызовите функцию init:
q service.q –p 5566
q)init[]
В другой консоли запустите второй Q процесс и подсоединитесь к первому:
h:hopen `:host:5566
h:hopen 5566 // если оба на одном хосте
Сначала создадим список символов – 10000 штук и добавим функцию для создания случайной таблицы. Во второй консоли:
syms:`IBM`AAPL`GOOG,-9997?`8
rnd:{[n;t] ([] sym:n?syms; time:t+asc n#til 25; price:n?10f; size:n?10)}
Я добавил в список символов три настоящих, чтобы было удобнее искать их в таблице. Функция rnd создает случайную таблицу с n строками, где время меняется от t до t+25 миллисекунд.
Теперь можно попробовать послать данные в сервис (добавим первые десять часов):
{h (`upd;`trade;rnd[10000;x])} each `time$00:00 + til 60*10
Можно проверить в сервисе, что таблица обновилась:
c 25 200
select from tradeAgg where sym=`AAPL
-20#select from tradeAgg where sym=`AAPL
Результат:
sym | time | high | low | firstPrice | lastPrice | firstSize | lastSize | numTrades | volume | pvolume | turnover | avgPrice | avgSize | vwap | cumVolume |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
AAPL | 09:27 | 9.258904 | 9.258904 | 9.258904 | 9.258904 | 8 | 8 | 1 | 8 | 9.258904 | 74.07123 | 9.258904 | 8 | 9.258904 | 2888 |
AAPL | 09:28 | 9.068162 | 9.068162 | 9.068162 | 9.068162 | 7 | 7 | 1 | 7 | 9.068162 | 63.47713 | 9.068162 | 7 | 9.068162 | 2895 |
AAPL | 09:31 | 4.680449 | 0.2011121 | 1.620827 | 0.2011121 | 1 | 5 | 4 | 14 | 9.569556 | 36.84342 | 2.392389 | 3.5 | 2.631673 | 2909 |
AAPL | 09:33 | 2.812535 | 2.812535 | 2.812535 | 2.812535 | 6 | 6 | 1 | 6 | 2.812535 | 16.87521 | 2.812535 | 6 | 2.812535 | 2915 |
AAPL | 09:34 | 5.099025 | 5.099025 | 5.099025 | 5.099025 | 4 | 4 | 1 | 4 | 5.099025 | 20.3961 | 5.099025 | 4 | 5.099025 | 2919 |
Проведем теперь нагрузочное тестирование, чтобы выяснить сколько данных сервис может обрабатывать в минуту. Напомню, что мы установили интервал для апдейтов в 25 миллисекунд. Соответственно, сервис должен (в среднем) укладываться хотя бы в 20 миллисекунд на апдейт, чтобы дать время пользователям запросить данные. Введите следующее во втором процессе:
tm:10:00:00.000
stressTest:{[n] 1 string[tm]," "; times,::h ({st:.z.T; upd[`trade;x]; .z.T-st};rnd[n;tm]); tm+:25}
start:{[n] times::(); do[4800;stressTest[n]]; -1 " "; `min`avg`med`max!(min times;avg times;med times;max times)}
4800 – это две минуты. Можно попробовать запустить сначала для 1000 строк каждые 25 миллисекунд:
start 1000
В моем случае результат получается в районе пары миллисекунд на апдейт. Так что я сразу увеличу количество строк до 10.000:
start 10000
Результат:
min| 00:00:00.004
avg| 9.191458
med| 9f
max| 00:00:00.030
Снова ничего особенного, а ведь это 24 миллиона строк в минуту, 400 тысяч в секунду. Больше 25 миллисекунд апдейт тормозил только 5 раз, видимо при смене минуты. Увеличим до 100.000:
start 100000
Результат:
min| 00:00:00.013
avg| 25.11083
med| 24f
max| 00:00:00.108
q)sum times
00:02:00.532
Как видим, сервис едва справляется, но тем не менее ему удается удержаться на плаву. Такой объем данных (240 миллионов строк в минуту) чрезвычайно велик, в таких случаях принято запускать несколько клонов (или даже десятков клонов) сервиса, каждый из которых обрабатывает только часть символов. Тем не менее, результат впечатляющий для интерпретируемого языка, который ориентирован в первую очередь на хранение данных.
Может возникнуть вопрос, почему время растет нелинейно вместе с размером каждого апдейта. Причина в том, что ужимающая функция – это фактически С функция, которая работает гораздо эффективнее updateAgg. Начиная с какого-то размера апдейта (в районе 10.000), updateAgg достигает своего потолка и дальше ее время выполнения не зависит от размера апдейта. Именно за счет предварительного шага Q сервис в состоянии переваривать такие объемы данных. Это подчеркивает, насколько важно, работая с большими данными, выбирать правильный алгоритм. Еще один момент – правильное хранение данных в памяти. Если бы данные хранились не по-колоночно или не были упорядочены по времени, то мы бы познакомились с такой вещью, как TLB cache miss – отсутствие адреса страницы памяти в кэше адресов процессора. Поиск адреса занимает где-то в 30 раз больше времени в случае неудачи и в случае рассеянных данных может замедлить сервис в несколько раз.
Заключение
В этой статье я показал, что база KDB+ и Q пригодны не только для хранения больших данных и простого доступа к ним через селект, но и для создания сервисов обработки данных, которые способны переваривать сотни миллионов строк/гигабайты данных даже в одном отдельно взятом Q процессе. Сам язык Q позволяет исключительно кратко и эффективно реализовывать алгоритмы, связанные с обработкой данных за счет своей векторной природы, встроенного интерпретатора диалекта SQL и очень удачного набора библиотечных функций.
Я замечу, что изложенное выше, это лишь часть возможностей Q, у него есть и другие уникальные особенности. Например, чрезвычайно простой IPC протокол, который стирает границу между отдельными Q процессами и позволяет объединять сотни этих процессов в единую сеть, которая может располагаться на десятках серверов в разных концах света.
Автор: Quintanar