CubeDB: минималистичное хранилище счётчиков с многомерными ключами

в 12:38, , рубрики: badoo, BI, CubeDB, java, open source, redis, Блог компании Badoo, высокая производительность, Программирование

CubeDB: минималистичное хранилище счётчиков с многомерными ключами - 1

Привет! Меня зовут Дима Станко, я работаю в BI-команде Badoo в лондонском офисе. Так уж сложилось в нашей компании, что мы стараемся проводить как можно больше измерений активности пользователей. Это необходимо многим специалистам: разработчики тестируют работоспособность кода, коллеги из продуктовых команд убеждаются в гениальности своих идей, админы – в том, что энтропия восторжествует не этой ночью, а коллеги из отдела антиспама – что добро как раз побеждает зло в вечной и эпической схватке.

Обо всём этом мы писали много раз и будем писать ещё, поскольку считаем, что наступать на грабли и не рассказывать о них другим – нехорошо.

Мобильная аналитика

Несколько лет назад у Badoo появилась потребность в анализе действий пользователей в мобильном приложении. Каждый раз, когда кто-то нажимал на кнопочку, загружал экран, открывал приложение, писал сообщение, мы получали уведомление об этом и радовались. Пару сотен миллионов раз в день.

Вскоре возникла потребность радоваться и в более специализированных случаях, таких как прокрутка пользователем экрана до конца, просмотр профиля другого пользователя, отправка подарка. Но и этого было мало нашим коллегам из продуктовой команды. Так у нас появилась Hotpanel, система трекинга мобильных приложений, и сегодня мы получаем около 320 типов уведомлений в общем количестве 12 млрд в день. Мы об этом ещё обязательно напишем, утаивать такую красоту просто грешно.

Одним из основных компонентов интерфейса Hotpanel являются почасовые и подневные графики с различными разбивками. Описывать тысячей слов то, что можно описать картинкой, нет смысла, так что – вот:

CubeDB: минималистичное хранилище счётчиков с многомерными ключами - 2

Фильтровать можно по любым комбинациям полей:

Предыстория

На заре этого проекта, когда единственным разработчиком был я, а сообщений было мало, проблему отображения я решил очень быстро, дёшево и сердито, но, увы, не совсем эффективно: на фронте это был dc.js, а «сзади» это всё поддерживал Redis, где каждый тип сообщения и каждая единица времени имели свой HASHMAP, ключами которого выступали поля и их значения, а значением – количество раз, которое сообщение с такой комбинацией полей поступило к нам.

Пример:

  • Название HASHMAP: hourly:view_screen:2016-09-17
  • Ключи внутри выглядели так*: screen_name=view_photo,previous_screen=welcome,platform=android,app_version=1.2.3,gender=male
  • Итоговое значение: 1 500 000

Всё это означало, что 17 сентября 2016 года пользователь мужского пола на своем Android-устройстве открыл наше приложение версии 1.2.3 и ринулся рассматривать картинку.

Если в тот же день ещё один пользователь мужского пола на Android-устройстве открыл наше приложение версии 1.2.3 и тоже ринулся рассматривать картинку, у нас случался HINCRBY, значение становилось 1500001, что доставляло много радости нашему продуктовому менеджеру, ответственному за картинки на Android в версии 1.2.3.

Еще рядом возле Redis жил сервис на Python и Flask, который подключался к Redis, HGETALL’ал все словари с hourly:view_screen:2016-07-17 по hourly:view_screen:2016-10-17, лепил одну чудесную JSON-структуру и сливал это всё клиенту на dc.js. Было, конечно, много оптимизаций, но рассказывать о них я не буду, потому что всё это – как говорится, дела давно минувших дней**.

Всё было просто восхитительно, пока комбинаций было мало. Скорость была потрясающей (Crossfilter, который лежит в основе dc.js, написан автором пакета d3 и имеет время отклика менее 30 мс). В общем, это был успех. Недолгий.

Этот успех нас и погубил. С увеличением количества типов сообщений, появлением новых полей и значений количество комбинаций росло не по дням, а по часам. Использование интерфейса превращалось в пытку. Мы упёрлись в такие экзотические потолки, как, например, ограничение максимального размера принимаемого JSON-обьекта. Но этот «знак свыше» мы проигнорировали и придумали хитрое (как нам казалось) решение с разбивкой JSON-а на кусочки и последующим его «склеиванием» на клиенте. Отдельной пыткой были вопросы наших коллег из отдела веб-разработки: «Ваша страничка за две минуты сожрала полтора гигабайта оперативной памяти и убила Chrome, поздравляю! Как это у вас получилось? И почему вы всё ещё у нас работаете?!»

