Как небольшой команде переехать на ClickHouse: на какие грабли мы наступили и о каких фишках не знали

в 5:57, , рубрики: clickhouse, data engineering, open source, sql, базы данных

Привет!

Меня зовут Петр. Я работаю инженером по данным в Okko и обожаю ClickHouse. 

Примерно в середине прошлого года мы начали увлекательный процесс переезда хранилища с PostgreSQL (плюс частично HDFS) на ClickHouse. Причин для переезда было несколько, но одной из главных была низкая производительность — среднее время аналитического запроса составляло около минуты. Понятно, что запросы бывают не оптимальные. Но сейчас, после переезда, среднее время запроса в аналитическом кластере составляет около 2 с. И это не предел.

В этой статье я расскажу, как мы пришли к текущему состоянию нашего хранилища данных, какие ошибки успели совершить, какие шишки набить, и о каких фишках кликхауса мы бы предпочли узнать заранее.

Статья будет полезна тем, кто только начинает свой путь работы с кликхаусом: мы посмотрим и как делать не надо, и как можно сделать лучше.

В этой статье не будет объяснений, почему для переезда мы выбрали именно этот инструмент. Также не будет и глубокой теории о его внутреннем устройстве (хоть я постарался включить минимально необходимый для понимания статьи набор теории). Отметим лишь, что в правильных руках ClickHouse — одна из самых быстрых колоночных СУБД для OLAP запросов.

Поехали.

Как всё начиналось

Итак, год назад у нас было следующее:

  • свежеразвернутый кластер ClickHouse

  • 2-3 инженера по данным

  • около 30 аналитиков

  • ряд витрин, которые необходимо перенести с PostgreSQL

  • бэклог из около 50 витрин, которые надо написать с нуля (спойлер: учитывая число аналитиков, бэклог наполнялся быстрее, чем мы пилили витрины, поэтому нас сейчас чуть больше 🙂)

Нас было мало, а знаний о кликхаусе еще меньше. Поэтому мы писали витрины как умели, рассматривая ClickHouse как обычную колоночную СУБД. Наверное, это было одной из первых ошибок: стоило уделить больше времени подготовке, разобраться в новом инструменте и понять его особенности. Но витрины нужны были уже вчера, поэтому мы кинулись в бой. 

Грабли, на которые мы наступили

Далее приведу примеры решений, о которых мы впоследствии слегка пожалели.

Неконтролируемое использование ReplacingMergeTree движков

В кликхаусе много разных движков (engine) таблиц, выбор которых сильно зависит от целей использования. Самое популярное семейство движков — это -MergeTree движки.

Расскажу про них немного. Это семейство движков гарантирует как быструю запись, так и очень быстрый поиск по ключу. При записи в таблицу данные сортируются и записываются на диск т.н. кусками (parts). Далее, в фоне, эти куски сливаются (merge), отсюда и название движка — MergeTree. Заметим, что по дефолту слияния происходят неконтролируемо для пользователя. ClickHouse сам определяет, когда выполнить очередной мерж данных. Но если очень хочется, то можно и запускать насильный мерж при помощи команды optimize table xxx. Расскажу об этом чуть позже.

Под семейством движков -MergeTree подразумевается, что на их основе реализован ряд некоторых других движков. Помимо просто MergeTree, существуют также CollapsingMergeTree, AggregatingMergeTree, ReplacingMergeTree и другие.

Прочитав впервые про эти движки, мы были в полной уверенности, что ReplacingMergeTree — это движок почти на все случаи жизни. Подробнее прочитать про использование этого движка можно, например, в официальной доке. Основная идея в том, что это самый простой аналог апдейтов в классических СУБД. В ReplacingMergeTree таблицах старые записи заменяются новыми на основании выбранного ключа (происходит дедупликация строк по ключу):

-- создаем таблицу с движком ReplacingMergeTree
create table example_rmt (
   id UInt8,
   value UInt8
)
engine = ReplacingMergeTree()
order by id; -- именно по этому полю будет происходить дедупликация


-- инсертим 3 строки
insert into example_rmt
values (1,2), (2,3), (3,4);


-- решаем, что value для id = 1 хотим поменять с 2 на 42. 
-- движок ReplacingMergeTree нам позволяет это сделать просто еще одним инсертом
insert into example_rmt
values (1,42);


-- смотрим на текущий результат
select * from example_rmt;

   ┌─id─┬─value─┐
   │  1 │     2 │
   │  2 │     3 │
   │  3 │     4 │
   │  1 │    42 │
   └────┴───────┘


-- а теперь выполняем запрос с модификатором final, который мержит данные перед выдачей. 
-- т.е. в случае с нашим движком, оставляет только самую последнюю запись с id = 1
select * from example_rmt final;

   ┌─id─┬─value─┐
   │  1 │    42 │
   │  2 │     3 │
   │  3 │     4 │
   └────┴───────┘


