Альтернатива MapReduce при поиске в распределенной БД

в 10:42, , рубрики: tarantool, Администрирование баз данных, Блог компании VK

Привет, меня зовут Сатбек, я работаю в команде Tarantool. Расскажу, как реализовать в шардированном кластере поиск, скорость которого не зависит от количества мастеров и объёма хранимых данных. Условно назову этот способ индексным слоем:

  • Опишу общую схему построения поиска.

  • Приведу пример реализации.

  • Дам рекомендации по разработке.

В статье я последовательно реализую простой CRUD-сервис с шардированным хранилищем, а также поиск по данным. Это поможет вам лучше понять, когда возникают проблемы с поиском по распределённым данным и как их решает индексный слой.

Использовать будем БД Tarantool (версия ≥ 1.10), а также фреймворк для построения кластеров Tarantool-Cartridge (версия 2.7.0).

Для лучшего понимания желательно познакомиться с фреймворком Tarantool-Cartidge, модулем vshard, а также языком Lua, так как пример написан на нём.

Общее описание приложения

В качестве примера реализуем простой CRUD-сервис с одной таблицей user, шардированной по id. В Tarantool используется термин space, далее по тексту "спейс". Исходный код приложения находится здесь.

Искомые данные будут храниться в спейсе user в таком формате:

|id(uuid)|bucket_id(number)|name(string)|birthdate(number)|phone_number(string)|

Пользователи шардируются по id. Есть primary index по id и secondary index по name.

Как искать в таком кластере?

Поиск по id

-- router side
local bucket_id = vshard.router.bucket_id(user_id)
user = vshard.router.callrw(bucket_id, 'get_user_by_id', {user_id}, {timeout = 1})
-- storage side
local M = {}
function M.get_user_by_id(id)
    local user_t = get_user_tuple_by_id(id)
    if user_t == nil then
        return nil, M.user_not_exists_err:new('id=%s', id)
    end
    return user_tuple_to_output(user_t)
end

Для поиска по id используется стандартный для Tarantool подход. Мы вычисляем bucket_id на стороне роутера, чтобы узнать, на каком storage-узле лежат наши данные, а затем шлём туда запрос. В результате не более чем за два запроса получаем искомое.

Альтернатива MapReduce при поиске в распределенной БД - 1

Поиск по name

Далее реализуем поиск по name. Здесь не обойтись одним запросом: необходимо обойти каждый master-storage в нашем кластере, поскольку неизвестно, где лежат наши данные.

Альтернатива MapReduce при поиске в распределенной БД - 2
-- router side
local M = {}
function M.find_users_by_name(name)
    local storage_uris = cartridge_rpc.get_candidates('app.roles.storage', {leader_only = true})
    local res_by_uri, err = cartridge_pool.map_call('storage_api.get_users_by_name', {name}, {uri_list = storage_uris, timeout = M.vshard_timeout})

    if err ~= nil then
        return nil, err
    end

    local result = {}
    for _, res in pairs(res_by_uri) do
        for _, user in pairs(res) do
            table.insert(result, user)
        end
    end
    return result
end

-- storage side
local M = {}
function M.get_users_by_name(user_name)
    local yield_every = 100
    local count = 1

    local result = {}
    for _, t in box.space.user.index.name:pairs({user_name}, 'EQ') do
        if count % yield_every == 0 then
            fiber.yield()
        end

        count = count + 1
        table.insert(result, user_tuple_to_output(t))
    end

    return result
end

В этом случае запросов будет не меньше, чем количество мастеров в кластере. Однако такой подход допустим, если искомые данные точно лежат на нескольких мастерах и если таких мастеров немного. В противном случае для эффективного поиска необходим дополнительный индекс по полю name. В контексте поиска по «имени человека» описанные выше допущения вполне справедливы при достаточно большом объёме данных.

Расширяем поиск

Теперь предположим, что вы хотите искать пользователя и по номеру телефона. Для этого можно использовать описанный выше поиск по имени: делаем на storage дополнительный индекс и реализуем MapReduce для поиска. Схема будет работать, и её даже относительно просто реализовать, особенно с помощью модуля Tarantool/crud. Но всё же у неё есть существенный недостаток: поле «номер телефона», как правило, есть только в одной записи user. Соответственно, запись с искомым значением будет находиться только на одном мастере. Но в схеме с MapReduce мы будем вынуждены обойти все мастеры, и если у вас их две штуки в кластере, то придётся сделать два запроса на чтение, если 10 — то 10 запросов, и т.д. И всё это будут сетевые запросы, которые очень дороги по сравнению с чтением из индекса, да ещё и могут упасть. Возникает новая проблема — избыточные сетевые запросы при поиске по уникальным полям.

Схема индексного слоя

Трудность в том, что нам нужно найти уникальные данные, но мы не знаем, на каком мастере они лежат. Значит, нужно хранить об этом информацию. Пожертвовать памятью, чтобы уменьшить количество сетевых запросов. 

Создадим специальный спейс user_search_index:

|user_id(string-uuid)|bucket_id(unsigned)|data_hash(string)|data(any)|

Этот спейс и будет поисковым слоем. При записи данных мы будет вычислять data_hash от информации, используемой для поиска:

--один из множества вариантов, как это можно сделать
local msgpack = require('msgpack')
local digest = require('digest')

