Свой S3-server: что делать, если ваши десятки петабайт уже не лезут в коробочные объектные хранилища

в 8:24, , рубрики: ceph, lusca, ozon tech, rgw, s3, s3server, Scylla, scylladb, storage

В 2024 году уже незачем рассказывать об S3-интерфейсе и сравнивать его с другими вариантами организации объектного хранилища. Вот и мы в Ozon, конечно, предоставляем такое платформенное решение широкому спектру внутренних потребителей. От сервисов, которые хранят картинки товаров для каталога, до бэкапов баз данных. От собственных внутренних разработок, до open-source-решений, таких как Gitlab и Thanos.

Пока у вас десятки терабайт и сотни RPS, вас устраивают такие решения, как MinIO. Но по мере роста объёмов и запросов приходится смотреть в сторону таких решений, как Ceph с RGW (RADOS Gateway / Object Gateway). Ну, а когда у вас 3 дата-центра, десятки петабайт данных, миллиарды объектов и десятки тысяч запросов в секунду — в таких условиях и у RGW начинаются проблемы.

Эта история началась с того, что и мы с проблемами масштабирования столкнулись. Под хабракатом вы узнаете, как мы прошли через отрицание проблемы, гнев на Ceph, торг с CTO и разработку собственного решения. Как выбирали технологии, на какие грабли наступили, и что в итоге получилось.

Свой S3-server: что делать, если ваши десятки петабайт уже не лезут в коробочные объектные хранилища - 1

Привет! Меня зовут Максим и я ведущий инженер в команде разработки объектного хранилища в Ozon. Я расскажу вам, как мы пришли к необходимости разработки собственного S3-сервера, как выбирали архитектуру, на какие грабли наступили в процессе разработки и запуска. А ещё о том, как мы переезжали на своё решение и как живём теперь.

Инфраструктура объектных хранилищ в Ozon — это порядка 60 PiB данных и 5 млрд объектов, распределённых по примерно 20 тысячам бакетов, каждый объёмом от 1 MiB до 1,5 PiB. А ещё это порядка тысячи разных сервисов, использующих S3. На разных языках, с разными данными и с очень отличающимися паттернами по создаваемой нагрузке. Всё это живёт в трёх разных дата-центрах и отвечает требованию ДЦ-1, то есть должно продолжить работу при полной потере одного из дата-центров.

Проблемы с Ceph RGW

Прежде чем рассказать о том, во что мы упёрлись, стоит поговорить о Ceph. Основной подход к хранению — размещение однородных блоков Rados objects равномерно по демонам, отвечающим за хранение (OSD). Но для поддержки различных протоколов доступа, таких как RBD, CephFS, S3, Swift, требуются не только данные, но и метаинформация. Работа с метаданными плохо вписывается в концепцию однородных блоков по 5 MiB.

Когда-то в Ceph для решения этой задачи использовали развёрнутую поверх OSD файловую систему (например, ext4 или xfs) и это называлось FileStore. Начиная с 11-ой версии в stable перевели новое решение: BlueStore, когда метаданные хранятся в key-value-базе данных RocksDB. Она используется как embeded-хранилище метаданных и по-прежнему размещается на тех же OSD, включая саму базу и wal-логи. Но, в отличие от FileStore, такое решение обеспечивает транзакционность.

До Pacific-версии Ceph, RocksDB была нешардированной и, как любая плоская нешардированная база, могла становиться слишком большой и деградировать при большом количестве модифицирующих операций. Начиная с 16-ой версии Pacific, добавили возможность её шардировать и решардировать. Может показаться, что проблема масштабирования решена, но для инициализации RocksDB всё ещё требует чтения большого объёма данных. Что создаёт одновременную нагрузку на все OSD, где она размещена.

К скорости чтения метаданных предъявляются существенно более высокие требования, чем к дата-пулу. Модифицирующие операции создают большую нагрузку, даже с учётом шардирования. Тот факт, что блоки сравнительно большие и иммутабельны, создаёт проблемы: мы должны вычитать RocksDB, внести изменения и флашнуть wal на OSD. Чтобы это происходило консистентно, wal флашатся в один поток. Поэтому эти OSD крайне рекомендуется размещать на быстрых дисках, например SSD или NVMe SSD.

Но и быстрые диски — не панацея. Мы в любой момент можем получить деградацию на больших объёмах операций удаления и записи объектов, если шард большой, а изменений много. Для решения этой проблемы, многие бакеты приходится сильно решардировать, например, на тысячи шардов индекса. С другой стороны, большое количество шардов тоже создают проблему, ведь при каждой операции листинга приходится поднимать блоки сразу в большом количестве OSD. Стоит добавить ещё, что количество шардов должно быть взаимно простым относительно числа физических серверов. Иначе мы можем получить неожиданный перекос нагрузки и взаимное влияние разных бакетов, которые одинаково распределились по серверам.

В итоге, балансируя вручную количество шардов, нам долгое время удавалось добиваться относительной стабильности работы Ceph RGW. Всё это немного походило на чёрную магию, ведь даже при наличии большого количества метрик, в моменте очень сложно понять, что происходит внутри конкретного OSD.