-- заметим, что мы можем это реализовать и через optimize final (насильный мерж)
optimize table example_rmt final;


-- и теперь уже нет необходимости применять final при селектах, потому что все куски смержены
select * from example_rmt;

   ┌─id─┬─value─┐
   │  1 │    42 │
   │  2 │     3 │
   │  3 │     4 │
   └────┴───────┘

Как видим, мы реализовали update для id = 1.

Вот примерно таким образом мы и планировали работать с ReplacingMergeTree и реализовывать апдейты.

Главная проблема нашей работы с ReplacingMergeTree заключалась в том, что мы всегда стремились хранить только дедуплицированные данные. Это был своего рода трейдоф: сделать один раз тяжелый optimize table xxx final вместо необходимости применять final при каждом запросе (что значительно повышает использование оперативки). Физически, при таком процессе на диске переписывается вся таблица, то есть старые парты удаляются, а новые записываются. Это терпимый подход, если в вашей таблице сотни тысяч или миллионы строк. Но если в таблице их миллиарды, optimize table xxx final вам забьет все ресурсы кластера.

Например, в какой-то момент у нас был ежедневный DAG Airflow, который делал optimize table xxx final каждой партиции таблицы. Если не знакомы с Airflow, то здесь и далее слово DAG (Directed Acyclic Graph) можно интерпретировать как крон джоба, запускающая один или несколько SQL запросов. DAG ежедневно работал в течение 8 часов при том, что в результате дедупликации переписывалось всего одно поле, а все остальные поля оставались неизменными. Занятно, что это поле использовалось далеко не в каждом запросе. Но 8 часов ждать приходилось в любом случае.

А число строк в таблице каждый день растет

А число строк в таблице каждый день растет

Причем страдает не только кластер, но и заказчики-аналитики.

Во-первых, им приходится объяснять, что такое этот ваш final, и почему к некоторым таблицам с ReplacingMergeTree движком его надо приписывать при селекте, а к другим не надо. Во-вторых, им еще и надо понимать, у каких таблиц какой движок (к MergeTree в принципе final не применим).

Несмотря на то, что у нас и сегодня еще достаточно много витрин с этим движком, мы научились с ним работать эффективнее. 

И как же можно было сделать лучше?

Во-первых, если вам ReplacingMergeTree нужен только для идемпотентности DAG'а, то в таком случае гораздо эффективнее будет дропать партицию (дешевая операция по удалению директорий на диске) перед инсертом новой. Подробнее про дроп партиций будет еще в следующем разделе.

Если нет возможности обойтись без ReplacingMergeTree, то рекомендую не делать насильный optimize table xxx final. А для упрощения жизни аналитикам всегда создавать view с final, чтобы им никогда не приходилось ломать голову, нужен ли final при запросах к таблице. 

Также будет идеально, если получится реализовать архитектуру, при которой дедупликация будет необходима только в рамках одной партиции. Тогда во view с final можно еще добавить настройку do_not_merge_across_partitions_select_final = 1. Так Clickhouse будет производить дедупликацию только внутри каждой партиции, а не всей таблицы.

Если чуть нагляднее:

-- создаем таблицу с движком ReplacingMergeTree
create table example_rmt (
   id UInt32,
   partition_col UInt16,
   value UInt64
)
engine = ReplacingMergeTree()
partition by partition_col
order by id;


-- инсертим 100 млн строк при помощи numbers(N)
insert into example_rmt
select number, number % 100, rand64() from numbers(100000000);


-- инсертим еще 100 млн строк с теми же значениями в id и partition_col, но другим value
insert into example_rmt
select number, number % 100, rand64() from numbers(100000000);


-- выполняем select с final - результат после дедупликации
select countIf(value > 0) from example_rmt final;

1 row in set. Elapsed: 9.611 sec. Processed 209.90 million rows, 2.52 GB (21.84 million rows/s., 262.07 MB/s.)
Peak memory usage: 3.22 GiB.

Нижние строки — результат из CLI клиента clickhouse-client, который показывает такие метрики как число просканированных строк и потребление RAM в пике. Итого, наш бенчмарк указан в строках выше. Посмотрим, что будет, если добавить настройку do_not_merge_across_partitions_select_final = 1:

-- выполняем тот же select с final, но дедупликация только внутри одной партиции
select countIf(value > 0) from example_rmt final
settings do_not_merge_across_partitions_select_final=1;

1 row in set. Elapsed: 6.225 sec. Processed 209.90 million rows, 2.52 GB (33.72 million rows/s., 404.05 MB/s.)
Peak memory usage: 3.20 GiB.

