В этом материале я продолжаю делиться полевым опытом работы с системой сбора логов на базе Heka и ElasticSearch.
На этот раз рассказ пойдет про миграцию данных между двумя кластерами ElasticSearch 2.2 и 5.2.2, которая стоила немалых нервов лично мне. Как-никак, предстояло перевезти 24 миллиарда записей, не сломав уже работающую систему.
Прошлая статья закончилась на том, что система работает, логи поступают и складываются в кластер ElasticSearch, доступен их просмотр в реальном времени через Kibana. Но кластер изначально был собран со значительным запасом по памяти как раз на вырост.
Если обратиться к официальной документации ElasticSearch (далее просто ES), то в первую очередь вы увидите строгое предупреждение «Don't cross 32 gb». Превышение грозит проседанием производительности вплоть до моментов полной остановки, пока garbage collector выполняет пересборку в духе «stop the world». Рекомендация производителя по памяти на сервере: 32 ГБ под heap (xms/xmx) и еще 32 ГБ свободного места под кэш. Итого 64 ГБ физической памяти на одну дата-ноду.
Но что делать, если памяти больше? Официальный ответ все в той же документации – ставить несколько экземпляров ES на один хост. Но мне такой подход показался не совсем правильным, так как штатных средств для этого не предусмотрено. Дублировать init-скрипты – это прошлый век, поэтому более интересной выглядела виртуализация кластера с размещением нод в LXD-контейнерах.
LXD (Linux Container Daemon) – так называемый «контейнерный легковизор». В отличии от «тяжелых» гипервизоров не содержит эмуляции аппаратуры, что позволяет сократить накладные расходы на виртуализацию. К тому же имеет продвинутый REST API, гибкую настройку используемых ресурсов, возможности переноса контейнеров между хостами и другие возможности, более характерные для классических систем виртуализации.
Вот такая вырисовывалась структура будущего кластера.
К началу работ под рукой было следующее железо:
-
Четыре работающих дата-ноды ES в составе старого кластера: Intel Xeon 2x E5-2640 v3; 512 ГБ ОЗУ, 3x16 ТБ RAID-10.
- Два новых пустых сервера аналогичной предыдущему пункту конфигурации.
По задумке, на каждом физическом сервере будет две дата-ноды ES, мастер-нода и клиентская нода. Кроме того, на сервере разместится контейнер-приёмник логов с установленными HAProxy и пулом Heka для обслуживания дата-нод этого физического сервера.
Подготовка нового кластера
В первую очередь нужно освободить одну из дата-нод – этот сервер сразу уходит в новый кластер. Нагрузка на оставшиеся три возрастет на 30%, но они справятся, что подтверждает статистика загрузки за последний месяц. Тем более это ненадолго. Далее привожу свою последовательность действий для штатного вывода дата-ноды из кластера.
Снимем с четвертой дата-ноды нагрузку, запретив размещение на ней новых индексов:
{
"transient": {
"cluster.routing.allocation.exclude._host": "log-data4"
}
}
Теперь выключаем автоматическую ребалансировку кластера на время миграции, чтобы не создавать лишней нагрузки на оставшиеся дата-ноды:
{
"transient": {
"cluster.routing.rebalance.enable": "none"
}
}
Собираем список индексов с освобождаемой дата-ноды, делим его на три равные части и запускаем перемещение шардов на оставшиеся дата-ноды следующим образом (по каждому индексу и шарду):
PUT _cluster/reroute
{
"commands" : [ {
"move" :
{
"index" : "service-log-2017.04.25", "shard" : 0,
"from_node" : "log-data4", "to_node" : "log-data1"
}
}
}
Когда перенос завершится, выключаем освободившуюся ноду и не забываем вернуть ребалансировку обратно:
{
"transient": {
"cluster.routing.rebalance.enable": "all"
}
}
Если позволяют сеть и нагрузка на кластер, то для ускорения процесса можно увеличить очередь одновременно перемещаемых шардов (по умолчанию это количество равно двум)
{
"transient": {
"cluster": {
"routing": {
"allocation": {
"cluster_concurrent_rebalance": "10"
}
}
}
}
}
Пока старый кластер постепенно приходит в себя, собираем на трёх имеющихся серверах новый на базе ElasticSearch 5.2.2, с отдельными LXD-контейнерами под каждую ноду. Дело это простое и хорошо описанное в документации, поэтому опущу подробности. Если что – спрашивайте в комментариях, расскажу детально.
В ходе настройки нового кластера я распределил память следующим образом:
-
Мастер-ноды: 4 ГБ
-
Клиентские ноды: 8 ГБ
-
Дата-ноды: 32 ГБ
- XMS везде устанавливаем равным XMX.
Такое распределение родилось после осмысления документации, просмотра статистики работы старого кластера и применения здравого смысла.
Синхронизируем кластеры
Итак, у нас есть два кластера:
-
Старый – три дата-ноды, каждая на железном сервере.
- Новый, с шестью дата-нодами в LXD контейнерах, по две на сервер.
Первое, что делаем, – включаем зеркалирование трафика в оба кластера. На приемных пулах Heka (за подробным описанием отсылаю к предыдущей статье цикла) добавляем ещё одну секцию Output для каждого обрабатываемого сервиса:
[Service1Output_Mirror]
type = "ElasticSearchOutput"
message_matcher = "Logger == 'money-service1''"
server = "http://newcluster.receiver:9200"
encoder = "Service1Encoder"
use_buffering = true
После этого трафик пойдет параллельно в оба кластера. Учитывая, что мы храним индексы с оперативными логами компонент не более 21 дня, на этом можно было бы и остановиться. Через 21 день в кластерах будут одинаковые данные, а старый можно отключить и разобрать. Но долго и скучно столько ждать. Поэтому переходим к последнему и самому интересному этапу – миграции данных между кластерами.
Перенос индексов между кластерами
Так как официальной процедуры миграции данных между кластерами ES на момент выполнения проекта не существует, а изобретать «костыли» не хочется – используем Logstash. В отличии от Heka он умеет не только писать данные в ES, но и читать их оттуда.
Судя по комментариям к прошлой статье, у многих сформировалось мнение, что я почему-то не люблю Logstash. Но ведь каждый инструмент предназначен для своих задач, и для миграции между кластерами именно Logstash подошёл как нельзя лучше.
На время миграции полезно увеличить размер буфера памяти под индексы с 10% по умолчанию до 40%, которые выбраны по среднему количеству свободной памяти на работающих дата-нодах ES. Также нужно выключить обновление индексов на каждой дата-ноде, для чего добавляем в конфигурацию дата-нод следующие параметры:
memory.index_buffer_size: 40%
index.refresh_interval: -1
По-умолчанию индекс обновляется каждую секунду и создает тем самым лишнюю нагрузку. Поэтому, пока в новый кластер никто из пользователей не смотрит, обновление можно отключить. Заодно я создал шаблон по умолчанию для нового кластера, который будет использоваться при формировании новых индексов:
{
"default": {
"order": 0,
"template": "*",
"settings": {
"index": {
"number_of_shards": "6",
"number_of_replicas": "0"
}
}
}
}
С помощью шаблона выключаем на время миграции репликацию, тем самым снизив нагрузку на дисковую систему.
Для Logstash получилась следующая конфигурация:
input {
elasticsearch {
hosts => [ "localhost:9200" ]
index => "index_name"
size => 5000
docinfo => true
query => '{ "query": { "match_all": {} }, "sort": [ "@timestamp" ] }'}
}
output {
elasticsearch { hosts => [ "log-new-data1:9200" ]
index => "%{[@metadata][_index]}"
document_type => "%{[@metadata][_type]}"
document_id => "%{[@metadata][_id]}"}}
}
В секции input описываем источник получения данных, указываем системе, что данные нужно забирать пачками (bulk) по 5000 записей, и выбираем все записи, отсортированные по timestamp.
В output нужно указать назначение для пересылки полученных данных. Обратите внимание на описания следующих полей, которые можно получить из старых индексов:
-
document_type – тип (mapping) документа, который лучше указать при переезде, чтобы имена создаваемых mappings в новом кластере совпадали с именами в старом – они используются в сохранённых запросах и дашбордах.
- document_id – внутренний идентификатор записи в индексе, который представляет собой уникальный 20-символьный хэш. С его явной передачей решаются две задачи: во-первых, облегчаем нагрузку на новый кластер не требуя генерировать id для каждой из миллиардов записей, и во-вторых, в случае прерывания процесса нет необходимости удалять недокачанный индекс, можно просто запустить процесс заново, и ES проигнорирует записи с совпадающим id.
Параметры запуска Logstash:
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/migrate.conf --pipeline.workers 8
Ключевыми параметрами, влияющими на скорость миграции, являются размер пачек, которые Logstash будет отправлять в ES, и количество одновременно запускаемых процессов (pipeline.workers) для обработки. Строгих правил, которые определяли бы выбор этих значений, нет – они выбирались экспериментальным путем по следующей методике:
-
Выбираем небольшой индекс: для тестов использовался индекс с 1 млн многострочных (это важно) записей.
-
Запускаем миграцию этого индекса с помощью Logstash.
-
Смотрим на thread_pool на приёмной дата-ноде, обращая внимание на количество «rejected» записей. Рост этого значения однозначно говорит о том, что ES не успевает проиндексировать поступающие данные – тогда количество параллельных процессов Logstash стоит уменьшить.
- Если резкого роста «rejected» записей не происходит – увеличиваем количество bulk/workers и повторяем процесс.
После того как всё было подготовлено, составлены списки индексов на переезд, написаны конфигурации, а также разосланы предупреждения о предстоящих нагрузках в отделы сетевой инфраструктуры и мониторинга, я запустил процесс.
Чтобы не сидеть и не перезапускать процесс logstash, после завершения миграции очередного индекса я сделал с новым файлом конфигурации следующее:
-
Список индексов на переезд разделил на три примерно равные части.
-
В /etc/logstash/conf.d/migrate.conf оставил только статическую часть конфигурации:
input { elasticsearch { hosts => [ "localhost:9200" ] size => 5000 docinfo => true query => '{ "query": { "match_all": {} }, "sort": [ "@timestamp" ] }'} } output { elasticsearch { hosts => [ "log-new-data1:9200" ] index => "%{[@metadata][_index]}" document_type => "%{[@metadata][_type]}" document_id => "%{[@metadata][_id]}"}} }
-
Собрал скрипт, который читает имена индексов из файла и вызывает процесс logstash, динамически подставляя имя индекса и адрес ноды для миграции.
- Всего нужно запустить три экземпляра скрипта, по одному на каждый файл: indices.to.move.0.txt, indices.to.move.1.txt и indices.to.move.2.txt. После этого данные уходят в первую, третью и пятую дата-ноды.
Код одного из экземпляров скрипта:
cat /tmp/indices_to_move.0.txt | while read line
do
echo $line > /tmp/0.txt && /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/migrate.conf --pipeline.workers 8 --config.string "input {elasticsearch { index => "$line" }} output { elasticsearch { hosts => [ "log-new-data1:9200" ] }}"
done;
Для просмотра статуса миграции пришлось «на коленке» собрать ещё один скрипт, и запустить в отдельном процессе screen (через watch -d -n 60):
#!/bin/bash
regex=$(cat /tmp/?.txt)
regex="(($regex))"
regex=$(echo $regex | sed 's/ /)|(/g')
curl -s localhost:9200/_cat/indices?h=index,docs.count,docs.deleted,store.size | grep -P $regex |sort > /tmp/indices.local
curl -s log-new-data1:9200/_cat/indices?h=index,docs.count,docs.deleted,store.size | grep -P$regex | sort > /tmp/indices.remote
echo -e "indextttcount.sourcetcount.desttremainingtdeletedtsource.gbtdest.gb"
diff --side-by-side --suppress-common-lines /tmp/indices.local /tmp/indices.remote | awk '{print $1"t"$2"t"$7"t"$2-$7"t"$8"t"$4"tt"$9}'
Процесс миграции занял около недели. И честно скажу – спалось мне эту неделю неспокойно.
После переезда
После переноса индексов осталось сделать совсем немного. В одну прекрасную субботнюю ночь старый кластер был выключен и изменены записи в DNS. Поэтому все пришедшие на работу в понедельник увидели новый розово-голубой интерфейс пятой Kibana. Пока сотрудники привыкали к обновленной цветовой гамме и изучали новые возможности, я продолжил работу.
Из старого кластера взял еще один освободившийся сервер и поставил на него два контейнера с дата-нодами ES под кластер новый. Все остальное железо отправилось в резерв.
Итоговая структура получилась точно такой, какой планировалась на первой схеме:
-
Три мастер-ноды.
-
Две клиентские ноды.
-
Восемь дата-нод (по две на сервер).
- Четыре log-receiver (HAProxy + Heka Pools, по одному на каждый сервер).
Переводим кластер в production режим – возвращаем параметры буферов и интервалы обновления индексов:
memory.index_buffer_size: 10%
index.refresh_interval: 1s
Кворум кластера (учитывая три мастер-ноды) выставляем равным двум:
discovery.zen.minimum_master_nodes: 2
Далее нужно вернуть значения шард, принимая во внимание, что дата-нод у нас уже восемь:
{
"default": {
"order": 0,
"template": "*",
"settings": {
"index": {
"number_of_shards": "8",
"number_of_replicas": "1"
}
}
}
}
Наконец, выбираем удачный момент (все сотрудники разошлись по домам) и перезапускаем кластер.
Нашардить, но не смешивать
В этом разделе я хочу обратить особое внимание на снижение общей надёжности системы, которое возникает при размещении нескольких дата-нод ES в одном железном сервере, да и вообще при любой виртуализации.
С точки зрения ES кластера – всё хорошо: индекс разбит на шарды по количеству дата-нод, каждый шард имеет реплику, primary и replica шарды хранятся на разных нодах.
Система шардирования и репликации в ES повышают как скорость работы, так и надёжность хранения данных. Но эта система проектировалась с учётом размещения одной ноды ES на одном сервере, когда в случае проблем с оборудованием теряется лишь одна дата-нода ES. В случае с нашим кластером упадут две. Даже с учетом равного разделения индексов между всеми нодами и наличия реплики для каждого шарда, не исключена ситуация когда primary и replica одного и того же шарда оказываются на двух смежных дата-нодах одного физического сервера.
Поэтому разработчики ES предложили инструмент для управления размещением шард в пределах одного кластера – Shard Allocation Awareness (SAA). Этот инструмент позволяет при размещении шард оперировать не дата-нодами, а более глобальными структурами вроде серверов с LXD-контейнерами.
В настройки каждой дата-ноды нужно поместить ES атрибут, описывающий физический сервер, на котором она находится:
node.attr.rack_id: log-lxd-host-N
Теперь нужно перезагрузить ноды для применения новых атрибутов, и добавить в конфигурацию кластера следующий код:
{
"persistent": {
"cluster": {
"routing": {
"allocation": {
"awareness": {
"attributes": "rack_id"
}
}
}
}
}
}
Причем только в таком порядке, ведь после включения SAA кластер не будет размещать шарды на нодах без указанного атрибута.
Кстати, аналогичный механизм можно использовать для нескольких атрибутов. Например, если кластер расположен в нескольких дата-центрах и вы не хотите туда-сюда перемещать шарды между ними. В этом случае уже знакомые настройки будут выглядеть так:
node.attr.rack_id: log-lxd-hostN
node.attr.dc_id: datacentre_name
{
"persistent": {
"cluster": {
"routing": {
"allocation": {
"awareness": {
"attributes": "rack_id, dc_id"
}
}
}
}
}
}
Казалось бы, все в этом разделе очевидно. Но именно очевидное и вылетает из головы в первую очередь, так что отдельно проверьте – тогда после переезда не будет мучительно больно.
Следующая статья цикла будет посвящена двум моим самым любимым темам – мониторингу и тюнингу уже построенной системы. Обязательно пишите в комментариях, если что-то из уже написанного или запланированного особенно интересно и вызывает вопросы.
Автор: adel-s