Badoo time-series storage: итак, она звалась Кассандрой

в 9:15, , рубрики: apache cassandra, nosql, rrdtool, time series, Блог компании Badoo, высокая производительность, Программирование

enter image description here

Привет! Меня зовут Евгений Гугучкин, я – разработчик Badoo в команде «Платформа».

Наша команда работает над интересными и нужными задачами. Одна из них – разработка распределённого хранилища временных рядов, в решении которой я принимал непосредственное участие.

Недавно мы завершили большой и сложный этап, и нам захотелось поделится с вами нашими успехами, рассказать, почему мы занимались этой задачей и каких достигли результатов.

Зачем нам вообще хранилище временных рядов

Представьте систему со сложным поведением, состоящую из множества компонентов и связей между ними. Оценить состояние этой системы, отклонение от нормы, причину этого отклонения – это нетривиальная задача. Badoo – это один из примеров такой системы. Мы собираем и храним значения огромного числа метрик, счёт идёт на сотни миллионов. И это жизненно важно для компании, поскольку позволяет обнаруживать проблемы на ранних этапах и оперативно находить источники этих проблем.

Чтобы иметь как можно более полную картину, мы собираем довольно много различных метрик, начиная от сугубо технических, связанных с профилированием кода и компонентов нашей архитектуры, и заканчивая множеством «продуктовых» метрик.

Этот большой поток данных мы должны сохранить. Куда? Правильно! В хранилище временных рядов.

На данный момент мы храним более 300 миллионов метрик, обновляя их со скоростью около 200 000 значений в секунду. Эти данные занимают 16 Тб на 24 серверах.

Какое хранилище мы использовали до сих пор

Уже в течение многих лет мы используем RRDtool – набор утилит для работы с временными рядами. Несмотря на то, что он был создан 18 лет назад и уже длительное время не развивается, он имеет свои плюсы:

  • «из коробки» умеет рисовать графики;
  • может на лету производить вычисления с временными рядами;
  • объём, занимаемый одной метрикой, не меняется независимо от того, сколько данных туда записано.

Последнее свойство для нас особенно важно, и я расскажу о нём подробнее. Каждая метрика хранится в отдельном файле в формате RRD. Файл содержит специальные структуры: кольцевые архивы.

enter image description here

Мы можем представить себе кольцевой архив как фиксированный набор ячеек, в каждой из которых хранится одно значение за определённый интервал времени.

Допустим, интервал времени, покрываемый ячейкой, равен пяти минутам, а наша агрегирующая функция – вычисление среднего арифметического. Значит, в ячейку попадёт сумма всех измерений, попадающих в нашу пятиминутку, поделённая на количество этих измерений.

Схематично этот процесс изображён на рисунке выше. Назовём его даунсемплинг, поскольку с его помощью мы понижаем уровень детализации данных.

Поскольку недавние измерения нам хочется видеть с хорошей детализацией, мы храним несколько кольцевых архивов для каждой метрики. И даже в этом случае снижение уровня детализации даёт нам существенную экономию места на диске.

В общем, за годы использования RRDtool мы оценили:

  • высокую скорость чтения: RRDtool написан на C, считывает данные с локального диска, производит все вычисления и отдаёт сразу готовую картинку в формате PNG; вряд ли какое-то другое решение превзойдёт его по скорости выполнения запроса;
  • отсутствие необходимости думать обо всех нюансах, связанных с закольцованностью и агрегацией: RRDtool делает всё сам, мы просто отправляем туда данные;
  • контролируемый размер датасета: нам достаточно знать количество метрик и не нужно учитывать частоту измерений.

Почему же мы задумались о новом хранилище? Самое время рассказать о недостатках и проблемах, с которыми мы столкнулись.

Почему нам понадобилось новое хранилище

Неэффективное использование дисков

Судя по всему, разработчики RRDtool не рассчитывали на запись больших объёмов данных, потому что уже при обновлении 300 метрик в секунду мы начинаем упираться в диск.

Причина, во-первых, в том, что используется read before write, то есть при каждом обновлении необходимо прочитать файл метрики с диска и записать новую его версию или часть.