Заметим, что мы просканировали то же число строк, было затрачено столько же RAM в пике, но посчитали гораздо быстрее. Строго говоря, разница в 3 секунды может быть и случайной (вызванной другими факторами), но здесь было запущено достаточно запросов, чтобы разницу можно было считать статистически значимой.

Также отметим, что этот подход можно немного улучшить. Если у нас одинаковые ключи лежат в одной партиции (например, дневной), то мы можем делать оптимайз не всей таблицы, а последней партиции: optimize table xxx partition 'prev_day' final. И тогда все последующие запросы с final и настройкой do_not_merge_across_partitions_select_final будут требовать значительно меньше ресурсов.

И, наконец, выполним дедупликацию и тот же запрос без final:

-- делаем насильный optimize таблицы
optimize table example_rmt final;

-- выполняем select без final, тк данные уже дедуплицированы
select countIf(value > 0) from example_rmt;

1 row in set. Elapsed: 0.560 sec. Processed 100.00 million rows, 800.00 MB (178.42 million rows/s., 1.43 GB/s.)
Peak memory usage: 420.35 KiB.

Посчитали примерно в 10 раз быстрее (на оперативку тоже рекомендую обратить внимание). Согласитесь, после такого сложно отказаться от подхода с использованием optimize table xxx final в конце каждого DAG'а. А стоило бы в большинстве случаев.

Использование alter delete там, где можно дропать партицию

Все наши Airflow DAG мы стараемся делать идемпотентными. Самый простой способ этого добиться, как нам казалось, было использование конструкции alter table xxx delete where col = 'yyy'. Например, если сегодня инсертим данные за вчерашний день, то перед инсертом мы удаляем все данные за вчерашний день. Нужно это например для перезапуска DAG за предыдущий период.

Alter table delete — тяжелая операция, т.н. мутация (mutation), которая переписывает куски данных удовлетворяющих условию удаления. Причем старые куски удаляются не сразу, они становятся «неактивными». Заметим, что эта операция на самом деле переписывает всю таблицу: для измененных кусков записывает новые данные, а для неизменных создает хардлинки. Мы использовали эту операцию практически повсеместно, потому что привыкли так делать в других СУБД.

Тем не менее, более эффективно было бы дропать партицию, потому что эта операция по сути является просто удалением директорий с данными на диске. То есть правильное партицирование таблицы (в данном случае по дням) значительно бы снизило потребление ресурсов.

Заметим также, что у реплицированных таблиц есть еще одна неочевидная особенность, которая важна при использовании конструкции alter table delete. Подробнее про нее можно почитать здесь и здесь. Основная идея в том, что при инсерте в реплицированную таблицу кликхаус дедуплицирует вставляемые блоки данных. Это означает, что можно столкнуться со следующей ситуацией:

Как небольшой команде переехать на ClickHouse: на какие грабли мы наступили и о каких фишках не знали - 2
  1. Инсертим данные в таблицу (блоки 1,2 и 3 на левой картинке)

  2. Кликхаус падает на предыдущем пункте, залиты только блоки 1 и 2

  3. Airflow ретраит таску, в которой сначала идет успешный delete, а потом insert (правая картинка)

  4. Блоки 1 и 2, которые успели залиться на этапе 1, дедуплицируются, все остальные инсертятся

  5. Я, который не понимаю, почему тут так мало данных:

Должен был все блоки отдать

Должен был все блоки отдать

Это поведение лечится либо настройкой insert_deduplicate = 0, либо тем самым дропом партиции. Но не alter table delete! Потому что эта операция удаляет куски не сразу, она временно делает их неактивными. Поэтому когда мы выполняем инсерт блока данных, который ранее инсертился, ClickHouse его дедуплицирует (несмотря на то, что изначально инсертнутый блок данных уже как бы «удален» — подвергнут операции alter table delete).

Если не знать о подобном поведении, то обнаружить проблему в данных можно далеко не сразу. В нашем случае можно == так и произошло.

Также важно отметить, что такие сложности и особенности связаны с тем, что в кликхаусе нет транзакционности. Кликхаус жертвует ACID в пользу скорости работы с огромными объемами данных.

Не рассказали аналитикам о сортировке и партицировании

Помните, в пункте про ReplacingMergeTree я рассказывал про очень быстрый поиск при определенных условиях. Так вот в -MergeTree движках он реализуется за счет хранения данных, отсортированных по первичному ключу. Это позволяет создать файл с первичным индексом (primary index), в котором хранятся засечки — отсортированные значения первичного ключа. Причем этот файл разреженный. Разреженность регулируется параметром index_granularity, по дефолту 8192. В результате каждого селекта мы будем считывать число строк не меньшее, чем гранулярность индекса. Кайф в том, что первичный индекс хранится в оперативке и поиск по нему крайне быстрый (O(log N)). А за счет разреженности этот файл еще и достаточно маленький.