К этому стыду и позору добавились ещё и белые и пушистые гости из Заполярья, которые начали буянить с Redis по ночам. Потребление памяти росло, и оказалось, что даже 192 ГБ – это не так уж и много. Звонки от ребят из мониторинга в три часа утра были совсем некстати (даже мой полуторагодовалый сын так никогда не делал!).

В общем, назрела та самая ситуация, когда «низы не хотят по-старому и верхи не могут по-старому». Пришло время действовать.

Требования к системе

Нужно было найти или придумать чудо-приблуду, которая висела бы на бэкенде и умела бы следующее:

  1. Хранить данные счётчиков за 120 дней (это примерно 100 млн разных комбинаций; в несжатом виде порядка 27 ГБ данных).
  2. Фильтровать по любой комбинации полей и по интервалу дат. Формат типа field1 in ('val11', 'val12' ... ) AND field2 in ('val21', 'val22', ...) .... AND dt between x and y, и из этого ясно, что индексы нам не помогли бы, к сожалению.
  3. Представлять результаты в виде фасетов. Если в сообщении есть восемь полей, то должны выдаваться восемь словарей – для каждого поля. Каждый словарь должен содержать счётчики для каждого возможного значения поля. Если быть уж очень педантичным, то в SQL это должно выглядеть так:

    select
    'G1' as name,
    G1,
    SUM(M)
    from T
    WHERE 
    D2 in (DQ2) and D3 in (DQ3) ... -- skip all filters related to G1
    and p between PFrom and PTo
    group by name, G1
    
    UNION ALL
    
    select
    'G2' as name,
    G2,
    SUM(M1)
    from T
    WHERE 
    D1 in (DQ1) and D3 in (DQ3) ... -- skip all filters related to G2
    and p between PFrom and PTo
    group by name, G2
    
    UNION ALL
    ...
    UNION ALL
    
    select
    'GN' as name,
    GN,
    SUM(M1)
    from T
    WHERE 
    D1 in (DQ1) ... and D(N-1) in (DQ(N-1)) ...  -- skip all filters related to GN
    and p between PFrom and PTo
    group by name, GN
    
    UNION ALL
    
    select
    'p' as name,
    p,
    SUM(M1)
    from T
    WHERE 
    D1 in (DQ1) ... and Dn in (DQn) ...  
    group by 'name', p

  4. Не напрягаться, если добавляются новые поля и новые значения полей.
  5. Не напрягаться, если появляются новые типы сообщений.
  6. Выдавать результат почти мгновенно, то есть в среднем за 100 мс, включая сеть, в худшем случае – за 2 с (на этот случай мы привинтим к нашей страничке крутящееся колесо).
  7. Уметь вставлять 3 млн новых комбинаций максимум за минуту.
  8. Уметь быстро удалять данные за прошедшие дни.
  9. Всё это должно работать на существующей инфраструктуре, то есть либо на одной машине (192 ГБ памяти, 48 коров), либо на Hadoop-кластере, либо на кластере Exasol, который у нас там как раз под рукой.
  10. Всё это должно быть простым в поддержке и позволять себя мониторить, почечуя не вызывать и загадками о своих болячках не говорить.

Облегчающие обстоятельства:

  1. Настоящая персистентность, чтобы сохранять данные сразу после каждого изменения, нам была не нужна. Новые агрегаты добавлялись раз в час, поэтому сохранять всё на диск нужно было один раз сразу после загрузки. Тем не менее сохранение не должно было быть блокирующим.
  2. Количество значений каждого поля – не больше 1000.
  3. Количество полей – не больше 100 (все они типа String).
  4. Никакого ACL (пока).
  5. Никаких транзакций и т. д. Естественно, счётчики должны обновляться атомарно.

Промежуточные выводы:

  1. Обработать 27 ГБ за 100 мс можно только в случае хитрого сжатия либо хитрого индексирования и использования всех CPU.
  2. Key-value stores нам не помогут. Помогли бы разные скриптовые возможности, которые есть на Lua в Redis и Tarantool, но они всё равно однопоточные и обработать столько данных в срок вряд ли смогут.
  3. Реляционные базы данных тоже не прокатят ввиду пунктов 4 и 5 требований.
  4. Presto, Impala и иже с ними – конечно, молодцы, но за 100 мс ничего сделать не смогут. Да и 100м записей для них — это как стрелять из пушки по воробьям. О Hadoop и MapReduce даже не заикаемся.
  5. Хитрые и интересные вещи вроде Druid и InfluxDB, наверное, решили бы эту задачу, но они слишком сложные. Возможности ставить для этого всего отдельный кластер просто не было. Мои более способные и менее ленивые коллеги об этом уже писали.
  6. Вы наверняка заметили, что это всё смахивает на time-series. Да, практически так и есть, только не в фас, а в профиль. На самом деле, каждый график, который у нас есть, – это не один time-series, а сумма миллионов комбинаций. Так что time-series store мы тоже отбросили.