Периодически мы ловили такие эффекты, как непредсказуемые кратковременные всплески latency. Постфактум их удавалось объяснять ростом количества запросов в конкретный бакет. Но не удавалось точно воспроизвести, ведь при той же нагрузке на тот же самый бакет проблема не повторялась. Очевидно, что тут складывалось много условий одновременно, которые в синтетическом тесте не получалось повторить.

Другая проблема — очень большие бакеты. Начиная примерно с 25 миллионов объектов, операции листинга и удаления становятся ну очень тяжёлыми. А ближе к 100 миллионам, даже просто однократный полный рекурсивный листинг может вызвать кратковременную деградацию кластера. Аналогичные деградации получались из-за «фантомных» объектов, которые не видны при листинге, но они замусоривают индекс, — это один из багов неконсистентности RGW.

Очевидный выход — не хранить метаданные в той же архитектуре, вытащить их наружу и использовать такой вот внешний индекс для хранения метаданных и карты разбиения логических объектов на физические (почти) блоки Rados Object. Это соображение и составляет ядро архитектуры нашего S3-сервера Lusca, но об этом чуть позже.

Требования к своему решению

Теперь, когда мы выяснили основные проблемы, можно подумать над тем, какие требования мы бы ещё хотели закладывать в архитектуру.

Базовые функциональные требования

  1. CRUD — основные операции S3-протокола (PutObject/GetObject/DeleteObject/ListObjects).

  2. Политики доступа AIM.

  3. Lifecycle — фоновое удаление по сроку жизни объектов и другим условиям.

  4. Статистика по бакетам и квотирование — пользователям в нашем PaaS хочется знать количество объектов, занимаемое место и другие параметры. Этого нет в спецификации S3, но часто делают в подобных системах.

А что ещё?

Проанализировав паттерны использования S3, мы заметили ряд сервисов, которые используют хранилище как стейт-машину. Например, сначала они сохраняют объект по пути /tmp/, а потом другой воркер разбирает эти временные файлы, обрабатывает и перекладывает по другому пути. В терминах S3 это: PutObject в /tmp/, CopyObject с другим названием, а потом DeleteObject для первого объекта. Звучит не очень оптимально, правда? Особенно учитывая, что RGW для этого делает 3 обращения к дискам, физически копирует данные во второй объект и потом удаляет первый. Хотелось бы сократить это до одной операции с данными — записи исходного объекта.

Другими словами, нужна дедупликация данных в blob storage. Мы получаем более оптимальное копирование объектов — копируем только метаданные. Кроме того, это позволит нам сделать дешёвые ретраи запросов — мы сможем записывать только новую часть данных, которую не получилось записать за прошлую попытку. А ещё мы сможем сэкономить деньги на HDD, ведь часть данных в бакетах дублируется, но об этом позже.

Версионирование

При работе с данными никакая система не защищена от человеческого фактора, но можно постараться снизить его влияние. Мы решили, что за счёт дедупликации данных мы можем для повышения надёжности и уменьшения рисков пользовательских ошибок включить всем бакетам версионирование по умолчанию. Версионирование — это хорошо описанный в документации функционал, он позволяет хранить предыдущие версии для каждого объекта в бакете. В таком случае, операция удаления — это всего лишь создание новой версии объекта типа delete marker.

Таким образом, восстановление — очень простая операция. Нужно удалить новую версию и тогда старая версия будет сразу доступна. Для того чтобы не хранить версии объектов бесконечно, мы можем воспользоваться правилом Lifecycle NoncurrentVersionExpiration. Мы решили, что можем по умолчанию хранить версии объектов несколько дней. Конечно, этот параметр можно настроить. 

Классы хранения

По умолчанию все наши системы проектируются отказоустойчивыми с большой доступностью. Это позволяет пережить падение одного из дата-центров, в нашем случае это означает использование реплика-фактора 3. Но есть системы, где мы можем позволить себе другие гарантии, такие, где данные в основном «холодные». Часть таких систем имеют характер write-only, то есть критически важно данные записать, но не так критично иметь доступность на чтение в моменты аварий.

Для таких случаев нам нужна поддержка нескольких классов хранения в S3. В спецификации есть понятие разных классов хранения, но для нас не нашлось подходящего готового названия, поэтому мы решили использовать свой — erasure code (EC). Он позволяет хранить данные с коэффициентом хранения 1.5, подробнее о нём можно прочитать в документации Ceph.

Вместе с поддержкой дополнительного класса хранения нам нужен transition lifecycle — фоновый процесс, который позволяет перемещать объекты из одного класса хранения в другой по набору правил. Например, перемещать все объекты старше 1 месяца или объекты с определённым префиксом в названии.

Ещё одна функциональность, которая нам нужна — Event Notifications. Это набор API, который позволяет внешним сервисам подписаться на события для бакета, например, о создании или удалении объектов, и читать эти события из Kafka. Для некоторых сценариев это может использоваться как хорошая оптимизация — не нужно периодически синхронизировать метаданные сервиса в БД и в S3 либо придумывать способы транзакционной работы S3 с брокерами сообщений. Эта сложность переходит на сторону нашего решения.