-- создаем таблицу с обычным MergeTree движком
create table example_mt (
   id UInt64,
   value UInt64
)
engine = MergeTree()
order by id
settings index_granularity = 8192; -- дефолтное значение гранулярности первичного индекса

Заметьте, при создании таблицы мы опустили секцию primary key. Потому что кликхаус по дефолту использует ключ сортировки в качестве первичного ключа. Также будем поступать и мы — здесь и далее при повествовании не будем различать ключ сортировки и первичный ключ.

-- инсертим 1 млрд строк
insert into example_mt
select number, rand64() from numbers(1000000000);


-- выполняем селект с использованием поля сортировки
select * from example_mt where id = 500000000;

   ┌────────id─┬────────────────value─┐
   │ 500000000 │ 11227737062124590418 │
   └───────────┴──────────────────────┘

1 row in set. Elapsed: 0.002 sec. Processed 8.19 thousand rows, 131.07 KB (3.80 million rows/s., 60.79 MB/s.)
Peak memory usage: 469.39 KiB.

Заметим, что мы прочитали лишь 8192 строки из 1 миллиарда! И потом уже среди них нашли нужное нам значение id. И все это благодаря файлу с первичным индексом.

В целом, наша адаптация к кликхаусу шла быстро, и в некоторых дефолтных штуках типа ключа сортировки и партицирования мы разобрались в течение нескольких недель.

Но мы почему-то забыли рассказать аналитикам, какие бывают особенности использования ключа сортировки в запросах (как договорились ранее, считаем что ключ сортировки совпадает с первичным ключом). В итоге пришли к тому, что в какой-то момент в логах мы начали замечать аналитические запросы с full scan таблицы пользовательских действий на сайте (сотни миллиардов строк). Причем в этих запросах при фильтрации используется первичный ключ! 

select formatReadableQuantity(count())-- число действий за сегодня
from events -- DDL: order by ts; ts в unix timestamp
where fromUnixTimestamp64Milli(ts) > today();

1 row in set. Elapsed: 42.188 sec. Processed 217.51 billion rows, 1.74 TB (5.16 billion rows/s., 41.25 GB/s.)
Peak memory usage: 6.69 MiB.

Обратите внимание, что у нас ts в Unix timestamp, а первичный индекс также по ts. Также важно, что мы при фильтрации применили fromUnixTimestamp64Milli к ts — думаю не надо комментировать, что делает эта функция 🙂

Но мы то рассказывали аналитикам что во всех запросах надо по максимуму использовать поля, которые могут ограничить выборку данных (например, фильтровать с использованием первичного ключа)!

Название функций в ClickHouse - отдельный вид искусства

Название функций в ClickHouse - отдельный вид искусства

Ошибка здесь в том, что к первичному ключу была применена не строго монотонная функция. Поэтому кликхаус не смог воспользоваться первичным индексом и пришлось делать full scan. Такое поведение в данном примере лечится суперпросто:

select formatReadableQuantity(count())
from events
where ts > toUnixTimestamp64Milli(toDateTime64(today(), 3));
-- не трогаем поле сортировки

1 row in set. Elapsed: 2.711 sec. Processed 76.60 million rows, 612.81 MB (28.25 million rows/s., 226.01 MB/s.)
Peak memory usage: 3.38 MiB.

Здесь уже первичный индекс был использован, поэтому мы просканировали примерно в 3000 меньше строк.

На самом деле, сортировка и первичный индекс в ClickHouse — основополагающая фича, лучше всего описанная в статье из официальной доки.

Не использовали словари

У нас есть несколько достаточно широких витрин, в которых помимо прочего присутствует уникальный id элемента библиотеки (например, серии), его название, дата премьеры и ряд других атрибутов. И если id не меняется никогда, то, например, соответствующее название без проблем может поменяться. Предположим, что название тайтла это SCD1. Поэтому, если название надо поменять, то столбец широкой витрины придется перегружать. Что является достаточно затратным по ресурсам процессом.

Вероятно, более правильным подходом было бы иметь таблицу только с айди элемента, а его атрибуты прокидывать «на лету». Например, при помощи view с джойном на таблицу-измерение с атрибутами — названием, датой публикации и тп. Но при дефолтном джойне (hash join) кликхаус для правой таблицы создает в оперативке хеш-таблицу, поэтому рекомендуется по возможности избегать большого количества джойнов. Особенно на тяжелые таблицы.

Более эффективно было бы сделать view, но вместо джойна использовать словарь. 

Словарь — это оптимизированная структура данных, которая преимущественно хранится в оперативке. Поэтому его использование в сравнении с джойном как правило эффективнее. Применяется он при помощи функции dictGet. Таким образом, мы могли бы загрузить таблицу со значениями атрибутов в словарь (в предположении, что эта таблица не очень большая и не займет всю оперативку). 