Во-вторых, такая операция производится с каждой метрикой. Это значит, что для трёх сотен метрик мы имеем соответственно несколько сотен дисковых операций с произвольным доступом в секунду.

Для решения этой проблемы мы используем специальный демон rrdcached, который буферизует несколько поступающих значений одной метрики в памяти, скажем, за десять минут и записывает их потом за одну итерацию. Так мы смогли на порядок уменьшить количество дисковых операций и, следовательно, увеличить пропускную способность.

Следующим шагом стало использование SSD, что также дало десятикратное увеличение пропускной способности на запись.

В итоге мы добились показателя в 30 тысяч записей в секунду. И тем не менее в силу вышеперечисленных особенностей RRDtool на наших rrd-серверах мы имеем:

  • тысячи дисковых операций в секунду со случайным доступом и, как следствие, высокий iowait: 15–20%;
  • высокую интенсивность записи, что в итоге приводит к износу SSD;
  • расход оперативной памяти: в нашей конфигурации это более 30 Гб на буферизацию, и более 16 Гб ОС выделяет под inode cache (у нас десятки миллионов файлов-метрик в файловой системе).

Проблемы с резервным копированием

C RRDtool не работает подход инкрементальных бэкапов: отправляя одно значение в метрику, мы изменяем весь файл с метрикой, и при следующем бэкапе нам надо сохранить целиком новую версию. И так для каждой из десятков миллионов метрик на сервере. Это занимает так много времени, что от регулярных бэкапов нам пришлось отказаться.

Локальный доступ к файлам

Drawing Эта особенность сама по себе не является проблемой, пока мы помещаемся на одном сервере и не планируем горизонтально масштабироваться.

Когда же мы шагнули за пределы одного сервера, мы решили эту проблему консервативным способом: каждое приложение, работающее с временными рядами, прописано на определённом сервере и запускается только там, где хранятся его данные.

Drawing Это работает, хотя и доставляет неудобства:

  • каждый раз при создании нового приложения приходится думать, куда его лучше «подселить»;
  • если вдруг твой коллега одновременно с тобой выбрал тот же сервер, может возникнуть ситуация «перенаселения»;
  • нередко приходится переносить приложения со всеми его данными вручную с сервера на сервер.

В конце концов, и этот подход себя исчерпал: рано или поздно должно появиться приложение, данные которого не помещаются на один сервер. И этот момент настал – появилось приложение, под которое мы выделили отдельный сервер, однако свободное место стремительно заканчивалось. Счёт шёл на дни. Тогда мы оперативно сделали относительно простую реализацию для распределения записи и чтения через единую точку входа в виде REST API.

Это решение с самого начала рассматривалось нами как временное. Однако мы решили тактическую задачу: устранили препятствия для роста. Это сработало, и в таком виде датасет нашего приложения вырос до шести серверов.

Но это добавило и новых трудностей: мало того, что временное решение было неудобным в эксплуатации и требовало доработок, добавилась критичная для нас проблема – низкая отказоустойчивость: при остановке одной ноды останавливалась вся запись. К тому же это хранилище унаследовало все проблемы RRDtool, которые я описывал выше.

В общем, на этом этапе стало очевидно, что надо искать более подходящее решение, лишённое этих недостатков.

А раз уж мы решили искать замену, было бы неплохо заодно учесть и ещё несколько проблемных моментов, чуть менее критичных для нас.

  • Хотелось бы иметь возможность переписывать или дописывать данные задним числом. RRDtool, например, требует, чтобы каждое новое значение метрики было за время, большее предыдущего. Это вынуждает собирать все данные в одном месте, упорядочивать их по времени и отправлять в хранилище по очереди. К тому же мы вынуждены резервировать какой-то временной интервал, в течение которого ждём, чтобы по возможности все данные добрались до нашей центральной очереди.
  • Проблема с длинными именами метрик. Поскольку в RRDtool мы отображаем имя метрики на имя файла, то упираемся в ограничения файловой системы, где полный путь файла не должен превышать 255 символов.
  • Компактность. В случае хранения разреженных временных рядов в формате RRD фиксированный размер файла поворачивается к нам своей… тёмной стороной: даже одно записанное значение резервирует на диске место как для полноценной метрики.