Архитектура

Архитектурно наш S3-сервис состоит из двух основных частей: индекс с метаинформацией об объектах и blob storage, который отвечает непосредственно за хранение данных объектов.

Blob storage

Как уже было сказано, у нас было много проблем с эксплуатацией Ceph RGW, то есть со стандартной реализацией S3 поверх Ceph. Но сам Ceph, как хранилище, нас полностью устраивает: это распределённое объектное хранилище, которое поддерживает разные реплика-факторы, умеет масштабироваться до сотен петабайт и больше, устойчиво к отказу части нод, self-healing, не имеет единого центрального компонента — то, что нам надо. И у нашей команды эксплуатации есть огромный опыт поддержки этого решения. Поэтому мы решили использовать низкоуровневый интерфейс Rados, который предоставляет CRUD-like key-value-доступ к объектам в хранилище. Librados написан на C и имеет биндинги на разных языках программирования, в том числе для Go.

Мы используем два вида хранения: с реплика-фактором 3 и EC для холодного хранилища. У Ceph есть такая особенность, что все операции — и запись и чтение — проходят через primary-зону. Таким образом гарантируется консистентность данных. И если для записи это совершенно нормально, операции записи всё равно синхронно реплицируются, то есть нагружают кластер равномерно, то с операциями чтения не так, они проходят только с primary-зоны и нагружают только её. Чтобы обойти эту особенность и максимально утилизировать диски, мы используем 3 логических пула данных. То есть у нас есть один кластер Ceph на 3 ДЦ и 3 пула данных, каждый пул имеет свой ДЦ как primary, а в каждом из ДЦ есть OSD с данными каждого пула. Таким образом, все диски нагружаются на чтение и на запись со всех пулов данных, что повышает утилизацию. Для холодного хранилища EC поднимается по отдельному кластеру Ceph на каждый ДЦ.

Свой S3-server: что делать, если ваши десятки петабайт уже не лезут в коробочные объектные хранилища - 2

Индекс

Первый вопрос, который перед нами возникает — какую СУБД использовать? Уже понятно, что нам нужно решение, которое позволяет бесконечное горизонтальное масштабирование и высокую доступность. Мы рассматривали PostgreSQL, ScyllaDB, YDB. По PostgreSQL у нас была наибольшая экспертиза. ScyllaDB имела наиболее близкие к нашему паттерну кейсы использования и хорошую репутацию. А YDB только вышла в open source и подавала большие надежды. 

По итогу исследований, в PostgreSQL не было удобного шардирования из коробки, а писать и поддерживать свой велосипед не очень хотелось, поэтому решили отказаться. YDB звучало перспективно, но это был совсем новый для нас (и не только нас) зверь, поэтому решили, что это слишком рискованно. Документации, статей и примеров использования было мало. Кажется, рынок ещё не выработал best practices на тот момент. Выбор пал на ScyllaDB.

ScyllaDB — это современная альтернатива Apache Cassandra, распределённая NoSQL-база данных с master-master-архитектурой, соответствующая AP (Availability и Partition Tolerance) свойствам согласно CAP-теореме. ScyllaDB имеет поддержку шардирования данных из коробки благодаря кольцевой архитектуре, где всё кольцо представляет собой распределение значений от -263 до +263-1, и каждой ноде в кластере принадлежит лишь диапазон этих значений, а данные распределяются по нодам согласно хэшу от ключей партиционирования для таблиц.

В ScyllaDB таблицы группируются в логическую сущность, называемую keyspace. Чтобы соответствовать структуре хранения данных в Ceph, а именно, один логический пул с данными на каждый ДЦ, для одного кластера ScyllaDB было решено использовать 3 keyspace, но только уже без привязки самих keyspace к ДЦ, так как в этой БД нет понятия primary и реплик, но есть master-master-архитектура. Это ещё хорошо тем, что таблицы получаются в 3 раза меньше и операции обслуживания ScyllaDB, например, repair, проводятся гранулярнее по keyspace и быстрее. Каждую связку keyspace + data pool было решено назвать зоной, так как по сути это часть кластера, а кластер там один и тот же. И S3-server деплоится отдельно на каждую зону с полностью независимой конфигурацией, мониторингом.

Поскольку ScyllaDB — распределённая БД, помимо прочего, в ней можно для каждой операции указывать желаемый уровень консистентности. То есть можно указать, от какого количества нод нужно дождаться ответа, чтобы отдать ответ клиенту. Мы стремимся к strong consistency, поэтому мы выбрали для записи EACH_QUORUM — это кворум в каждом ДЦ, и LOCAL_QUORUM для чтения — это кворум в одном ДЦ, куда пришёл запрос.

Хранение объектов

Верхнеуровнево в S3 достаточно простая предметная область: есть сущности пользователей, бакетов, объектов, мультипартов и это примерно всё. Есть ещё метаданные к ним, например, политики доступа к бакетам Policy и ACL, настройки Lifecycle, тегирование объектов и другое, но в такие подробности мы вдаваться не будем.