Ввиду врождённой вредности и отсутствия времени на тестирование всех возможностей задачу подобрать инструмент для решения этой проблемы я любил наивным тоном ставить на собеседованиях. Но даже хорошие кандидаты мне ничего обнадёживающего сказать не могли; так исчезла последняя надежда, Elasticsearch, вроде как предназначенный раз для faceted search, – слишком медленно для нашего «атомного ледокола».

К тому времени постоянные унижения от коллег, отсутствие нормального сна из-за ночных дебошей Redis, всё слабеющая надежда найти что-то готовое и бесплатное и, чего греха таить, чисто гиковский интерес к решению захватывающей проблемы дали о себе знать: глаза мои налились кровью, рассудок помутился, и я решил создать решение сам.

Писал я дома, а прикрывал меня уже упомянутый полуторагодовалый сын. Мы с ним договорились, что, пока он после семи вечера мирно спит, я сижу в его детской и под видом отеческого бдения пишу код. Та пара недель до сих пор вспоминаются как самые счастливые за время отцовства.

Пару лет назад, когда мы выбирали новую аналитическую базу данных, я был поражён скоростью тех решений, которые оперировали только в памяти. Может, идея создать структуру данных, которая выдаёт результат за O(N) времени, очень неудачная на собеседовании, но на практике это вовсе не так печально (при условии, что все эти N элементов дружно живут в памяти).

У мене внутре неонка

Собственно говоря, проблемы было две: как данные вставлять и как их извлекать.

Извлечение счётчиков

Для хранения данных была придумана элегантная структура. Все агрегаты для некоторого типа сообщения предлагалось хранить в отдельном объекте (назовём его кубом). На самом верхнем уровне у нас как раз карта этих кубов с названием его как ключа карты.

Каждый куб партициирован по времени (в нашем случае – по дням и часам). Каждая партиция хранит агрегаты поколоночно. Добавление нового поля решается тривиальным добавлением нового элемента в карту этих полей.

Так как мы знаем, что максимальное количество всевозможных значений для каждого поля небольшое, мы можем закодировать его цифрой, причём в нашем случае хватит 10 бит (но для упрощения задачи и резерва я сделал его 16-тибитным). Это позволяет не только экономить память, но и осуществлять поиск быстрее, так как сравнивать теперь нужно только 16 бит, а не всю строку. В нашем случае есть два типа колонок: по которым ищутся значения, они 16-тибитные, и сами счётчики – 64-хбитные.

Таким образом, уже знакомый нам ключ screen_name=view_photo,prev_screen=welcome,platform=android,app_version=1.2.3,gender=male со значением 1500000 превращается в следующие данные: 1:1:1:1:1 1500000. И всё это теперь занимает 5x2 + 8 = 18 байтов. Экономия – налицо.

Поиск решается простым full scan, при каждой строке значение счётчика соответствующего значения поля увеличивается на значение из поля счётчика.

Скорость тоже неплохая. Текущая имплементация на Java спокойно может прочесать 20 млн значений в секунду в одном процессоре на Mac. Поскольку нам нужен faceted search, читать необходимо значения всех полей, а не только тех, по которым стоят значения фильтра, так что в случае пяти полей у нас всё ещё получается 4 млн счётчиков за секунду.

Вставка

Со вставкой, к сожалению, такая элегантная структура данных не справляется совсем. Если для вставки каждого элемента нам нужно будет прочёсывать всю память (что занимает, допустим, 100 мс), вставка 3 млн записей (а именно столько комбинаций у нас накапливается каждый час) займёт примерно 83 часа, что ни в какие ворота не лезет. Как-то даже обидно было бы за потраченные впустую CPU-циклы.

С другой стороны, мы знаем, что вставляются данные только за несколько последних часов или дней. А ещё мы знаем, что, если мы что-то вставляем, у нас есть все поля. Поэтому ничто не мешает нам создать обратный индекс, где ключом будут цифровые значения полей (1:1:1:1:1), а значением – номер строки, где эта комбинация находится. Создаётся такая структура за доли секунды, а время поиска ключа – вообще доли миллисекунды.

В нашем случае удалось достичь скорости порядка 150 000 записей в секунду. Это, конечно, при условии внутреннего вызова, не учитывая времени десериализации REST-запроса и сети. Такой обратный индекс живёт в кеше и создаётся для каждой партиции. Если к нему не обращались несколько дней, он удаляется и освобождает память.
Большинство наших данных вставляются в тот же (или предыдущий) день, когда они были отправлены, поэтому партиций с существующими обратными индексами действительно мало.