Помимо этого, мы добавили такое требование, как долговечность, поскольку, как оказалось, некоторые из рассматриваемых решений при определённых условиях теряют сохранённые в них данные.

Какие альтернативы мы рассматривали

Итак, мы составили список требований и оценили наиболее популярные open-source-решения в данной предметной области. Вот что у нас получилось:

image

На самом деле, не всегда возможно чётко определить соответствие тому или иному требованию (там, где ситуация неоднозначна, я поставил знак вопроса).

Лично мне очень понравились решения, которые мы рассматривали. Во всех есть интересные идеи и просто удобные функции. Но сейчас я не буду останавливаться на них подробно, а постараюсь обозначить только важные для нас моменты и критичные недостатки.

Graphite

Обладает большинством недостатков RRDtool, к тому же потребляет очень много CPU. Но главная претензия – Graphite «молча» теряет данные, когда ему не хватает ресурсов, если, например, не успевает обрабатывать или записывать.

OpenTSDB

При всех достоинствах этого решения оно обладает и рядом досадных ограничений:

  • отсутствие даунсемплинга при записи и хранении;
  • невозможность понять, какие метрики (и сколько) в него записаны;
  • ограничения на названия метрик;
  • отсутствие простого способа удалить метрику (а мы регулярно удаляем устаревшие);
  • запрос на получение последнего значения метрики приводит к сканированию данных на диске и очень неэффективен;
  • ограничения в API массового чтения и, как следствие, затруднения при реализации даунсемплинга.

Причём все эти проблемы – не следствие архитектурных ограничений, а просто недоделанный функционал.

InfluxDB

Это решение казалось идеальным. Не хватало только одного, самого важного, пункта – оно не масштабировалось «из коробки». Впрочем, его open-source-версия не масштабируется и сейчас: авторы работали над кластеризацией больше года и в итоге решили закрыть этот функционал. А жаль, мы очень рассчитывали…

Druid

Когда мы знакомились с ним, это был не столько сервис, сколько фреймворк. К тому же почти без документации. Чтобы получить от него то, что мы хотели, пожалуй, пришлось бы дописывать или переписывать какие-то его части.

Elasticsearch

Elasticsearch – вообще-то продукт немного из другой области. Но в последнее время в нём появились функции для аналитических запросов. В целом, он доказал, что справляется с задачей хранения «сырых» данных, но при этом требовал в четыре раза больше места на диске и потреблял в пять раз больше CPU, чем OpenTSDB. И ещё мы столкнулись со странной особенностью: чем больше шардов в индексе, тем медленнее он работает на запись (хотя здравый смысл подсказывает, что должно быть наоборот).

В итоге ни одно из этих решений из коробки не удовлетворило наши требования. Нужно было либо продолжать поиски, либо «взяться за напильник», чтобы добиться желаемого.

Cassandra. Почему мы выбрали её

Надо сказать, что два решения из таблицы нас особенно заинтересовали, так что мы интегрировали эти хранилища на уровне proof of concept в наш фреймворк для испытания в реальных приложениях на реальной нагрузке.

Речь про InfluxDB и OpenTSDB. К сожалению, InfluxDB отпал после того, как разработчики отказались от поддержки кластеризации в открытой версии, а OpenTSDB не поддерживал даунсемплинг и ещё некоторые нужные нам функции, о которых я уже упоминал (удаление метрик, получение последнего значения и пр.).

Однако OpenTSDB показал очень хорошую производительность, масштабируемость, отказоустойчивость. В общем, самые важные пункты в нашей таблице.

Если разобраться, OpenTSDB – это «обёртка» над распределённой СУБД Apache HBase, добавляющая REST API-доступ и умеющая упаковывать данные для СУБД соответствующим образом. Все понравившиеся нам качества обеспечиваются именно распределённой СУБД, а все недостающие функции – легко реализуются поверх неё.

Когда мы, наконец, осознали, что подходящего хранилища не существует, мы решили сделать его сами по аналогии с OpenTSDB на основе распределённой СУБД, добавив туда кольцевые архивы, как в RRDtool.