Из требований к хранению есть: хранить версии объектов и дедупликация данных в blob storage. Для оптимального хранения данных в blob storage мы решили разбивать данные чанками до 5 MiB и работать с ними как с immutable. Помимо прочего, это позволяет распараллеливать загрузку и скачивание данных из стораджа — это можно делать в несколько потоков на каждый чанк. Для простоты дедупликацию мы решили сделать на уровне этих чанков — если sha256-хэш для чанков совпадает, значит данный чанк можно переиспользовать, а не загружать в сторадж новый.

Таким образом, мы пришли к архитектуре, когда есть 3 основных таблицы. Таблица s3_objects, отвечающая за метаданные S3-объектов, таблица rados_objects, содержащая в себе описание всех объектов в blob storage, а именно, их id и sha256-хэш с индексом по хэшу, который нужен для дедупликации. И таблица chunks, которая связывает s3_objects и rados_objects — она указывает для какого S3-объекта какие id объектов из blob storage используются и в каком порядке.

Свой S3-server: что делать, если ваши десятки петабайт уже не лезут в коробочные объектные хранилища - 3

Внимательный читатель может заметить проблему — отсутствие транзакционности операций между индексом и blob storage. Мы не можем записать сначала данные в rados, а потом в индекс, потому что если при записи в индекс будет ошибка, у нас будет утечка — в blob storage данные есть, а в индексе про них не знают, и они никогда не удалятся. Решить это противоречие смогли с помощью статусной модели для rados_objects. То есть мы сначала делаем запись в индекс со статусом pending, потом записываем данные в rados, а затем выставляем записи в индексе статус OK, что означает что объект успешно сохранён и его можно использовать. Для удаления данных из blob storage был добавлен фоновый процесс garbage collector, об этом подробнее будет ниже.

В итоге алгоритм записи объекта в S3 выглядит примерно так:

  1. читаем данные из стрима, разбиваем на чанки по 5 MiB;

  2. параллельно в N потоков обрабатываем чанки:

    1. считаем хэш,

    2. по хэшу находим существующие радос-объекты, чтобы переиспользовать. Если находим, используем его id,

    3. если переиспользовать не получилось, генерируем id и пишем в rados_objects со статусом pending,

    4. записываем данные в rados,

    5. проставляем статус OK или удаляем запись;

  3. собираем итоговый объект, записываем батчем в таблички s3_objects и chunks.

Тогда таблицы будут выглядеть примерно так:

Hidden text
CREATE TABLE s3_objects {
    bucket text,
    object_key text,
    version_id text,
    PRIMARY KEY ((bucket, object_key), version_id) // шардирование по паре bucket + object_key
}

create table chunks
(
    bucket      text,
    object_key  text,
    version_id  text,
    chunk_index int,
    oid         text,
    rados_pool  text,
    size        int,
    primary key ((bucket, object_key), version_id, chunk_index)
)

create table rados_objects
(
    oid                      text primary key,
    creation_date            timestamp,
    hash                     text,
    object_status            int,
    rados_pool               text,
    size                     int,
    status_modification_date timestamp
)

Листинг объектов

Для получения информации об объектах в бакете в S3 существует операция ListObjects. На самом деле их несколько, есть обычный ListObjects, есть ListObjectsV2, а ещё есть отдельный листинг для версионированных бакетов, ListObjectVersions, который позволяет получить разные версии по ключам объектов, а не только последнюю актуальную версию, как в обычном листинге.

Они все довольно похожи: есть возможность листить бакет рекурсивно (без указания разделителя), с разделителем /, то есть вернуть всё содержимое конкретной «папки». Да, в S3 нет понятия папок, S3 — это key-value-хранилище, но для операций листинга ввели термин CommonPrefixes, который и есть папки. Также для любого вида запроса поддерживается пагинация, когда результат не влезает в один ответ (1000 элементов по умолчанию), в ответе приходит NextMarker. И ещё можно указать Prefix для названия объекта, это может быть как префикс-папка, так и префикс начала названия объекта. Результат листинга всегда сортирован лексикографически.

Между V1- и V2-листингами различия незначительные, основное изменение в том, как работает пагинация. В V1 Marker для пагинации представляет собой ключ объекта, после которого нужно возвращать результат. В V2 же вместо него сделали ContinuationToken, который является обфусцированной строкой, а не ключом объекта. Для метода ListObjectVersions в AWS решили пойти ещё дальше и там Marker работает по принципу «возвращать объекты, начиная с этого объекта», а не как в V1 — «после этого объекта». Мы в своей реализации решили, что нам нет смысла делать 3 разные логики маркеров пагинации и остановились на классическом варианте из V1-листинга. К слову, ребята из alibaba cloud в своей реализации S3 тоже решили так сделать, судя по документации на их сайте.

Исходя из требований к листингу, а именно: доступ к объектам по папкам, лексикографическая сортировка всех ключей объектов, задача выглядит как реализация дерева префиксов, так мы и поступили. У нас получилась следующая табличка:

CREATE TABLE prefixes {
    bucket text,
    parent_prefix text,
    prefix text,
    is_folder boolean,
    is_delete_marker boolean,

    PRIMARY KEY ((bucket, parent_prefix), prefix)
}

