Однажды я вел вебинар про то, как принимать 10 000 ивентов в секунду. Показал вот такую картинку, зрители увидели сиреневый слой, и началось: «Ребят, а зачем нам все эти кафки и рэббиты, неужели без них не обойтись»? Мы и ответили: «Зачем-зачем, чтобы пройти собес!»
Очень смешно, но давайте я все-таки объясню.
Мы можем принимать ивенты сразу в зеленой области и заставить наши приложения писать их в кликхаус.
Но кликхаус любит, когда в него пишут сообщения пачками
Другими словами, в него лучше запихнуть миллион сообщений, вместо того чтобы писать по одному. Kafka, Rabbit или Яндекс.Кью выступают как буфер, и мы можем контролировать с его помощью входящую нагрузку.
Как бывает: в одну секунду пришло 10 тысяч ивентов, в следующую — тысяча, в другую — 50 тысяч. Это нормально, пользователи рандомно включают свои мобильные приложения. В таком случае в кликхаус напрямую будет заходить то 2 тысячи, то 10 тысяч сообщений. Но с помощью буфера вы можете подкопить сообщения, потом достать из этой копилки миллион ивентов и направить в кликхаус. И вот она — желанная стабильная нагрузка на ваш кластер.
Это все история про очереди
— Во-первых, очереди можно использовать для передачи сообщений между различными сервисами.
Например, для бэкграунд задач. Вы заходите в админку магазина и генерируете отчет по продажам за год. Задача трудоемкая: нужно прочитать миллионы строк из базы, это хлопотно и очень долго. Если клиент будет висеть постоянно с открытым http-коннектом — 5, 10 минут — связь может оборваться, и он не получит файл.
Логично выполнить эту задачу асинхронно на фоне. Пользователь нажимает кнопку «сгенерировать отчет», ему пишут: «Все окей, отчет генерируется, он придет на вашу почту в течение часа». Задача автоматически попадает в очередь сообщений и далее на «стол» воркера, который выполнит ее и отправит пользователю на ящик.
— Второй кейс — про кучу микросервисов, которые общаются через шину.
Например, один сервис принимает ивенты от пользователей, передает их в очередь. Следующий сервис вытаскивает ивенты и нормализует их, к примеру, проверяет, чтобы у них был валидный e-mail или телефон. Если все хорошо, он перекладывает сообщение дальше, в следующую очередь, из которой данные будут записываться в базу.
— Еще один поинт — это падение дата-центра, в котором
Конец, ничего не работает. Что будет с сообщениями? Если писать без буфера, сообщения потеряются. Кликхаус недоступен, клиенты отвалились. До какого-то предела выручит память, но с буфером безопаснее — вы просто взяли и записали сообщения в очередь (например, в кафку). И сообщения будут храниться там, пока не закончится место или пока их не прочитают и не обработают.
Как автоматически добавлять новые виртуалки при увеличении нагрузки
Чтобы протестить нагрузку, я написал приложение и протестил автоматически масштабируемые группы.
Мы создаем инстанс-группу. Задаем ей имя и указываем сервисный аккаунт. Он будет использоваться для создания виртуалок.
resource "yandex_compute_instance_group" "events-api-ig" {
name = "events-api-ig"
service_account_id = yandex_iam_service_account.instances.id
Затем указываем шаблон виртуалки. Указываем CPU, память, размер диска и т.д.
instance_template {
platform_id = "standard-v2"
resources {
memory = 2
cores = 2
}
boot_disk {
mode = "READ_WRITE"
initialize_params {
image_id = data.yandex_compute_image.container-optimized-image.id
size = 10
}
Указываем, к какому сетевому интерфейсу его подрубить.
}
network_interface {
network_id = yandex_vpc_network.internal.id
subnet_ids = [yandex_vpc_subnet.internal-a.id, yandex_vpc_subnet.internal-b.id, yandex_vpc_subnet.internal-c.id]
nat = true
}
Самое интересное — это scale_policy.
Можно задать группу фиксированного размера fixed scale с тремя инстансами A, B, C.
scale_policy {
fixed_scale {
size = 3
}
}
allocation_policy {
zones = ["ru-central1-a", "ru-central1-b", "ru-central1-c"]
}
Либо использовать auto_scale — тогда группа будет автоматически масштабироваться в зависимости от нагрузки и параметров.
scale_policy {
auto_scale {
initial_size = 3
measurment_duration = 60
cpu_utilization_target = 60
min_zone_size = 1
max_size = 6
warmup_duration = 60
stabilization_duration = 180
}
Главный параметр, на который надо обратить внимание, — это cpu utilization target. Можно выставить значение, при превышении которого Яндекс.Облако автоматически создаст нам новую виртуалку.
Теперь протестируем автомасштабирование при увеличении нагрузки
Наше приложение принимает различные ивенты, проверяет джейсонку и направляет в кафку.
Перед нашей инстанс-группой стоит load-балансер. Он принимает все запросы, которые приходят на адрес 84.201.147.84 на порту 80, и направляет их на нашу инстанс-группу — на порт 8080.
У меня есть виртуалка, которая с помощью Yandex.Tank делает тестовую нагрузку. Для теста я установил 20 тысяч запросов в течение 5 минут.
Итак, нагрузка пошла.
Сначала все ноды будут загружены во всех трех зонах (A, B и C), но когда мы превысим нагрузку, Яндекс.Облако должно развернуть дополнительные инстансы.
По логам будет видно, что нагрузка выросла и в каждом регионе количество нод увеличилось до двух. В балансировку добавилась еще одна машина, количество инстансов тоже везде увеличилось.
При этом у меня был интересный момент. Один инстанс, который находится в регионе С, записывал данные (от момента приема данных до записи) за 23 миллисекунды, а у инстанса из региона А было 12,8 миллисекунд. Такое происходит из-за расположения кафки. Кафка находится в регионе А, поэтому в нее записи идут быстрее.
Ставить все инстансы кафки в одном регионе — не надо.
Когда добавилась еще одна машина, новая нагрузка спала, показатель CPU вернулся к норме. Полную аналитику по тестовому запуску можно посмотреть по ссылке: overload.yandex.net/256194.
Как написать приложение для работы с очередями и буферами обмена
Приложение написано на golang. Сначала мы импортируем встроенные модули.
package main
import (
"encoding/json"
"flag"
"io"
"io/ioutil"
"log"
"net/http"
"strings"
)
Затем подключаем github.com/Shopify/sarama — это библиотека для работы с кафкой.
Прописываем github.com/prometheus/client_golang/prometheus, чтобы метрики передавались в API Metrics.
Также подключаем github.com/streadway/amqp для работы с rabbitmq.
Затем следуют параметры бэкендов, в которые мы будем записывать.
var (
// Config options
addr = flag.String("addr", ":8080", "TCP address to listen to")
kafka = flag.String("kafka", "127.0.0.1:9092", "Kafka endpoints")
enableKafka = flag.Bool("enable-kafka", false, "Enable Kafka or not")
amqp = flag.String("amqp", "amqp://guest:guest@127.0.0.1:5672/", "AMQP URI")
enableAmqp = flag.Bool("enable-amqp", false, "Enable AMQP or not")
sqsUri = flag.String("sqs-uri", "", "SQS URI")
sqsId = flag.String("sqs-id", "", SQS Access id")
sqsSecret = flag.String("sqs-secret", "", "SQS Secret key")
enableSqs = flag.Bool("enable-sqs", false, "Enable SQS or not")
// Declaring prometheus metrics
apiDurations = prometheus.NewSummary(
prometheus.SummaryOpts{
Name: "api_durations_seconds",
Help: "API duration seconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
},
)
Адрес кафки — (строка).
Включить кафку или нет — поскольку приложение может писать в несколько разных бэкендов.
В приложении реализована возможность работы с тремя очередям.
Первое — это кафка.
Второе — amqp для рэббита.
И третья очередь — sqs для Яндекс.Кью.
Дальше мы открываем и задаем общие глобальные переменные для работы с нашим бэкендом. Прописываем настройки prometheus для отображения и визуализации.
В main мы включаем кафку, рэббит и создаем очередь с названием Load.
И если у нас включен sqs, мы создаем клиент для Яндекс.Кью.
Дальше наше приложение по http принимает несколько инпоинтов:
/status просто отдает okey, это сигнал для load-балансера, что наше приложение работает.
Если вы кидаете запрос на /post/kafka, ваша джейсонка попадет в кафку. Также работают /post/amqp и /post/sqs.
Как работает кафка
Кафка — простой, некапризный и очень эффективный инструмент. Если вам нужно быстро принять и сохранить много сообщений, кафка к вашим услугам.
Как-то на одном из проектов важно было уложиться в маленький бюджет. И вот представьте, мы берем самые дешевые машины без SSD (а кафка пишет последовательно и читает последовательно, так что можно не тратиться на дорогие диски), ставим кафку и zookeeper. Наше скромное решение на три ноды спокойно выдерживает нагрузку 200 тысяч сообщений в секунду! Кафка — это про «поставил и забыл», за пару лет работы кластер ни разу нас не потревожил. И стоил 120 евро в месяц.
Единственное, что нужно запомнить — кафка очень требовательна к CPU, и ей очень не нравится, когда кто-то рядом поджирает проц. Если у нее будет сосед под боком, она начнет тормозить.
Кафка устроена так: у вас есть topic, можно сказать, что это название очереди. Каждый topic бьется на части до 50 partitions. Эти партиции размещаются на разных серверах.
Как вы видите на схемке, topic load разбит на 3 партиции. Partition 1 оказывается на Kafka 1, вторая партиция — на кафка 2, третья — на 3. Тем самым нагрузка полностью распределяется. Когда кластер начинает принимать нагрузку, сообщения пишутся в один топик, а кафка раскидывает их по партициям, гоняет их по кругу. В итоге все ноды нагружаются равномерно.
Можно заморочиться и разбить топик на 50 партиций, поставить 50 серверов и расположить на каждом сервере по 1 партиции — нагрузка распределится на 50 нод. И это очень круто.
Партиции могут реплицироваться благодаря zookeeper. Кафке необходимо минимум 3 ноды зукипера. Например, вы хотите, чтобы ваша партиция реплицировались на 2 ноды. Указываете репликейшн фактор 2 и каждая партиция будет закинута 2 раза на рандомные хосты. И если ваша нода упадет, то благодаря зукиперу кафка это увидит: «ага, первая нода в дауне», кафка 2 заберет себе первую партицию.
Как я разворачивал кафку с помощью Terraform
В репозитории у нас есть terraform-файл, он называется kafka.tf .
Вначале мы поднимем 3 зукипера: resource “yandex compute instance” “zookeeper count = 3”.
Потом находим “zookeeper_deploy”, который деплоит наш зукипер. Хорошо, если он будет вынесен на отдельные машины, где кроме него ничего нет. Далее собираем айдишники нод и генерируем файл. Запускаем ansible для настройки зукипера.
Кафку поднимаем аналогично зукиперу и, что важно, после него.
Как работает RabbitMQ
Благодаря тому, что кафка по сути просто сохраняет сообщения на диск и по запросу отдает клиенту с нужного места, она очень и очень шустрая. Производительность рэббита значительно ниже, но он напичкан фичами под завязку! Рэббит пытается сделать очень многое, что естественным образом влечет за собой потребление ресурсов.
Рэббит уже не так прост — тут вам и exchanges с роутингом, и куча плагинов для delayed messages, deadletter и прочего хлама. За сообщениями следит сам кролик. Как только консьюмер подтвердил обработку сообщения, оно удаляется. Если консьюмер отвалился посередине — рэббит вернет сообщение в очередь. В общем, хороший комбайн, когда нужно перекидывать сообщения между сервисами. Цена этого — производительность.
Рэббит практически все делает внутри себя. В нем есть много встроенных инструментов, можно подключать разные плагины, много настроек для работы с сообщениями и очередями.
Если вам нужно перекидывать сообщения между сервисами в небольшом количестве — ваш выбор однозначно RabbitMQ. Если вам необходимо быстро сохранять кучу событий — метрики от клиентов, логи, аналитика и т.д. — ваш выбор kafka. Подробнее о сравнении двух инструментов можно прочитать в моей статье.
И еще: рэббиту не нужен зукипер.
Автор: Озеров Василий