Но теперь перед нами встал выбор распределённой СУБД. На этот раз мы не стали проводить масштабное исследование, а ограничились сравнением уже упомянутого Apache HBase и ещё одной распределённой СУБД – Apache Cassandra. Обе СУБД основаны на модели данных Google Bigtable. Apache Cassandra к тому же позаимствовала некоторые идеи у Amazon DynamoDB, что пошло ей только на пользу.

Как водится, у каждого варианта есть как плюсы, так и минусы. Тем не менее они оба способны справиться с поставленной нами задачей. Как же мы выбирали между этими СУБД?

На мой взгляд, каждая история выбора – это всегда немного субъективная история. И тем не менее я попробую обосновать наш выбор. Ниже в таблице – список различий, которые повлияли на наше решение. Было бы неправильно говорить, что это недостатки HBase – нет, просто особенности, которые надо иметь в виду при разработке.

Cassandra HBase
Отсутствие зависимостей Требуются ZooKeeper и Hadoop
Простота настройки Нужно настраивать все компоненты
Язык запросов CQL Громоздкий API
Децентрализованная Есть мастер нода – SPOF
Равномерное распределение нагрузки Проблема hotspot
Материализованные представления Реализация на стороне приложения
Вторичные индексы Реализация на стороне приложения
Logged Batches Реализация на стороне приложения
Производительный драйвер для PHP 7 Thrift-клиент для PHP

Как видно из таблицы, работать с Cassandra проще и удобнее по многим пунктам. Это и более простая установка, и настройка, и свой язык запросов, и хорошая документация. В отличие от HBase, в Cassandra нет мастер-ноды и, соответственно, не требуется её дублирование для обеспечения высокой отказоустойчивости. C Cassandra не надо следить за тем, чтобы данные распределялись равномерно между узлами, в то время как в HBase возможны перекосы в нагрузке, и приложение само должно следить за тем, как распределяются данные. Не последнюю роль сыграло и наличие драйвера для PHP 7, на который мы тогда перешли.

Как мы храним временные ряды в Cassandra

Подробное описание Apache Cassandra не уместилось бы в целую статью. К тому же в интернете есть достаточное количество хороших материалов на эту тему, в том числе и на Хабре. Однако некоторые общие моменты стоит упомянуть.

  • Данные в Cassandra распределяются в кластере таким образом, что каждая запись имеет копии на соседних узлах. Количество копий записи (фактор репликации) – конфигурируемый параметр.
  • В случае если одна из нод становится недоступной, запросы как на чтение, так и на запись продолжают обслуживаться соседними узлами, содержащими реплики недоступной ноды.
  • Добавление новых узлов происходит без простоя. Данные автоматически перераспределяются по кластеру.
  • Производительность увеличивается линейно с ростом размера кластера.
  • Для хранения данных используется LSM-дерево – структура, обеспечивающая очень эффективное обновление данных и вставку новых записей.
    Drawing
  • Для доступа к данным Cassandra предлагает табличную модель данных и язык запросов CQL, однако это всё синтаксический сахар, а в реальности «под капотом» лежит модель key-value. Но в отличие от простой key-value-модели, где и ключ и значения имеют тип BLOB, в Cassandra value является структурой, а именно – ассоциативным массивом, упорядоченным по значению ключа. При этом:
    • можно изменять и добавлять за раз по одному или более элементу такого массива;
    • элементы каждого такого ассоциативного массива хранятся на диске по возможности последовательно;
    • это позволяет очень эффективно одним обращением к диску получать как один элемент массива, так и диапазон значений, и, разумеется, весь массив целиком.

Вот именно на уровне этой key-value-модели я и попробую на пальцах объяснить, как мы реализовали хранение временных рядов.

Drawing Для начала давайте рассмотрим хранение первичных, неагрегированных, данных. Они состоят из имени метрики и её значений. Каждое значение – это пара timestamp: double. Поместим эти данные в таблицу Points.

Как видно, такая структура очень хорошо ложится на внутреннее представление данных в Cassandra. Но есть один нюанс: так мы можем легко превысить ограничение Cassandra на размер ассоциативного массива. Формально это 2 миллиарда, но на практике лучше не превышать 100 тысяч.