Благодаря тому, что в ScyllaDB все строки внутри партиций сортированы, реализовать дерево получилось очень удобно. Партиционирование выбрали по бакету + папке, а внутри партиции находятся все объекты из этой папки. Нужен листинг с разделителем по конкретной папке — select по (bucket, parent_prefix). Если нужна пагинация, то есть использование маркера, то, поскольку партиция сортированная, условие prefix > marker работает оптимально, как по индексу. Для рекурсивного листинга, например, бакета целиком, будет обычный обход дерева вглубь, начиная с (bucket, /), затем в порядке сортировки, если полученный префикс является папкой, то идём на уровень ниже и так далее, пока не наберётся MaxKeys на запрос. Если нужен неверсионированный листинг, то добавляется условие is_delete_marker == false.

При создании объекта добавляется запись в таблицу prefixes, то есть эта таблица содержит все объекты из s3_objects в сильно упрощённом виде и все «папки». Чтобы были гарантии консистентности, мы используем logged batch. С префиксами-папками чуть интереснее, мы можем их писать в таблицу всегда, без батчей и параллельно, не проверяя их наличия. И не бояться, что они останутся после удаления, поскольку можно переложить удаление пустых папок на время работы листинга — это как раз единственное место, где мы их используем. И в этот момент у нас есть информация о том, является ли папка пустой или нет, тогда можем и удалить, нет смысла делать это при удалении объекта.

Фоновые операции

У нас в сервисе есть две основные фоновые операции — Lifecycle (LC), который применяет правила LC для бакетов, и garbage collector (GC), который физически удаляет блоки данных из blob storage.

Для этих операций активно используется фулскан таблицы. В ScyllaDB все данные хранятся в кольцевой архитектуре. Если по-простому, кольцо — это все возможные значения signed int64, а каждой записи в таблице присваивается место в этом кольце, partition, на основе хэша от partition key-строки. В такой архитектуре фулскан таблицы выглядит довольно эффективно и просто. Мы разбиваем всё кольцо на какое-то количество подотрезков (token range) и делаем paging-запрос по каждому из них, а для продолжения фулскана при ретрае операции можно начинать с последнего обработанного token range. Каждый из таких подотрезков можно обрабатывать параллельно.

В таком случае LC-операция выглядит как регулярный фулскан таблицы объектов и применение к ним правил по дате создания, размеру, тегам, количеству версий и так далее. Есть один нюанс: мы сканируем таблицу всех версий объектов, а многие expiration-правила применяются именно к последней версии объекта. Чтобы понять, какая версия последняя, надо получить все версии и взять самую новую из них. Но при сканировании token range у нас нет гарантий, что все версии объекта попадут в один paging у запроса на чтение. Мы решили, что если дробить весь token ring на достаточное количество кусков, то для того чтобы буферизировать в памяти один из них, не понадобится много памяти, не больше нескольких MiB. Как итог, обработка 1 млрд объектов занимает меньше часа при всего лишь 30 воркерах.

Для garbage collector blob-объектов в хранилище мы реализовали простой mark and sweep-алгоритм. Нам хотелось избежать использования блокировок перед удалением и ручного ведения счётчика ссылок на объекты, так как в распределённой системе это, скорее, антипаттерн и ведёт к повышению вероятности ошибок, поэтому мы реализовали GC через статусную модель, grace period нахождения на статусе и индекс «какие чанки ссылаются на объект».

Получился приблизительно следующий алгоритм:

  1. Mark phase, сканируем таблицу rados_objects

    1. проверяем, что с момента проставления статуса прошло X времени. Это позволяет не сканировать свежесозданные объекты и даёт время на то, чтобы объекты были переиспользованы, например, в случае ретраев от клиента или пока клиент долго пишет огромный файл;

    2. по индексу ищем, ссылается ли кто-то на этот объект;

    3. если никто не ссылается, проставляем объекту статус to_delete. В этом статусе rados-объекты не используются для дедупликации при записи в S3. 

  2. Delete phase, сканируем индекс для rados_objects по статусу to_delete

    1. проверяем, что с момента проставления статуса прошло X времени. Это нужно, чтобы исключить возможность гонки данных, когда параллельно кто-то после создания S3-объекта начинает использовать этот радос-объект;

    2. по индексу ищем, ссылается ли кто-то на этот объект. Если да, возвращаем статус OK;

    3. если никто не ссылается, сначала удаляем объект из blob storage, а потом из индекса. Важно именно в таком порядке, чтобы предотвратить утечку данных, так как у нас нет транзакционности между индексом и blob storage. При ошибке удаления из blob storage операция заретраится на следующей итерации работы GC.

Конечно, для этого алгоритма есть возможности оптимизаций. Например, мы можем не сканировать всю таблицу так часто, а помечать объекты к удалению в момент удаления ссылки на объект и асинхронно, чтобы не влиять на latency пользовательского трафика. 