И тогда мы всегда «на лету» будем иметь эффективный способ обращения к широкой витрине с актуальными значениями из словаря. 

Более того, если мы создаем вьюху с джойном на таблицу-измерение, то таблицы при обращении к вьюхе всегда будут джойниться. Даже если мы не используем атрибуты из измерения. В случае с dictGet такого не происходит.

-- создаем таблицу с обычным движком MergeTree
create table example_mt (
   id UInt32,
   element_id UInt32
)
engine = MergeTree()
order by id;


-- инсертим 10 млн строк
insert into example_mt
select number, rand32() from numbers(10000000);


-- создаем таблицу, которая впоследствии станет словарем
create table example_dict_source (
   element_id UInt32,
   value UInt32
)
engine = MergeTree()
order by element_id;


-- наполняем словарь
insert into example_dict_source
select rand32(0), rand32(1) from numbers(10000000);


-- выполняем джойн и смотрим на ресурсы
select
   eds.value,
   count()
from example_mt em
inner join example_dict_source eds using(element_id)
group by eds.value
format Null; -- чтобы не выводилось в консоль ничего, нас только ресурсы интересуют

0 rows in set. Elapsed: 1.627 sec. Processed 20.00 million rows, 120.00 MB (12.29 million rows/s., 73.74 MB/s.)
Peak memory usage: 1.06 GiB.

А теперь создадим словарь и сравним утилизацию ресурсов:

-- создаем словарь
create dictionary d (
   element_id UInt32,
   value UInt32
)
primary key element_id
source (CLICKHOUSE(table 'example_dict_source'))
lifetime(min 0 max 1000)
layout(hashed());


-- выполняем аналогичный запрос, но с dictGet
select
   dictGet(d, 'value', element_id) val,
   count()
from example_mt
group by val
format Null;

0 rows in set. Elapsed: 0.329 sec. Processed 10.00 million rows, 40.00 MB (30.42 million rows/s., 121.68 MB/s.)
Peak memory usage: 2.16 MiB.

Код выглядит более лаконично, а сам запрос с точки зрения ресурсов — достаточно оптимально.

Чему мы научились в процессе (или какие топовые фишки есть в кликхаусе)

Помимо того, что мы активно набивали шишки, мы еще и опытным путем выясняли, как сделать лучше. И за время работы и изучения различных ресурсов нашли несколько полезных фичей, использование которых значительно облегчило жизнь. Здесь не будет подробных описаний работы каждого из инструментов, но вы точно сможете понять их преимущества и изучить подробнее при необходимости. Главное знать, что такое вообще бывает :)

Предагрегированные состояния

Представьте, что вы ежедневно хотите знать топ-10 самых просматриваемых фильмов за всю историю сервиса. Звучит не очень сложно, но если в таблице с сырыми данными уже накопилось более 5 млрд строк, то ежедневный запрос следующего вида может излишне нагрузить кластер:

select
   video_uid,
   uniq(user_id) users
from playback
where play_dt >= '2024-01-01' -- для упрощения ограничиваем выборку
group by video_uid
order by users desc
limit 10
format Null;

0 rows in set. Elapsed: 26.847 sec. Processed 1.56 billion rows, 35.94 GB (58.20 million rows/s., 1.34 GB/s.)
Peak memory usage: 13.95 GiB.

Разумеется, получать ответ на этот запрос нам бы хотелось как можно быстрее (например, чтобы показывать на дашборде). Поэтому для ускорения решения задачи хочется иметь некий агрегированный на каждый день набор данных. И в финальном запросе обращаться не к сырым данным, а к этому предагрегату. Задача еще и усложняется тем, что теоретически любой фильм сегодня может влететь в топ-10, поэтому в предагрегате должны храниться абсолютно все фильмы.

Проблема в том, что уники не аддитивны: если у вас до вчерашнего дня включительно определенный фильм смотрело 10 млн пользователей, а сегодня 1 млн, то скорее всего нельзя сказать, что итого у нас 11 млн уников.

ClickHouse предоставляет описанный функционал практически из коробки и реализуется в виде предагрегированных состояний агрегатных функций:

create table example_state (
   video_uid UUID,
   user_id AggregateFunction(uniq, Nullable(Int32))
)
engine = AggregatingMergeTree() -- можно и просто MergeTree
order by video_uid;


-- делаем тяжелый инсерт
insert into example_state
select
   video_uid,
   uniqState(user_id) -- предагрегированное состояние агрегатной функции, реализуется через комбинатор State
from playback
where play_dt >= '2024-01-01' -- для упрощения ограничиваем выборку. Все следующие инсерты будут заливать дневные инкременты
group by video_uid;

0 rows in set. Elapsed: 85.156 sec. Processed 1.56 billion rows, 35.94 GB (18.35 million rows/s., 422.02 MB/s.)
Peak memory usage: 13.95 GiB.