Drawing Это ограничение легко обойти, если шардировать данные одной метрики по суткам. В названии ключа к имени метрики добавляется дата. Теперь значения метрики будут хранится в нескольких ассоциативных массивах – по одному на каждые сутки. Назовём эти шарды с первичными данными суточными сегментами, или просто сегментами. Если записывать значения в метрику каждую секунду, то размер сегмента не превысит 86 400 значений.

Drawing С первичными данными, кажется, всё ясно. Но напомню, мы хотели хранить исторические данные в виде агрегированных кольцевых архивов. Эти данные будем хранить в новой таблице Rollups.

Здесь у нас будет сложный ключ. Он будет определять и метрику, и свойства архива. В принципе, всё, как в RRDtool: архив определяется периодом архива, ценой ячейки и агрегирующей функцией.

А в ассоциативном массиве храним:

  • ключ – номер ячейки К, вычисляемый по формуле K = floor(time % retention / granularity);
  • значение – кортеж, состоящий из пары: время, соответствующее ячейке, и собственно значение ячейки.

В итоге данные одной метрики оказались разбросаны по разным таблицам и ключам. У метрики есть первичные данные в нескольких сегментах и несколько архивов с разной детализацией.

Чтобы прочитать все данные метрики, нужно знать её ключи – какие у неё сегменты и архивы. Такую информацию мы соберём в отдельную таблицу Meta.

enter image description here

Тут по имени метрики мы храним:

  • список её сегментов;
  • список её архивов;
  • самую старшую ячейку среди всех архивов метрики (на временной шкале это можно представить как правую границу заархивированных данных; с её помощью мы всегда можем и вычислить левую границу, и выбрать самый подходящий архив, соответствующий запросу).

Можно заметить, что название метрики входит в состав всех ключей. Это усложняет операцию её переименования и увеличивает расход места на диске, потому что метрики у нас действительно длинные, в среднем не менее 100 символов. Мы приняли решение вместо названия использовать идентификатор и хранить его в отдельной таблице Names.

Теперь нам достаточно создать новую запись в таблице Names – и мы переименовали метрику. Кроме того, в будущем метрикам можно будет назначать атрибуты и делать сложные выборки метрик по значениям этих атрибутов.

enter image description here

Перезапись данных и даунсемплинг

Мы решили хранить как первичные данные, так и агрегированные. Первичные, в таблице Points, хранятся несколько дней, после чего целыми сегментами агрегируются и отправляются в архив. Это позволяет нам с одной стороны перезаписывать данные задним числом (правда, только в пределах последних семи дней), а с другой – хранить данные в компактном архиве.

У нас высокие требования к производительности даунсемплинга. Чтобы успевать, надо обрабатывать 700 метрик в секунду, или более 60 миллионов в сутки. Именно столько сегментов у нас создаётся за сутки в данный момент. Как обеспечить это и не нагрузить диски? Cassandra позволяет последовательно сканировать всю таблицу или какую-то её часть. И за счёт этого удаётся избегать неэффективного рандомного обращения к диску. При текущих объёмах мы можем вычитать все первичные данные за шесть–восемь часов в несколько потоков. Благодаря этому максимальная производительность нашего даунсемплинга – 170 миллионов суточных сегментов в сутки.

Чтение данных

Запрос на чтение содержит следующие параметры:

  • metric_name – имя метрики;
  • start – начало запрашиваемого интервала;
  • end – конец запрашиваемого интервала;
  • func – агрегирующая функция;
  • step – шаг детализации.

Выполняется такой запрос по следующему алгоритму:

  1. По имени метрики находим идентификатор.
  2. По идентификатору получаем метаинформацию о метрике: список сегментов, список архивов и максимальное время архивов.
  3. Выбираем подходящий архив таким образом, чтобы он соответствовал агрегирующей функции, покрывал запрашиваемый интервал времени и имел подходящую детализацию.
  4. Выбираем сегменты, входящие в искомый интервал.
  5. Архивы и «сырые» данные объединяем и ресемплим с нужным нам шагом детализации.
  6. На выходе получаем набор значений метрики от start до end с шагом step.

enter image description here

REST API