Другая оптимизация — сканирование всей таблицы с указанием consistency level ONE, то есть читая только с одной ноды, а не кворумом, как обычно. В таком случае есть вероятность прочитать удалённую строку либо, наоборот, не прочитать записанную ранее строку, если есть какой-то рассинхрон в кластере. А саму проверку для маркировки объекта делать как обычно, читая кворумом. Мы так можем сделать, поскольку алгоритм толерантен к временному рассинхрону каких-то данных на нодах — если мы получим строку, которая на самом деле уже была удалена, то мы просто не найдём ссылок на этот объект и попытаемся ещё раз его удалить, а удаление идемпотентно. А если мы не прочитаем какую-то существующую строку, ничего страшного, eventually на следующей итерации работы GC она найдётся и обработается.

Планирование переезда

На наших масштабах переезд на другое решение — процесс непростой, тем более переезд на решение абсолютно новое и никем в продакшне не протестированное, поэтому мы подошли к проблеме комплексно.

Ещё до переезда возникает вопрос, как мы можем минимизировать проблемы от перехода на новую реализацию S3. К счастью, поскольку S3-реализаций уже много, существуют открытые S3 compatibility test suites. Мы остановились на использовании minio/mint, сделали свой форк и дописываем туда новые сценарии для других клиентов, которые активнее используются у нас.

Тесты — это хорошо, но хочется проверять на более реалистичной нагрузке. Нам повезло, в нашей S3-инфраструктуре в Ozon уже был важный центральный компонент — s3-proxy. Он отвечает за проксирование пользовательского трафика до нужного кластера, а также пишет метрики в разрезе S3-операций (в RGW метрик очень мало), сохраняет аудит-лог всех запросов. Мы реализовали в s3-proxy-зеркалирование запросов на произвольные эндпоинты, это помогло безопасно обнаружить проблемы и различия в реализациях S3 ещё до переключения реального трафика. На зеркалировании staging-окружения мы нашли несколько несовместимостей, как оказалось, мы у себя реализовали более строгую валидацию в некоторых методах. Например, на вызов DeleteObjects с пустым списком объектов мы возвращали InvalidRequest, а RGW возвращал 204, на чём была завязана логика нескольких сервисов. Помимо каких-то ошибок, также получилось выявить узкие места по производительности, которые не удалось найти на синтетических стресс-тестах.

После того как наш сервис вышел из стадии MVP, был реализован весь core-функционал, пора было начинать переходить на него. Основным требованием было сделать переезд прозрачным для пользователя, без даунтайма и с возможностью откатиться обратно в любой момент. Для этого мы придумали механизм primary- и fallback-маршрутов для бакетов у s3-proxy.

Этот механизм работает следующим образом. Когда для бакета указано несколько кластеров, все операции создания объектов направляются только в primary-кластер. А операции чтения, такие как Get/Head object и получение метаинформации идут сначала в primary, если primary отдаёт 404, то мы идём в fallback-кластер и возвращаем ответ из него. У этой схемы есть несколько корнер-кейсов, например, операции удаления должны происходить во всех кластерах, но в обратном порядке, начиная с fallback — чтобы в случае, если fallback-кластер вернёт ошибку, мы спроксируем ошибку пользователю и объект останется в primary-кластере и последующее чтение не вернёт ему 404, пока пользователь не заретраит удаление. Для других модифицирующих операций с метаданными объекта или бакета, такими как PutObjectTagging или DeleteBucketPolicy, похожая логика — мы должны применять их для всех кластеров. А для операций ListObjects нужно отправлять запрос на все кластеры, а потом склеивать ответ из них, согласно спецификации S3. То есть, по сути, реализовать метод на уровне s3-proxy.

Получается, схема переезда выглядит следующим образом:

  1. выставляем новый кластер как primary для бакета, а текущий кластер оставляем как fallback. То есть запись идёт в новый кластер, а чтение старых объектов по-прежнему из старого. Наблюдаем какое-то время, убеждаемся, что всё ок. Хотя, конечно, мы это проверяли массово ещё на стадии зеркалирования трафика, но всякое может случиться;

  2. запускаем фоновую миграцию данных из fallback-кластера в primary;

  3. когда все данные успешно смигрировались, убеждаемся, что походов в fallback-кластер не осталось и отключаем его из маршрутов для бакета.

При такой схеме, если что-то пойдёт не так, можно в любой момент поменять primary- и fallback-кластеры местами и запустить обратную миграцию новых данных.

Раньше при необходимости смигрировать бакет из одного кластера в другой мы использовали Rclone. Rclone — консольная утилита для копирования данных из одного источника в другой со всеми вытекающими из этого недостатками:

  • нет observability, есть только текстовые логи. Нельзя централизованно посмотреть статус миграции и какой процент данных уже переехал;

  • нет готовой автоматизации для запуска, нужно писать скрипты, которые сначала копируют бакет, а потом проверяют, что данные действительно доехали;

  • сложно интегрировать в нашу S3-инфраструктуру, например, чтобы из скриптов переключать fallback-маршруты;

  • для S3 работает не очень эффективно — Rclone сначала полностью листит бакет в оперативную память (и хранит в виде xml-структур, из-за чего потребляет много памяти) и только потом начинает переносить объекты, хотя можно было бы это делать параллельно. Для больших бакетов на десятки миллионов объектов это сильно затягивается, вспомним ещё, что для RGW листинг больших бакетов — дорогая операция. На этапе сверки Rclone опять листит бакет, но теперь ещё и в целевом кластере;

  • нет возможности поставить миграцию на паузу и потом продолжить с того же места. Аналогично нельзя изменить количество воркеров у запущенной миграции, можно только остановить и начать сначала, и Rclone будет листить весь бакет заново;

  • столкнулись с багами, например, Rclone не умеет копировать объекты с ключами с двумя слэшами вида some//dir/object, миграция такого объекта просто завершается с ошибкой.