-- выполняем запрос с финальной агрегацией
select
   video_uid,
   uniqMerge(user_id) as users -- комбинатор Merge возвращает итоговое значение агрегатной функции
from example_state
group by video_uid
order by users desc
limit 10
format Null;

0 rows in set. Elapsed: 15.183 sec. Processed 920.32 thousand rows, 95.71 MB (60.62 thousand rows/s., 6.30 MB/s.)
Peak memory usage: 549.99 MiB.

Таким образом, ежедневно наполняя таблицу example_state вчерашними инкрементами, мы всегда сможем из неё быстро получать результат исходного запроса на топ-10 тайтлов.

Кайф ещё и в том, что при финальной агрегации можно запускать запросы по различным сочетаниям срезов, а не только по тому, какой был использован при вставке:

-- в нашем примере нет других срезов, поэтому можем выполнить запрос и без них
select
   uniqMerge(user_id) as users
from example_state;

1 row in set. Elapsed: 73.634 sec. Processed 208.21 thousand rows, 18.32 MB (2.83 thousand rows/s., 248.83 KB/s.)
Peak memory usage: 283.35 MiB.

Проекции

У кликхауса есть инструмент, который позволяет еще сильнее упростить работу с предагрегированной таблицей из предыдущего примера. Этот инструмент называется проекции (projections).

Вкратце, проекции — фича для ускорения частых запросов к таблице. Грубо говоря, результаты этих запросов укладываются в кэш. Под капотом реализуется как дополнительная скрытая таблица, т.е. нельзя явным образом выполнить из нее селект.

Продолжим с экспериментированием из предыдущего примера: вместо предагрегата создадим проекцию:

/*
   Cоздаем проекцию с группировкой по нужному полю.
   Проекцию создаем к большой таблице, из которой изначально делали селект топ-10 тайтлов.
   Проекции не поддерживают фильтрацию, поэтому здесь под таблицей playback
   будем понимать копию изначальной таблицы playback, отфильтрованную по play_dt >= '2024-01-01'
*/
alter table playback add projection group_by_proj
(select
    video_uid,
    uniq(user_id)
 group by video_uid); -- заметим, тут нет необходимости вписывать название таблицы и from. А также теперь у нас агрегатная функция без -State комбинатора


-- материализуем
alter table playback materialize projection group_by_proj;


-- и выполняем запрос
select
   video_uid,
   uniq(user_id) users
from playback
group by video_uid
order by users desc
limit 10
format Null;

0 rows in set. Elapsed: 17.411 sec. Processed 1.16 million rows, 120.35 MB (66.46 thousand rows/s., 6.91 MB/s.)
Peak memory usage: 9.72 GiB.

Сравниваем с первым запросом из пункта про предагрегированные состояния. Затраты на оперативку снизились в полтора раза, а число просканированных строк — более чем в тысячу.

Причем мы нигде не указывали, что запрос должен идти к проекции. Кликхаус сам определил, что в данном случае оптимальнее будет обращаться к проекции. Убедиться в этом можно из логов запросов (system.query_log.projections).

И последнее про проекции. Как мы видим, в примере с явной таблицей-предагрегатом у нас появляется головная боль в виде необходимости обеспечения консистентности предагрегата и исходной таблицы. Если мы очищаем исходную таблицу, то и предагрегат тоже придется очистить. В случае же с проекциями — обеспечение консистентности кликхаус берет на себя.

UPD: Проекции могут некорректно работать с движками Replacing/Collapsing/Aggregating. Пофикшено в версии 24.8. Спасибо @StanEgo что обратил внимание.

Материализованные представления

Несмотря на то, что мы в Okko почти не используем эту фичу, затронуть ее полезно. Главное, что надо знать про материализованные представления (materialized views) в кликхаусе — они не имеют ничего общего с материализованными представлениями в других популярных СУБД.

Очень хорошо они описаны в этом видео.

Основная идея в том, что матвью — это триггер на инсерт из одной таблицы (источника) в другую (целевую). Как только происходит инсерт в источник, триггерится матвью и происходит инсерт тех же самых блоков данных и в целевую таблицу (с учетом обработок заданных в DDL матвью). Схематично это выглядит так:

Как небольшой команде переехать на ClickHouse: на какие грабли мы наступили и о каких фишках не знали - 5

Например, предагрегаты из пунктов выше можно было реализовать через матвью следующим образом:

-- это целевая таблица - в нее матвью будет инсертить данные
create table mv_destination (
   video_uid UUID,
   user_id AggregateFunction(uniq, Nullable(Int32))
)
engine = AggregatingMergeTree()
order by video_uid;


-- создаем матвью
create materialized view mv
to mv_destination -- здесь указывается в какую таблицу матвью будет инсертить
as
select
   video_uid,
   uniqState(user_id)