Чтобы иметь возможность писать в наше хранилище не только из PHP, мы реализовали доступ к нему через REST API. И реализовали его, конечно, на любимом нами nginx и PHP.

Важным моментом является использование кэша, куда мы сохраняем идентификаторы метрик по их именам. Нам нужен быстрый кэш, откуда мы планируем читать более 200 тысяч раз в секунду. Мы используем APCu cache.

Чтобы REST API не стал «бутылочным горлышком», мы сделали его распределённым. А чтобы повысить hit rate и сделать использование кэша метрик эффективным, мы закрепляем метрику за конкретным сервером по ее хешу.

Для клиента все узлы одинаковые, и ему необязательно знать о привязке метрики к серверу. Как в этом случае обрабатываются запросы? Давайте рассмотрим на примере пяти узлов.

Клиент выбирает один из пяти узлов кластера REST API случайным образом. Допустим, на узел №3 приходит пакетный запрос на запись 100 метрик. На сервере эти 100 метрик группируются в пять подзапросов. Так что в каждой группе оказывается приблизительно по 20 метрик, закреплённых за одним сервером. Впоследствии эти подзапросы отправляются на соответствующие им соседние узлы.

enter image description here

В такой схеме, когда метрика должна обрабатываться только на конкретном узле, мы получаем проблему в случае падения узла. То есть нам недостаточно иметь отказоустойчивое хранилище – нам нужен отказоустойчивый кластер REST API-узлов.

enter image description here

В нашем случае это не представляет труда. Мы реализовали простой failover.

Представим, что нода №4 оказывается недоступной. В этом случае один из подзапросов остаётся необработанным. А нам нельзя останавливать обработку и ждать, пока узел №4 поднимется.

Решение очевидно: мы незаметно для клиента повторяем процедуру с группировкой метрик, но теперь только для одного необработанного подзапроса, и распределяем подзапросы только между живыми узлами.

enter image description here

Данные о родных метриках кэшируются узлами на сутки, а данные о чужих – только на один час. Таким образом, падение одной ноды не особенно сказывается на hit rate: большинство метрик по-прежнему обслуживаются своими узлами, а чужие метрики не занимают кэш надолго.

Каких результатов нам удалось добиться

К настоящему моменту мы перенесли часть данных в новое хранилище. Это метрики того самого приложения, датасет которого не помещался на одном сервере и для которого мы сделали импровизированный кластер из шести серверов.

Вот что у нас в итоге получилось:

  • новый кластер состоит из девяти серверов;
  • в данный момент там хранится 180 миллионов метрик;
  • максимальная пропускная способность на запись – 250 тысяч в секунду;
  • максимальная скорость даунсемплинга –170 миллионов сегментов в сутки;
  • для самих временных рядов мы используем фактор репликации, равный двум;
  • объём данных – приблизительно 9 Тб;
  • REST API-кластер у нас размещается на тех же серверах, что и кластер Cassandra.

Сравнивая новое хранилище с нашим RRD-кластером, стоит отдельно упомянуть о таком показателе, как средний размер метрики. Формально при прочих равных условиях средний размер метрики в Cassandra на 30% меньше, чем в RRD-хранилище. Но используемые для хранения в Cassandra LSM-деревья требуют резервирования свободного места на диске под временные файлы (для процесса, называемого compaction).

В нашем случае приходится резервировать около 60% от размера датасета. В итоге средний размер метрики с учётом этого получился на 20% больше, чем в старом хранилище. Это можно рассматривать как цену за производительность и отказоустойчивость.

Подводя итоги, можно сказать, что наше новое решение для хранения временных рядов предоставляет нам отказоустойчивость и масштабируемость, превосходя при этом старое хранилище по производительности. И главное – пусть с некоторыми оговорками, но мы достигли практически всех целей, которые ставили перед собой.

К сожалению, в рамках одной статьи невозможно рассмотреть все вопросы реализации. Так, мы не затронули такие вопросы, как уровни консистентности при чтении и записи; как мы обходимся без транзакций, полноценная поддержка которых в Cassandra отсутствует; как мы используем материализованные представления; какие оптимизации мы планируем сделать; и многие другие. Возможно, этому будет посвящён один из наших следующих постов.

Автор: Badoo

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js