Для переезда тысяч бакетов и миллиардов объектов нам хотелось больше автоматики, поэтому мы написали свой сервис — s3-migrator. Он управляет всем циклом жизни миграции: от создания бакета в destination-кластере до завершения миграции и сохранения в S3 json лога сверки процесса миграции.

Миграция с помощью него состоит из таких этапов:

  1. запрос от пользователя на подготовку к миграции: создание пользователя, бакета в destination-кластере, копирование метаданных бакета (Lifecycle, Policy и другое), переключение primary-маршрута на новый кластер;

  2. запуск миграции. В фоне начинается листинг бакета с сохранением названий объектов в БД и параллельно идёт этап копирования объектов;

  3. после завершения копирования объектов идёт сверка — проверка, что объект находится и в source- и в destination-кластерах и имеет одинаковые метаданные;

  4. если есть какие-то расхождения, запускается этап домиграции для этих объектов;

  5. завершение миграции, отключение fallback-маршрута, сохранение лога миграции в S3.

Архитектурно это обычный микросервис, вся информация о миграциях и объектах хранится в PostgreSQL. У миграций и у объектов по ним используется статусная модель — на каждом этапе миграции объект переходит из одного статуса в другой. Поскольку объектов в бакетах много, десятки миллионов на бакет, записывать и удалять их в больших объёмах выходит неэффективно. Тут нам помогло партиционирование таблички по ID миграции — после завершения миграции можно очень просто и эффективно дропнуть всю партицию одним запросом. И индексы по статусам объектов в рамках партиции не так сильно распухают при переходах по статусам. Процесс миграций масштабируется по подам через распределённое выполнение задач. Например, для одной миграции может быть две задачи — листинг и миграция/сверка, по подам они распределятся равномерно случайным образом.

Ещё одна фича, которую мы добавили, — мигрирование даты создания объекта. По S3-спецификации, конечно, при создании объекта нельзя указать дату создания — используется время обработки запроса. Но если при миграции данных между кластерами будет обновляться дата, то сломаются правила Lifecycle для бакетов, объекты не удалятся, когда было запланировано, и пользователям придётся либо экстренно руками удалять данные, либо расширять квоты, чтобы хранить данные дольше. А для некоторых бакетов Lifecycle может стоять и на год. Также некоторые клиенты могут использовать дату создания объектов для внутренней бизнес-логики. Для пользователей это всё может быть критично, поэтому мы расширили протокол S3 в своей реализации ещё одним заголовком с датой создания объекта и интегрировали это с s3-migrator, теперь при переносе данных между кластерами сохраняются все метаданные объектов, включая дату создания.

После того как мы разобрались, как осуществлять переезд бесшовно и красиво, остаётся последний вопрос — кого перевозить?

Разработка такого масштаба — дело небыстрое, а нам хотелось идти итеративно, не дожидаясь реализации всех фич. Есть много бакетов с очень простыми паттернами использования S3 — вызовы Put + Get object и всё. А есть бакеты, в которых используют с десяток разных методов, пишут многогигабайтные объекты, выставляют правила Policy, Lifecycle с ретеншном по времени жизни объектов, а ещё и CORS настраивают для доступа к бакету через CDN. Поначалу мы проверяли паттерны использования бакетов вручную для первых тестовых запусков, но довольно быстро пришли к тому, что и для этого нужна автоматизация. В итоге мы написали свою CLI с командой check compatibility, которая запрашивает различную статистику по бакету из нашего аудит-лога всех операций в Clickhouse и проверяет, подходит ли бакет для переезда. Помимо базовых вещей, вроде подсчёта RPS по методам и проверки, реализован ли этот метод в новом решении, мы добавили и более сложные проверки. Например, все ли правила Lifecycle по этому бакету мы поддерживаем или поддерживаем ли мы некоторые сложные заголовки у методов, например, canned-acl или tagging для методов создания объектов. Это оказалось очень полезно, потому что даже на staging-окружении мы не можем себе позволить ломать пользователям бакеты — из-за этого может встать тестирование сервисов, а вместе с этим и релизы, а это может и к инциденту привести. А некоторые staging-бакеты для нас как прод — там хранятся бэкапы и метрики Thanos со staging-окружения.

Проблемы во время запуска

