Есть система с множеством пользователей. Каждый пользователь системы может осуществлять действия по отношению друг к другу. На основании этих действий рассчитывается вес. Необходимо иметь возможность для каждого пользователя получать список остальных пользователей системы, отсортированный в порядке убывания веса. Характеристики весов у бездействующего пользователя меняться не должны.
В своей прошлой статье я описал базовые понятия и средства для начала роботы с тарантулом. В этой статье попробую уделить больше внимания использованию хранимых процедур в Тарантуле на примере одной игровой задачи.
Для упрощения представим, что система — это игра, а пользователь — это игрок.
Например, у вас есть онлайн игра, в которой игроки могут просматривать достижения друг друга, нападать друг на друга, отправлять сообщения, переводить виртуальные деньги, торговать и т. д. Для каждого игрока вам необходимо уметь сортировать остальных игроков. При этом вес, по которому происходит сортировка, должен зависеть не только от активности, но и от того, какая именно активность была проявлена игроком. Например, при просмотре профиля вес увеличивается не сильно, а вот при торговой операции гораздо больше.
Довольно стандартная статистическая задача в наши нелёгкие программистские будни. В этой статье я покажу как ее можно реализовать при помощи Tarantool + Lua. В своей прошлой статье я рассказывал как собирается и конфигурируется тарантул; в этой более подробно остановлюсь не только на схеме но и подробно расскажу про Lua и про то как работать с полученным добром из Perlа.
Что нам потребуется для решения этой задачи:
Таблица, состоящая из полей, в которых надо хранить идентификатор игрока, для которого будем делать выборку, идентификатор игрока, который находится в выборке, и вес. Другими словами это, представление графа в виде массива граней с весом. Но, учитывая что вес не может только увеличиваться, а его изо дня в день надо еще и уменьшать, то придется хранить дату последнего обновления веса игрока. Конечно, дату можно не хранить, но тогда придётся ежедневно пробегаться по всем записям и пересчитывать все веса. Немного упростим задачу, т.к. нам не дана формула по которой нужно рассчитывать уменьшение веса, допустим, что вес равномерно уменьшается до нуля в течении 30 дней, при условии, что действий по отношения к этому игроку не производится, но производятся по отношению к другим игрокам. Поэтому нам понадобится еще и дата последнего действия по отношению к игроку, а лучше уже рассчитанный коэффициент, который рассчитывается один раз в момент активного действия. Увеличиваться вес будет в зависимости от действия произведённого по отношению к участнику. Однако в задаче сказано, что веса не должны изменяться, если игрок вообще не проявлял активности. Собственно для реализации этого условия нам понадобится хранить дату последней активности пользователя.
График весов участников должен выглядеть примерно так:
Одним из показательных мест графика является отметка 05.02 в этот момент видно, что действия начинают производиться только по отношению к игроку, график веса которого обозначен оранжевым цветом. А на отметке 17.02 действия не производятся по отношению ни к одному пользователю, поэтому веса остаются на одном и том же уровне.
Итого, получаем 2 спейса. Первый из 5 полей:
Игрок1
Игрок2
Вес
Дата последнего обновления
Коэффициент снижения веса в день
Второй из 2-х полей:
Игрок1
Дата последней активности игрока1
Теперь проанализируем, какие же нам понадобятся индексы:
Раз мы ходим выбирать отношение одного игрока ко всем другим, то нам нужен неуникальный индекс по первой записи. Также мы хотим получать отсортированный список по весу, значит нужен составной индекс по 1 + 3 полю. Ну и конечно нужно построить уникальный индекс по 1 + 2 полю для точечной выборки одного игрока по отношению к другому, он будет первичным. Со вторым спейсом всё проще, нужен только один уникальный индекс по первой записи.
Конфигурация тарантула:
slab_alloc_arena = 1
pid_file = "box.pid"
logger="cat - >> Tarantool.log"
primary_port = 33013
secondary_port = 33014
admin_port = 33015
rows_per_wal = 5000000
# Индексы для спейсов
space[0].enabled = 1
space[0].index[0].type = "HASH"
space[0].index[0].unique = 1
space[0].index[0].key_field[0].fieldno = 0
space[0].index[0].key_field[0].type = "NUM"
space[0].index[0].key_field[1].fieldno = 1
space[0].index[0].key_field[1].type = "NUM"
space[0].index[1].type = "TREE"
space[0].index[1].unique = 0
space[0].index[1].key_field[0].fieldno = 0
space[0].index[1].key_field[0].type = "NUM"
space[0].index[2].type = "TREE"
space[0].index[2].unique = 0
space[0].index[2].key_field[0].fieldno = 0
space[0].index[2].key_field[0].type = "NUM"
space[0].index[2].key_field[1].fieldno = 2
space[0].index[2].key_field[1].type = "NUM"
space[1].enabled = 1
space[1].index[0].type = "HASH"
space[1].index[0].unique = 1
space[1].index[0].key_field[0].fieldno = 0
space[1].index[0].key_field[0].type = "NUM"
Тарантул готов, можно инициализировать хранилище:
$ tarantool_box --init-storage
tarantool/src/box/tarantool_box: space 0 successfully configured
tarantool/src/box/tarantool_box: space 1 successfully configured
tarantool/src/box/tarantool_box: creating `./00000000000000000001.snap.inprogress'
tarantool/src/box/tarantool_box: saving snapshot `./00000000000000000001.snap'
tarantool/src/box/tarantool_box: done
Хранилище подготовлено, можно писать Lua процедуры.
Понадобятся нам две процедуры:
- регистрация действия пользователя (increase_score)
- получение отсортированного списка пользователей (get_top)
Реализуем их в файле box_popular_user.lua, конфигурационные переменные выносим в box_config_popular_user.lua
Тогда для подключения этого плагина в init.lua нужно написать 1 строку: dofile(«box_popular_user.lua»)
Итак, теперь подробнее про increase_score().
function increase_score(user_id, friend_id, action_type)
-- Проверяем входящие параметры
local id = tonumber(user_id)
local fid = tonumber(friend_id)
if not id or not fid or not map_type_score[action_type] then
return false
end
-- Возьмём дату без времени
local dt = os.date('*t')
local cd = box.time{year = dt.year; month=dt.month; day=dt.day}
-- Если это первое действие пользователя, то добавим запись в первый спейс с сегодняшней датой
local last_update = box.select('1', '0', id)
if not last_update then
last_update = box.insert('1',id,cd)
end
-- Вычислим сколько дней назад пользователь производил действия
local difft = math.floor(( cd-box.unpack('i',last_update[1]) )/24/60/60)
-- Если действия производились сегодня, то никаких пересчётов делать не нужно
if difft ~= 0 then
if difft > 1 then
-- Если действия производились более дня назад, то по условиям задачи веса меняться не должны, поэтому надо пододвинуть дату в нулевом спейсе
-- Возможно, нам надо обновлять много данных поэтому отдадим это в соседний поток, чтобы быстрее отпустить пользователя
table.insert(updates_in_fibers,id,box.fiber.wrap(function() _move_last_update(id, cd, difft) end))
difft = difft-1
else
difft = 0
end
box.update('1',id,'=p',1,cd)
end
-- Возьмём вес отношений двух переданных пользователей
local tup = box.select('0','0', id, fid)
-- Для пересчета веса и коэффициента его снижения вызовем функцию _get_score_koef
if not tup then
-- Если такой записи еще не было, то добавим
local s_k = _get_score_koef(nil, nil, map_type_score[action_type], nil, cd)
tup = box.insert('0', id, fid, s_k[1], cd, s_k[2])
else
-- Если запись была то обновим
local s_k = _get_score_koef(box.unpack('i', tup[3])+difft, box.unpack('i', tup[2]), map_type_score[action_type], box.unpack('i',tup[4]), cd)
-- Если обновление дат ушло в отдельный поток, то вес обновим на дату последнего обновления, но саму дату сдвигать не будем
tup = box.update('0',{id;fid},'=p=p=p',2, s_k[1],3,cd,4,s_k[2])
end
return tup
end
У нас может попасться сильно «богатый» игрок на отношения с другими игроками в своем списке, который давно не заходил в систему, поэтому обновлять дату последнего пересчета веса по отношению к другим игрокам будем в отдельном потоке, чтобы быстрее отдать ответ. Ну и конечно же не забудем защититься от параллельных задач на одного и того же пользователя и не будем пересчитывать веса, пока не обновятся все даты. Собственно, функция обновления дат:
function _move_last_update(id, cd, diff)
weights = box.space[0]
for tup in weights.index[1]:iterator(box.index.EQ, id) do
-- Обновляем только те записи которые не обновлялись
if box.unpack('i',tup[3]) < cd then
weights:update({box.unpack('i',tup[1]); box.unpack('i',tup[2])},'=p', 3, box.unpack('i',tup[3])+diff)
end
end
-- После завершения обновления очистим id из таблицы потоков
table.remove(updates_in_fibers, id)
end
Тут мы используем update который отдает управление в другой поток, чем мы обезопасим себя от того, что пока не будут обновлены даты, больше не один клиент не получит ответа.
Теперь рассмотрим функцию пересчета веса и коэффициента его снижения.
function _get_score_koef(last_update, last_score, add_score, koef, current_date)
local score = 0;
if not last_score then
-- Если это первое действие по отношению к пользователю, то пересчитывать ничего не надо, просто берём нужный вес действия и считаем его за вес отношения
score = add_score
else
if current_date == last_update then
-- Если это не первое действие за сегодня, то просто добавим вес действия к существующему весу пользователя
score = last_score + add_score
else
-- Если предыдущее действие производилось не сегодня, то вычислим дату последнего обновления веса и умножим на коэффициент
local diff = (current_date-last_update)/24/60/60*koef
if diff > last_score then
-- Подстрахуемся, если вышли за рамки, то считаем, что оставшийся вес - это тот вес который надо вычесть
diff = last_score
end
-- Добавим к получившемуся весу отношений пользователей, вес действия
score = last_score + add_score - diff
end
end
if add_score then
-- Если производилось действие, то пересчитаем коэффициент, если функция вызывалась для получения текущего веса пользователя, то коэффициент оставляем прежний
koef = score/box_pu_default['score_day']
end
return {score,koef};
end
Для повышения производительности изменяем только те записи, которые этого требуют. Точно так же поступаем и при выборке записей, если запись нужно обновить то обновим, если нет то не будем. Логика определяющая необходимость обновления записи такая же, как и в функции изменения веса. Но усложним себе жизнь и сделаем возможность выборки только указанных пользователей отсортированных в порядке убывания.
Рассмотрим функцию получения пользователей
function get_top(user_id, count_users, ids)
-- Проверим входящие параметры, если необязательные не переданы то возьмем значения по умолчанию (указаваются в файле "box_config_popular_user.lua")
local id = tonumber(user_id)
if not id then
return false
end
local cu
if not count_users then
cu = box_pu_default['count_users']
else
cu = tonumber(count_users)
end
local id_users = {}
local count = 0
-- Если были переданны id пользователей для фильтрации, то разберём их, и если их кол-во меньше чем кол-во возвращаемых элементов, то приравняем их
if ids then
for id in string.gmatch(ids, "%d+") do
id_users[tonumber(id)] = 1
count=count+1
end
if(count < cu) then
cu = count
end
end
-- Получим текущую дату без времени
local ret
local last_update = box.select('1', '0', id)
-- Делаем 2 прохода на случай если при первом проходе мы нашли хоть одного пользователя по отношению к которому сегодня не производились действия
for iter = 1, 2 do
local need_update = {}
ret = {}
for v in box.space[0].index[2]:iterator(box.index.LE, id) do
-- Выйдем если закончился индекс или мы набрали нужное кол-во пользователей
if not v or #ret == cu or box.unpack('i',v[0]) ~= id then break end
-- Обрабатываем только переданных пользователей, а если список пустой, то всех
if not ids or id_users[box.unpack('i',v[1])] == 1 then
-- Если дата обновления конкретного пользователя не равна дате последнего действия пользователя и в данный момент не происходит обновление
-- этой даты, то пересчитаем вес на момент последнего действия пользователя
if not updates_in_fibers[id] and box.unpack('i', v[3]) ~= box.unpack('i', last_update[1]) then
table.insert(need_update, v)
else
-- Добавим в результирующий массив пользователя из индекса
table.insert(ret, v)
end
end
end
local need_another_req = 0
for i = 1, #need_update do
local v = need_update[i]
local s_k = _get_score_koef(box.unpack('i', v[3]), box.unpack('i', v[2]), 0, box.unpack('i',v[4]), box.unpack('i', last_update[1]))
-- Если вес нулевой, то удалим пользователя из индекса
if s_k[1] == 0 then
box.delete('0',{v[0],v[1]})
else
-- Обновим вес в индексе и выставим флаг, что нужен второй проход, т.к. последовательность записей могла поменяться
box.update('0',{v[0];v[1]},'=p=p=p',2, s_k[1],3,box.unpack('i', last_update[1]),4,s_k[2])
need_another_req = 1
end
end
if need_another_req == 0 then break end
end
return unpack(ret)
end
В конфигурационный файл box_config_popular_user.lua кладём такие значения:
map_type_score={user_page=100} -- вес действий пользователей передаваемых в increase_score, ключи любые, тут главное, чтобы ключи совпадали с первым параметром передаваемым в increase_score
box_pu_default={count_users=10;score_day=30} -- кол-во пользователей возвращаемых из get_top и кол-во дней на протяжении которых вес имеет своё значение, соответственно.
Первой строкой в файле box_popular_user.lua подключаем конфиг:
dofile("box_config_popular_user.lua")
Собственно, для установки этого плагина необходимо в рабочую директорию положить эти два файла. Прописать нужные значения конфигурации. Записать в init.lua строку для подключения плагина. Перезапустить тарантул и пользоваться. Ну и, конечно, не забывать про конфигурацию спейсов.
Теперь посмотрим на эту систему с точки зрения использования.
Напишем самый простой тест на perl, который синхронно в один поток будет скармливать Тарантулу данные на основании некоторого лога.
В логе будут строки содержащие имя функции, которую надо дёрнуть в тарантуле и набор параметров, который надо туда передать. Это синтетический лог файл, но такой он нужен, чтобы не нагружать наш скрипт дополнительной работой, мы же проверяем тарантул на выдержанность.
Для работы с тарантулом нам понадобится модуль DR::Tarantool, его можно установить с Сpan, можно поставить через любимый менеджер пакетов yum, apt,…
use strict;
use DR::Tarantool ':constant', 'tarantool';
my $client = tarantool
host => 'localhost',
port => 33013,
spaces => {
0 => {
name => 'user_score',
default_type => 'NUM',
fields => [
qw(id fid score last_update koef),
],
indexes => {
0 => {
name => 'idx_id_fid',
fields => [ 'id', 'fid' ]
},
1 => {
name => 'idx_id',
fields => [ 'id' ]
},
2 => {
name => 'idx_id_score',
fields => [ 'id', 'score' ]
},
}
},
1 => {
name => 'user_last_update',
default_type => 'NUM',
fields => [
qw(id last_update),
],
indexes => {
0 => {
name => 'idx_id',
fields => [ 'id' ]
},
}
},
};
while(<>){
my $line = $_;
my @params = split( " ", $line);
$client->call_lua($params[0], @params[1..$#params],($params[0] eq 'increase_score' ? ('user_page') : ());
};
Вот такой небольшой скрипт, в котором бОльшую его часть занимает описание спейсов, умеет работать с тарантулом. На самом деле из этого скрипта выброшена часть которая отправляла данный в графит, т. к. нас больше интересует работа с тарантулом. Собственно в один поток я получил суммарно около 3500 запросов в секунду.
Однако, тарантул кушал всего около 17% CPU. Попробуем запустить скрипт в 4 потока.
Каждый из скриптов успел скормить тарантулу примерно 1050 записей из лога в секунду, а все четыре одновременно около 4200 записей в секунду. Тарантул при этом скушал около 38% процессора. Особого прироста не получили. Хотя и наши скрипты которые отправляют данные в тарантул тоже в процессор не упёрлись.
В ходе разбирательств в чём же проблема и куда затупляет наша система, я нашел, что DR::Tarantool в синхронном режиме тоже использует AnyEvent и более того, на каждый пришедший запрос но порождает новый AE::io, что в свою очередь вызывает системный select, что, по моему мнению и вызывает некоторые задержки. Более глубоко я решил не копать, а просто написал еще одну тестовую программку, но уже с использованием Client::Tarantool. Всё что мне пришлось сделать – это изменить use и инициализацию клиента.
use Client::Tarantool;
my $client = Client::Tarantool->new...
И так запускаем и смотрим на граффик:
Вначале работал новый скрипт в 1 поток, уперся в 100% CPU, потом был запущен в 3 потока, каждый из которых тоже упёрлись в 100% CPU и, что очень обрадовало, так это то, что кол-во запросов к тарантулу росло! Потом запустили в 4 потока и тут Тарантул упёрся в 100% CPU. Итого получилось порядка 18-20К запросов в секунду. Далее, для проверки был запущен еще и пятый процесс, и, ожидаемо, кол-во запросов на каждый поток упало и равномерно распределилось, оставшись на уровне 18-20К.
Этот синхронный протокол не использует AnyEvent, он использует чистый syswrite и sysread, что в какой то мере подтверждает теорию с таймаутами при использовании «лишних» AE::io.
Ну а теперь, напоследок, чтобы совсем не раскритиковывать AnyEvent напишем асинхронный тест на основе DR::Tarantool::AsyncClient.
#!/usr/bin/perl
use strict;
use DR::Tarantool::AsyncClient 'tarantool';
use AnyEvent;
use AnyEvent::Handle;
my $cv = condvar AnyEvent;
my $counts = {};
my $hdl;
my $w;
my $client;
# саба следящая за корректным завершением всех событий
my $done_sub = sub {
$counts->{ae}--;
$cv->send unless $counts->{ae};
};
# Саба отправляющая данные в тарантул
my $read_line_sub = sub {
my $line = shift;
my @params = split( " ", $line);
$counts->{ae}++;
$counts->{$params[0]}++;
$client->call_lua($params[0], [@params[1..$#params],($params[0] eq 'increase_score' ? ('user_page') : ())], $done_sub);
};
my $limit_concur_req = 12000; # ограничим кол-во одновременных событий
DR::Tarantool::AsyncClient->connect(
host => '127.0.0.1',
port => 33013,
spaces => {
0 => {
name => 'user_score',
default_type => 'NUM',
fields => [
qw(id fid score last_update koef),
],
indexes => {
0 => {
name => 'idx_id_fid',
fields => [ 'id', 'fid' ]
},
1 => {
name => 'idx_id',
fields => [ 'id' ]
},
2 => {
name => 'idx_id_score',
fields => [ 'id', 'score' ]
},
}
},
1 => {
name => 'user_last_update',
default_type => 'NUM',
fields => [
qw(id last_update),
],
indexes => {
0 => {
name => 'idx_id',
fields => [ 'id' ]
},
}
},
},
sub {
($client) = @_;
$counts->{ae}++;
$hdl = new AnyEvent::Handle
fh => *STDIN,
on_error => sub {
# При ошибке завершим работу
my ($hdl, $fatal, $msg) = @_;
AE::log error => $msg;
$hdl->destroy;
$cv->send;
},
on_eof => $done_sub;
my @start_request;
# Функция описывающая алгоритм работы с входным файлом
@start_request = (line => sub {
my ($hdl, $line) = @_;
$line =~ s/[rn]//g;
$read_line_sub->($line) if $line;
# Считываем строки из файла только если кол-во одновременных событий не привысило лимит
$hdl->push_read (@start_request) if $counts->{ae} < $limit_concur_req;
});
$w = AnyEvent->timer (after => 0, interval => 1, cb => sub{
# скорректируем лимит одновременных событий и запустим процедуру чтения строк если лимит позволил
$limit_concur_req += 1000 if $counts->{ae} < 1000;
$hdl->push_read (@start_request) if $counts->{ae} < $limit_concur_req;
});
}
);
$cv->recv;
Скрипт получился более развесистым и, возможно, не очень хорошо читаем, но это в большой степени зависит от привычки читать событийный код.
Запускаем 4 таких скрипта параллельно на той же машине и на тех же данных, что бы уровнять шансы между тестами.
И получаем прирост по отношению к первому синхронному тесту в 7 раз! И почти в 2 раза по отношению ко второму тесту. Это примерно 30000 запросов в секунду. При этом перловые скрипты отправляющие данные в тарантул работают почти на пределе 100% CPU
Теперь сделаем самые плохие условия для нашего алгоритма, это когда все игроки не заходили несколько дней, а потом все вместе вернулись, собственно такого не должно быть в нормальной жизни, поэтому этот тест синтетический, но он покажет нижнюю планку производительности. Чтобы был понятен порядок цифр, то у нас в первом спейсе на момент запуска теста находиться 14169698 записей. Наибольшее кол-во отношений игроков 5000.
Итого, в среднем получаем около 20000 запросов в секунду, правда разброс по воркерам стал побольше, кто-то из них попадает на игроков с большим кол-вом связей, кто-то на игроков с меньшим кол-вом связей.
Итого, у нас получилась коробка, которая умеет решать поставленную задачу с производительностью в 30000 запросов в секунду (20000 в пессимистичном варианте), что, в общем случае, удовлетворит нужды множества проектов.
Автор: shulyakovskiy