from example_pmd
group by video_uid;


-- убеждаемся, что в целевой таблице нет данных
select count() from mv_destination;

┌─count()─┐
│       0 │
└─────────┘


-- инсертим в таблицу-источник 10 000 строк (эмулируем реальный кейс, когда витрина была наполнена, например, очередным днем с данными)
insert into example_pmd
select generateUUIDv4(), rand32() from numbers(10000);


–- инсертнем еще раз те же самые ключи с другими значениями, чтобы убедиться что матвью инсертнуло данные в целевую таблицу в сгруппированном виде
insert into example_pmd
select video_uid, rand32() from example_pmd; –- предполагаем, что изначально таблица была пустой


-- и теперь смотрим на число строк и число уникальных ключей video_uid
select
   count() as total_rows,
   uniqExact(video_uid) as total_keys
from mv_destination;

┌─total_rows─┬─total_keys─┐
│      20000 │      10000 │
└────────────┴────────────┘

1 row in set. Elapsed: 0.011 sec. Processed 20.00 thousand rows, 320.00 KB (1.82 million rows/s., 29.19 MB/s.)
Peak memory usage: 2.09 MiB.

Заметим, что мы не выполняли самостоятельных инсертов в mv_destination. Матвью mv сделало это за нас.

Удобство еще в том, что нам даже не нужно писать скрипт, который по расписанию будет выгребать данные из таблицы-источника. Любой инсерт в таблицу-источник будет триггерить инсерт в целевую.

Но также обязательно стоит иметь в виду, что матвью не знает ни о каких изменениях источника, кроме инсертов. Если вы ее очистите, или удалите какие-то данные из нее, матвью об этом не узнает!

truncate example_pmd;


-- смотрим что поменялось
select
   count() as total_rows,
   uniqExact(video_uid) as total_keys
from mv_destination;

┌─total_rows─┬─total_keys─┐
│      20000 │      10000 │
└────────────┴────────────┘

1 row in set. Elapsed: 0.009 sec. Processed 20.00 thousand rows, 320.00 KB (2.16 million rows/s., 34.63 MB/s.)
Peak memory usage: 2.09 MiB.

Собственно, ничего и не поменялось.

Это одна из причин, по которой мы пока не используем матвью — достаточно сложно обеспечивать консистентность целевой таблицы при изменениях таблицы-источника. Вторая же причина заключается в том, что у нас несколько кластеров, а сырые данные пользовательских действий и наши витрины находятся в разных кластерах. Поэтому даже если создать матвью на одном кластере, то он не узнает об инсерте в источник на другом кластере. И целевая таблица так и останется пустой.

Если хочется узнать про матвью подробнее, то есть крайне понятная презентация.

Как группировать, если не хватает оперативки

Любой уважающий себя пользователь кликхауса обязательно столкнется с такой ошибкой:

Code: 241. DB::Exception: Memory limit (total) exceeded: would use 101.02 GiB
(attempt to allocate chunk of 4256958 bytes), maximum: 100.65 GiB.
OvercommitTracker decision: Query was selected to stop by OvercommitTracker.:
While executing AggregatingTransform. (MEMORY_LIMIT_EXCEEDED)

Понятно, что эта ошибка может быть вызвана кучей разных причин: 

  • много одновременных запросов на кластер

  • неоптимальный запрос без поля партицирования / первичного ключа

  • группировка по стрингам и тп. 

Мы же рассмотрим чистый кейс, когда кажется что оптимизировать запрос с группировкой уже некуда, но мы все равно не помещаемся в оперативку. 

На этот предмет есть шикарное видео, которое настоятельно рекомендую посмотреть всем неравнодушным к кликхаусу. 

Я же оттуда расскажу про одну неочевидную фичу: сброс промежуточных результатов группировки на диск. То есть идея в том, что группировка происходит в оперативке, но как только очередная часть данных сгруппирована, и доступная оперативка заканчивается, можно этот готовый (или почти готовый) результат временно сбросить на диск. Так мы освобождаем место для оставшихся группировок. Настройка включается через max_bytes_before_external_group_by, подробнее прочитать про нее можно здесь.

-- создаем тестовую таблицу
create table example_memory_limit (
   id UInt32,
   value UInt64
)
engine = MergeTree()
order by id
as
select
   number % 100,
   rand32()
from numbers(1000000000);


-- пытаемся выполнить запрос. ограничение - 16 Gb на запрос
select
   id,
   uniqExact(value) as uniqs
from example_memory_limit
group by id
limit 1
settings max_memory_usage = '16G';


падаем с ошибкой
Code: 241. DB::Exception: Received from ____:9000.
DB::Exception: Memory limit (for query) exceeded: would use 14.92 GiB
(attempt to allocate chunk of 0 bytes), maximum: 14.90 GiB.:
While executing AggregatingTransform. (MEMORY_LIMIT_EXCEEDED)