Основная проблема, с которой мы столкнулись, — большие партишны в таблицах ScyllaDB. Это приводит к тому, что шарды с большими партишнами оказываются более нагруженными, например, для некоторых операций приходится выгружать весь партишн, что приводит к скачкам латенси для всех запросов в этот шард. Также это влияет на фоновые операции compaction, что может приводить к деградациям. По умолчанию в ScyllaDB стоит warning на превышение количества строк в партишне 100000 и на превышение размера в 1000 MiB. Но на практике рекомендуют делать партиции ещё меньше — на 1-2 порядка. В целом для архитектуры сциллы считается, что чем меньше размер партиции, тем лучше. Тогда данные более равномерно распределяются по нодам, нагрузка распределяется равномернее и меньше шансов получить hot shard, из-за которого появляются спайки латенси. При моделировании структуры БД мы учитывали этот фактор, поэтому, например, наши основные таблицы имеют партиционирование по (bucket, object_key), что даёт нам хорошее распределение, но не всё удалось учесть.

Дедупликация данных работала так хорошо, что в индексе ссылок чанков данных на блобы в хранилище образовывалось до миллиона строк. Последний случай был, когда на staging-окружении кто-то загрузил одинаковый тестовый файл 25 миллионов раз. Решение было простое — решардинг данных, а именно физическое дублирование данных в хранилище и перезапись ссылок на них с целью, чтобы на каждый объект ссылались не более 100000 раз.

Другой случай оказался сложнее: у нас для операций листинга объектов использовалась структура таблицы, где партиционирование было по «папке», чтобы иметь сортировку в ответе, как этого требует спецификация S3. Как выяснилось, у нас есть несколько сервисов, которые решили хранить все объекты в корне бакета, все десятки миллионов объектов. Такие случаи сильно влияли на общую производительность кластера, поэтому, прежде чем перевозить их в новое решение, мы сделали механизм шардирования таких бакетов. Было предложено два варианта реализации. Первый — это range sharding, то есть присваивать каждому шарду свой диапазон названий объектов, например, для шарда №1 все объекты, начинающиеся на A-C, для №2 — начинающиеся на D-F и так далее. Это позволило бы сохранить сортировку внутри шарда и, для того чтобы получить значения при листинге, нужно было бы запросить только несколько шардов, а не все. Но у этого подхода есть и проблемы — может быть плохое распределение по шардам при определённых данных, когда нет какого-то фиксированного паттерна наименования объектов. От этого варианта было принято отказаться, потому что он подразумевает частое решардирование и от этого имеет более сложную реализацию, к тому же не для всех случаев подходит. Поэтому мы остановились на классическом варианте алгоритмического шардирования через hash(value) % shards. Такой подход даёт равномерное распределение по шардам и предсказуемую нагрузку.

Ещё одна проблема с листингом объектов была полностью противоположна предыдущей — слишком ветвистый бакет. Некоторые сервисы хорошо подумали про шардирование, что у них оказалась такая структура объектов в бакете, что на каждую папку приходилось всего по 1-3 объекта. Для операции листинга без разделителя, так называемого рекурсивного листинга, в результате нужно вернуть лексикографически отсортированные ключи объектов независимо от их папки. То есть, если объектов в папках мало, нам нужно рекурсивно обойти большое количество папок. Худший случай был с одним бакетом с большой вложенностью, его листинг занимал по 10-15 секунд на запрос и приходилось проходить через тысячи папок. Мы придумали эвристику, которая находит такие случаи и распараллеливает обход дерева вширь. С этой оптимизацией удалось ускорить работу листинга для ветвистых бакетов в 5-30 раз!

Результаты

Нам удалось сделать решение, которое позволяет полностью отказаться от использования Ceph RGW. Наш новый s3-server не имеет таких архитектурных недостатков и позволяет горизонтально масштабироваться с ростом потребностей бизнеса. Мы успешно прошли учения по отключению одного дата-центра DC-1 и перевезли 80% продакшн-данных.

Основным требованием к производительности было «сделать не хуже» и исключить нестабильность работы, как было в RGW при листинге и удалении объектов в больших бакетах, что у нас и получилось. Тайминги по некоторым операциям с индексом стали лучше за счёт оптимальной реализации хранения индекса на более производительной ScyllaDB. Следующим шагом будет оптимизация работы с дисками, то есть оптимизация взаимодействия с Rados, либо реализация своего blob storage, но об этом уже в другой раз.

Благодаря дедупликации мы получили большую экономию данных — в среднем около 13% по всем бакетам. Но бывает и больше, например, по одному из бэкапов аналитической базы данных на 1000 TiB вышла дедупликация в 37%, то есть около 370 TiB. Оптимальный подход к фоновым процессам (Garbage collector и Lifecycle) позволяет обрабатывать данные чаще, чем раз в сутки, как было с RGW, что позволяет эффективно удалять объекты и физические данные, опять же уменьшая тайминги основных операций.

Мы продолжаем развивать своё хранилище, и в планах много интересных оптимизаций и новых фич, спасибо за внимание.

P.S. Мы выбрали для нашего решения гордое имя Lusca (читается как «Ласка»). Ceph — от слова цефалопод, то есть осьминог. А Lusca — карибский демон полуосьминог и полуакула. Ну, то есть быстрый и хваткий, как акула, но надёжный и хорошо масштабирующийся, как осьминог. Это же прямо про нас :)

Автор: Максим Мержанов

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js