local data = {'phone-number', '202-555-0165'}
local data_hash = digest.md5_hex(msgpack.encode(data))

Спейс user_search_index шардируется по data_hash. И чтобы исключить коллизии, нужно хранить data. Здесь можно найти модуль приложения, реализующий логику построения хэшей:

local M = {}

local digest = require('digest')
local msgpack = require('msgpack')
M.cmp_data = {}

local Identifier = {}

function Identifier:new(data)
    local obj = {}
    obj.hash = digest.md5_hex(msgpack.encode(data))
    obj.data = data
    return obj
end

function M.phone_number(phone_number)
    local data = {'phone_number', phone_number}
    return Identifier:new(data)
end

function M.cmp_data.phone_number(data_one, data_two)
    return #data_one == 2 and #data_two == 2 and (data_one[1] == data_two[1]) and (data_one[2] == data_two[2] ~= nil)
end

return M

Read

При поиске:

  • вычисляем хэш от поисковых данных;

  • находим по нему данные из user_search_index;

  • шлём запрос на storage и получаем данные по primary-key.

Пример кода для поиска по номеру телефона:

-- api.lua
local M = {}
function M.find_users_by_phone_number(phone_number)
    local user_ids, err = M.search_index.user_id.get_by_phone_number(phone_number)
    if err ~= nil then
        err = errors.wrap(err)
        return nil, err
    end

    return get_users_by_ids(user_ids)
end

-- search_api.lua
local M = {}
function M.user_id.get_by_phone_number(phone_number)
    local identifier = M.identifier.phone_number(phone_number)

    local bucket_id = M.vshard_router.bucket_id_mpcrc32(identifier.hash)

    local ids, err = M.vshard_router.callrw(bucket_id, 'search_storage_api.user_id.get_by_phone_number',
        {identifier.hash, identifier.data}, {timeout = M.vshard_timeout}
    )
    if err ~= nil then
        err = errors.wrap(err)
        return nil, err
    end
    return ids
end

-- search_storage_api.lua
local M = {}
local function get_users_by_hash(hash, data, cmp_func)
    local result = {}
    for _, t in box.space.user_search_index.index.hash:pairs({hash}, 'EQ') do
        if t.data_hash ~= hash then
            break
        end
        if cmp_func(t.data, data) then
            result[#result + 1] = t.user_id
        end
    end
    if #result == 0 then
        return nil, not_found_err:new()
    end
    return result
end

function M.user_id.get_by_phone_number(hash, data)
    return get_users_by_hash(hash, data, M.cmp_data.phone_number)
end

Обошлись без MapReduce. В случае уникальных данных делаем один сетевой запрос для нахождения primary_key, и второй для получения искомых данных.

Индексация произвольных данных

Индексный слой позволяет перенести индексацию с уровня БД на уровень кода приложения. Это открывает широкие возможности по индексации произвольных данных. Например, можно достаточно легко сделать составной индекс на основе имени и даты рождения.

-- identifier.lua

local M = {}

local digest = require('digest')
local msgpack = require('msgpack')
M.cmp_data = {}

local Identifier = {}

function Identifier:new(data)
    local obj = {}
    obj.hash = digest.md5_hex(msgpack.encode(data))
    obj.data = data
    return obj
end

...

function M.name_birthdate(name, birthdate)
    local data = {'name', name, 'birthdate', birthdate}
    return Identifier:new(data)
end

function M.cmp_data.name_birthdate(data_one, data_two)
    return #data_one == 4 and #data_two == 4 and
            (data_one[1] == data_two[1]) and
            (data_one[2] == data_two[2] ~= nil) and
            (data_one[3] == data_two[3] ~= nil) and
            (data_one[4] == data_two[4] ~= nil)
end

return M

Остальное полностью аналогично поиску по номеру телефона.

Write

Выше я описал, как искать с помощью индексного слоя. Теперь рассмотрим его построение. При создании или обновлении данных необходимо создавать или обновлять и индексный слой. Реализация этих процессов зависит от конкретной задачи, но скорее всего она будет идти в фоне. Для этого можно воспользоваться модулями Tarantool/queue и Tarantool/sharded-queue. В этом примере есть наивная реализация построения индексного слоя с помощью Tarantool/queue.

При реализации индексного слоя мы усложним запись и обновление данных, зато существенно ускорим чтение. В нашем случае работа с индексным слоем при записи выглядит так:

Альтернатива MapReduce при поиске в распределенной БД - 3

Обновление выполняется в фоне уже после того, как данные обновились. То есть  возможна ситуация, когда поиск ничего не вернёт, но данные в БД уже будут. Или в случае обновления поиск по старым данным вернёт новые, так как содержимое обновилось, а поисковый слой ещё нет. Но в конечном итоге данные будут согласованы. Возможно, в вашем случае нужна иная логика: сначала обновить индексный слой, а затем — данные.

Итоги

Индексный слой: 

  • Позволит существенно уменьшить количество сетевых запросов на чтение при поиске уникальных данных.

  • Позволит не хранить производные данные для поиска по ним.

  • Позволит реализовать быстрый поиск по любым уникальным данным, хранящимся у вас.

Скачать Tarantool можно на официальном сайте, а получить помощь — в Telegram-чате.

Автор:
satbekt

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js