-- а теперь добавим возможность сбрасывать промежуточные результаты на диск
select
   id,
   uniqExact(value) as uniqs
from example_mt
group by id
limit 1
settings max_memory_usage = '16G', max_bytes_before_external_group_by = '8G';

┌─id─┬───uniqs─┐
│ 55 │ 9988394 │
└────┴─────────┘

1 row in set. Elapsed: 18.080 sec. Processed 1.00 billion rows, 12.00 GB (55.31 million rows/s., 663.74 MB/s.)
Peak memory usage: 7.49 GiB.

Как видим, получилось.

Как небольшой команде переехать на ClickHouse: на какие грабли мы наступили и о каких фишках не знали - 6

В некоторых случаях, когда оптимизировать уже некуда, данная фича сильно помогает и спасает от костылей.

Есть и аналогичная настройка max_bytes_before_external_sort. Думаю, понятно, что она делает. Но в нашей практике она используется гораздо реже.

Работа с массивами

Массивы — супермощный и удобный инструмент, и кликхаус создан для работы с ними. 

Конечно, описание работы с массивами заслуживает отдельной статьи (например, этой). Я же постараюсь привести короткий синтетический пример. 

Посмотрим, как можно посчитать длину пользовательской сессии, а также полный набор действий в ней. Под пользовательской сессией будем подразумевать набор действий между нажатиями на кнопку stop (отображается в данных как action = stop). Т.е. набор исходных данных выглядит как-то так:

select * from example_arrays where userid = 123;

┌─userid─┬──────────────────ts─┬─action─┐
│    123 │ 2024-07-15 13:44:14 │ ready  │
│    123 │ 2024-07-15 13:45:17 │ play   │
│    123 │ 2024-07-15 13:46:05 │ play   │
│    123 │ 2024-07-15 13:47:56 │ stop   │ -- конец первой сессии
│    123 │ 2024-07-15 13:48:27 │ resume │
│    123 │ 2024-07-15 13:49:42 │ play   │
│    123 │ 2024-07-15 13:50:19 │ play   │
│    123 │ 2024-07-15 13:51:14 │ stop   │ -- конец второй сессии
│    123 │ 2024-07-15 13:52:33 │ clear  │
│    123 │ 2024-07-15 13:53:01 │ play   │
└────────┴─────────────────────┴────────┘

Тогда посчитать длину сессии и набор экшенов для пользователя 123 можем так:

with raw as (
   select
      -- группируем в массив при помощи groupArray и добавляем еще одно поле, по которому будем сплитовать
      arraySort(x -> x.1, groupArray((ts, action, if(action = 'stop', 1, 0)))) as arr
   from example_arrays
   where userid = 123
)
select
   (events.1[-1] - events.1[1]) as length, -- длина сессии в секундах
   arrayMap(x -> (x.1, x.2), arrayJoin(arraySplit((x, y) -> y, arr, arr.3))) as events
from raw;

/*
   arraySplit - для сплита одного массива на несколько.
   Поверх полученного массива - arrayJoin. чтобы одна строка соответствовала одной сессии.
   Поверх него arrayMap - чтобы оставить только первые два элемента кортежа из
   ts, action, а вспомогательное поле скипнуть
*/
Как небольшой команде переехать на ClickHouse: на какие грабли мы наступили и о каких фишках не знали - 7

Бонусом, обратите внимание, что алиас events задается позже, чем мы к нему обращаемся при расчете length. Но кликхаус все равно нас понимает и не выплевывает ошибку.

Вместо заключения

Понятно, что этот набор описанных выше фишек даже не близок к исчерпывающему. Но я рассказал о том, что особенно сильно зашло нам. Надеюсь, было полезно и вам.

Небольшим дисклеймером скажу, что для упрощения повествования мы рассматривали везде однонодовый одношардовый кластер, поэтому все create / drop / alter и прочие запросы были достаточно короткими. Понятно, что это далеко от реальной жизни, но зато нигде не надо было создавать реплицированные распределенные таблички, а также не надо было добавлять on cluster <название кластера> к большинству запросов. 

Весь код тестировался на ClickHouse версии 23.8.

Немного полезных ссылок:

  • прочие прикольные фишки, особенно актуально для аналитиков 

  • хорошая статья про устройство MergeTree движков

  • хорошая статья про устройство MergeTree движков и не только

  • чат в телеге

  • база знаний от Altinity (один из самых известных интеграторов кликхауса)

  • YouTube-канал

На этом у меня всё. 

Какие ваши любимые фишки кликхауса, о которых я не упоминал? А какие самые мощные фейлы? Приходите в комменты, если есть чем поделиться 👇

Автор: pgonin

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js