Как я уже говорил, поиск у нас тривиальный, а посему его можно тривиально параллелизировать. Поэтому партиции прочёсываются параллельно, в отдельных потоках. При наличии в среднем восьми полей и 48 процессоров на сервере у нас получается достичь скорости скана в 120 млн строк в секунду. То есть, если количество различных комбинаций не превышает 12 млн, мы всё ещё вписываемся в установленный нами период в 100 мс. Это, конечно, в идеальном мире, но мы почти уже там.

Сохранение на диск

Записывать данные на диск имеет смысл сразу в таком компактном виде. В начало файла записываются словари, а потом уже столбцы данных с цифрами вместо строк. И всё это хозяйство ещё и сжимается. Как ни странно, в случае с Java самым медленным компонентом оказался процессор (и gzip-сжатие). Переход на Snappy позволил уменьшить время сохранения с 60 с до 8 с.

«На всякий пожарный» предусмотрено ещё сохранение в формате JSON, чтобы можно было вручную перезалить данные в случае потери совместимости будущих систем.

Общение с миром

Весь интерфейс сделан через REST. Это, наверное, самая скучная часть всего ПО, так что, пожалуй, даже писать про неё ничего не буду. Вставка через PUT, запросы через GET, удаление через DELETE и т. д.

Java

Как я уже упоминал, написал я это всё на языке Java. Из всех так называемых быстрых языков я умею программировать только на нём. Но у меня есть очень сильное подозрение, что на C это работало бы ещё быстрее. Вкусняшки типа SIMD, я думаю, очень ускорили бы систему. Мечта жизни – вообще всё это переписать на Rust. Но поскольку на данном этапе производительность нас устраивает, а сын вырос и уже не соглашается ложиться спать в семь вечера, придётся с этим повременить***.

Вообще Java меня обрадовал и расстроил одновременно. В плане full scan там очень хорошая производительность, я даже не ожидал. Расстроил же меня сборщик мусора, который постоянно паникует, когда у него много висит и ничего не освобождается. Поэтому пришлось все эти структуры данных писать для off-heap, используя allocateDirect и unsafe’ы. И всё это, конечно, очень круто, но впечатление, что кодишь на C, а не на Java. Не уверен, что, когда мироздание создавало язык Java, оно предполагало именно такой вариант событий.

Ещё больше расстроил меня сборщик мусора, когда нужно было создавать эти самые счётчики для фасетов. Когда 48 процессоров одновременно создают HashMap на пару тысяч элементов, сборщик ведёт себя так, как будто у него это первый раз в жизни. В результате full scan пары миллионов строк занимает меньше времени, чем трансформация результатов из цифр в строки и последующая консолидация данных со всех партиций.

Настоящее

На данный момент наш единственный экземпляр содержит в себе около 600 кубов, которые состоят из порядка 500 млн записей. Всё это занимает около 80 ГБ резидентной памяти, а бекап в сжатом виде (Snappy) – около 5 ГБ на диске. Операция сохранения на диск занимает порядка 30 с.
Система работает очень стабильно и после фикса одной экзотической проблемы с хешами, равными MIN_VALUE не падала ни разу.

Будущее

Рассуждения о будущем – мои любимые, так как говорить можно что угодно, а цифры показывать не нужно.

Итак, есть несколько вариантов улучшить мир:

  1. Во-первых, как я уже упоминал, переписать решение на языке, более пригодном для таких вещей, как быстрая и многопоточная обработка данных, где сборщик мусора не будет смущать всех и самого себя своим присутствием.
  2. Во-вторых, было бы круто научить CubeDB общаться с себе подобными и разрастись в целый кластер.
  3. В-третьих, раз у нас всё так быстро и в памяти, имеет смысл перетащить некоторые хитрые алгоритмы поближе к данным. Например, некоторое подобие Anomaly Detection, как это сделали в Фейсбучной Горилле.

Так как идей у меня явно больше, чем времени, мы разместили наше решение в открытом доступе. И раз уж вы дочитали до этого места, вам наверняка интересно. Приходите, посмотрите, играйтесь, используйте. Я считаю, что идея очень простая, но интересная, и там ещё много что можно улучшить. Дерзайте!

P.S. Данные без визулизации – это как дискотека без музыки, потому мы одновременно выпустили набор front-end-компонентов для работы с API CubeDB, а также слепили простую страничку для демонстрации возможностей. Хочется предупредить однако, что демка бежит в тучке на машине с одним ядром и настоящую скорость обработки так оценить сложно: в нашей внутренней системе на настоящем железе и сорока восьми ядрах скорость отличается кардинально.

* На самом деле нет – мы там всё в JSON хранили, что вообще ужасно неэффективно.
** На самом деле нет, я только что посмотрел. Забыли снести – просто перестали пользоваться. Я вот отключил – мне сразу же ребята из отдела мониторинга позвонили. Молодцы!
*** А почему бы вам не попробовать?

Автор: Demeter Sztanko

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js