У нас было 2 кластера Clickhouse, 1 кластер GreenPlum’a, 1 кластер Kubernetes’a, DataVault 2.0, гора dbt витрин и всего такого, а еще Dagster. Не то чтобы это все было нужно в архитектуре, но раз начал коллекционировать сервисы, то иди в своем увлечении до конца. Единственное, что нас беспокоило - это стоимость.
Решение о полной переделке архитектуры пришло внезапно, как галлюцинация в ночной пустыне - мы поняли, что больше нельзя ждать. Теперь наши данные обрабатываются быстрее, чем мысли в голове на ЛСД, и мы можем персонализировать клиентский опыт так, что он становится почти реальным.
Всем привет! На связи Артем, и если вы готовы погрузиться в мир цифрового безумия и увидеть, как мы превратили хаос в порядок, то держитесь крепче и читайте дальше.
Когда-то мы уже писали о том, как построить DataVault на Greenplum, со статьей можно ознакомиться здесь. Кратко расскажем о причинах, подтолкнувших нас пересмотреть подход - это оптимизация расходов и отсутствие экспертов в администрировании GreenPlum.
В первом случае нам бы хотелось платить только за время фактического использования ресурсов, а во втором, несмотря на то, что наш кластер был управляемым, мы регулярно сталкивались с проблемами поддержки и масштабирования системы, что также приводило к непредсказуемым затратам.
Выбор нового решения
При выборе нового решения мы учитывали несколько ограничений:
-
Dagster: для управления задачами
-
Kubernetes: для оркестрации контейнеров
-
Yandex Cloud: как основная облачная платформа
Основные компоненты нового решения:
-
Yandex Object Storage: для хранения
-
Delta Table: для управления
-
Apache Spark: для обработки
Почему именно эти технологии?
Переход с архитектуры Data Vault 2.0 на архитектуру Delta Lake (S3 + Delta Table) может предложить ряд существенных преимуществ:
-
Транзакционная целостность и ACID-свойства - Delta Lake обеспечивает надежность и целостность данных при параллельных операциях
-
Производительность и оптимизация запросов - структура Data Vault 2.0 может приводить к сложным и менее оптимизированным запросам, требующим значительных усилий для оптимизации
-
Гибкость и масштабируемость - Data Vault 2.0 хорошо подходит для интеграции из различных источников, но может потребовать значительных ресурсов для масштабирования под высокие нагрузки, тогда как Delta Lake спроектирован для работы с большими объемами данных в облачной среде и легко масштабируется в зависимости от нагрузки без серьезных затрат
-
Упрощенное управление схемой данных - delta table поддерживают автоматическую эволюцию схемы и принудительное соблюдение схемы, что упрощает внесение изменений в структуру данных и управление ими
-
Экосистема и интеграция с другими инструментами - delta table легко интегрируется с apache spark, что упрощает разработку и эксплуатацию аналитических приложений
Apache Spark
Одним из преимуществ Apache Spark является возможность его локального развертывания, что особенно удобно для разработки и тестирования. На текущий момент обработка наших данных происходит именно таким образом.
Как это работает?
В самом простом виде нашу архитектуру можно представить следующим образом:
Данные хранятся в Yandex Object Storage в формате Delta Table. Это позволяет нам воспользоваться преимуществами версионирования данных и транзакционной целостности. Помимо этого, мы используем S3 для хранения артефактов ML моделей.
В работе с форматом Delta Table правильное распределение данных по партициям играет ключевую роль. Учитывая разнообразие наших источников данных, мы внедрили партиционирование не только по дате, но и по источнику. Это значительно повышает скорость и эффективность чтения данных. При грамотном выборе партиции Spark применяет partition pruning, избегая необходимости читать все файлы в таблице, что оптимизирует производительность системы.
Для нашего MVP мы выбрали использование кластера Spark на одном узле, настроив SparkSession следующим образом:
SparkSession.builder.appName().master(f"local[*]")
Полная конфигурация Spark’a для работы с S3 может выглядеть следующим образом:
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession
packages = [
"io.delta:delta-core_2.12:2.0.0",
"org.apache.hadoop:hadoop-aws:3.3.1",
]
builder = (SparkSession.builder.appName().master(f"local[*]")
.config("spark.driver.memory", "16g")
.config("spark.hadoop.fs.s3a.bucket.<bucket_name>.access.key", "<backet_access_key>")
.config("spark.hadoop.fs.s3a.bucket.<bucket_name>.secret.key", "<backet_secret_key>")
.config("spark.hadoop.fs.s3a.bucket.<bucket_name>.endpoint", "<backet_endpoint>")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.config("spark.hadoop.fs.s3a.fast.upload", "true")
.config("spark.sql.ui.explainMode", "extended")
.config("spark.sql.parquet.datetimeRebaseModeInWrite", "CORRECTED")
.config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.config("spark.driver.extraJavaOptions", _spark_with_newer_jvm_compatibility_options)
)
spark = builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
Такой подход позволяет запускать Spark локально на той же машине, где создается сессия. Благодаря этому, мы можем запускать Spark непосредственно в подах Dagster и удобно отлаживать запросы локально.
В работе с Dagster мы используем ассеты и менеджеры. Вся бизнес-логика размещается внутри ассета, а управление вводом-выводом передается менеджеру. Таким образом, нам было достаточно написать Spark менеджер и адаптировать запросы в коде для работы со Spark.
Для экономии ресурсов во время простоя мы внедрили автоскейл группы прерываемых узлов в Managed Service for Kubernetes от Яндекс Облака. Развернули с помощью Terraform, используя готовый модуль. Мы настроили группы так, чтобы они могли масштабироваться до нуля узлов при отсутствии нагрузки. Для подов Dagster у нас выделена отдельная группа узлов, где всегда работает хотя бы один узел. Таким образом, система практически не потребляет ресурсы в периоды простоя.
При необходимости также можно запросить чуть больше ресурсов для конкретных задач, настроив конфигурацию для каждого задания Dagster:
@job(
tags={
"dagster-k8s/config": {
"container_config": {
"resources": {
"requests": {"cpu": "250m", "memory": "64Mi"},
"limits": {"cpu": "500m", "memory": "2560Mi"},
},
},
},
)
def my_job():
my_op()
Либо, при использовании ассетов:
my_job = define_asset_job(
name=’my_job’,
selection=AssetSelection.assets(my_asset),
tags={
"dagster-k8s/config": {
"container_config": {
"resources": {
"requests": {"cpu": "250m", "memory": "64Mi"},
"limits": {"cpu": "500m", "memory": "2560Mi"},
},
},
},
)
Для записи данных в Clickhouse мы используем библиотеку clickhouse-connect, так как наш исходный код уже был написан с ее помощью. Однако стоит отметить, что для Spark есть драйвер для работы с Clickhouse, позволяя использовать Spark SQL для выполнения запросов и обработки данных в Clickhouse.
Важно отметить, что для контроля расходов на использование хранилища необходимо регулярно обслуживать все таблицы с помощью команд VACUUM и OPTIMIZE. Команда VACUUM удаляет старые, неиспользуемые данные и освобождает пространство, что помогает снизить затраты на хранение. Команда OPTIMIZE, в свою очередь, не только уменьшает размер таблиц, но и значительно улучшает производительность запросов, ускоряя чтение данных.
Важное замечание: Spark + Delta Table по умолчанию поддерживают механизм оптимистичных блокировок для write-запросов, выполняемых в одном Spark кластере, но не для write-запросов из разных Spark кластеров. Чтобы включить такую поддержку, необходимо добавить в конфигурацию DynamoDB. В Яндекс Облаке YDB реализует интерфейс DynamoDB, но его интеграция оказалась для нас сложной задачей. Поэтому мы решили использовать механизм пессимистичных блокировок, встроенный в Dagster, через функцию tag_concurrency_limits. Все джобы, записывающие данные в таблицу, мы помечаем тегом с названием этой таблицы. Dagster, в свою очередь, предотвращает одновременное выполнение таких джоб, что обеспечивает корректное управление блокировками и исключает конфликтные write-запросы.
Таким образом, конфигурация вашего dagsterDaemon будет выглядеть как-то так:
dagsterDaemon:
enabled: true
image:
repository: "docker.io/dagster/dagster-celery-k8s"
tag: ~
pullPolicy: Always
heartbeatTolerance: 300
runCoordinator:
enabled: true
type: QueuedRunCoordinator
config:
queuedRunCoordinator:
maxConcurrentRuns: 10
tagConcurrencyLimits:
- key: "single-thread"
value:
applyLimitPerUniqueValue: true
limit: 1
После чего в настройки вашего задания можно добавить:
my_job = define_asset_job(
name=’my_job’,
selection=AssetSelection.assets(my_asset),
tags={
“single-thread”: “my_job”,
"dagster-k8s/config": {
"container_config": {
"resources": {
"requests": {"cpu": "250m", "memory": "64Mi"},
"limits": {"cpu": "500m", "memory": "2560Mi"},
},
},
},
)
Преимущества нового подхода:
-
Экономия расходов
Мы платим только за фактическое время работы, что позволяет эффективно управлять бюджетом и уменьшить издержки.
-
Гибкость и масштабируемость
Мы легко адаптируемся под изменяющиеся нагрузки, обеспечивая высокую производительность и надежность
-
Поддержка существующих процессов
Мы по-прежнему поставляем готовые витрины в Clickhouse, что позволяет сохранить существующие процессы для аналитиков и избежать значительных изменений.
Заключение
Переход на новую архитектуру оказался успешным и принес значительную экономию расходов, улучшение гибкости и масштабируемости. Наш опыт показывает, что такие изменения могут быть менее сложными, чем ожидается, и приносить значительные выгоды.
А в данный момент мы подготавливаем инфраструктуру для внедрения горизонтально масштабируемого Spark’a и поддержки Hive Metastore, о чем постараемся рассказать в следующей статье.
Ну вот и все!) Благодарю за внимание и буду рад услышать ваше мнение) Поделитесь своим опытом, задавайте вопросы и расскажите, как вы справляетесь с подобными задачами)
Автор: Art_DV