Неужели нельзя обойтись без кафок и рэббитов, когда принимаешь 10 000 ивентов в секунду

в 16:01, , рубрики: clickhouse, devops, RabbitMQ, Блог компании Ребреин

Однажды я вел вебинар про то, как принимать 10 000 ивентов в секунду. Показал вот такую картинку, зрители увидели сиреневый слой, и началось: «Ребят, а зачем нам все эти кафки и рэббиты, неужели без них не обойтись»? Мы и ответили: «Зачем-зачем, чтобы пройти собес!» 

Неужели нельзя обойтись без кафок и рэббитов, когда принимаешь 10 000 ивентов в секунду - 1

Очень смешно, но давайте я все-таки объясню.


Мы можем принимать ивенты сразу в зеленой области и заставить наши приложения писать их в кликхаус.

Но кликхаус любит, когда в него пишут сообщения пачками

Другими словами, в него лучше запихнуть миллион сообщений, вместо того чтобы писать по одному. Kafka, Rabbit или Яндекс.Кью выступают как буфер, и мы можем контролировать с его помощью входящую нагрузку. 

Как бывает: в одну секунду пришло 10 тысяч ивентов, в следующую — тысяча, в другую — 50 тысяч. Это нормально, пользователи рандомно включают свои мобильные приложения. В таком случае в кликхаус напрямую будет заходить то 2 тысячи, то 10 тысяч сообщений. Но с помощью буфера вы можете подкопить сообщения, потом достать из этой копилки миллион ивентов и направить в кликхаус. И вот она — желанная стабильная нагрузка на ваш кластер.

Это все история про очереди

— Во-первых, очереди можно использовать для передачи сообщений между различными сервисами. 

Например, для бэкграунд задач. Вы заходите в админку магазина и генерируете отчет по продажам за год. Задача трудоемкая: нужно прочитать миллионы строк из базы, это хлопотно и очень долго. Если клиент будет висеть постоянно с открытым http-коннектом — 5, 10 минут — связь может оборваться, и он не получит файл. 

Логично выполнить эту задачу асинхронно на фоне. Пользователь нажимает кнопку «сгенерировать отчет», ему пишут: «Все окей, отчет генерируется, он придет на вашу почту в течение часа». Задача автоматически попадает в очередь сообщений и далее на «стол» воркера, который выполнит ее и отправит пользователю на ящик.

— Второй кейс про кучу микросервисов, которые общаются через шину. 

Например, один сервис принимает ивенты от пользователей, передает их в очередь. Следующий сервис вытаскивает ивенты и нормализует их, к примеру, проверяет, чтобы у них был валидный e-mail или телефон. Если все хорошо, он перекладывает сообщение дальше, в следующую очередь, из которой данные будут записываться в базу.

— Еще один поинт — это падение дата-центра, в котором хостится база.

Конец, ничего не работает. Что будет с сообщениями? Если писать без буфера, сообщения потеряются. Кликхаус недоступен, клиенты отвалились. До какого-то предела выручит память, но с буфером безопаснее — вы просто взяли и записали сообщения в очередь (например, в кафку). И сообщения будут храниться там, пока не закончится место или пока их не прочитают и не обработают.


Как автоматически добавлять новые виртуалки при увеличении нагрузки

Чтобы протестить нагрузку, я написал приложение и протестил автоматически масштабируемые группы.

Мы создаем инстанс-группу. Задаем ей имя и указываем сервисный аккаунт. Он будет использоваться для создания виртуалок.

resource "yandex_compute_instance_group" "events-api-ig" {
  name               = "events-api-ig"
  service_account_id = yandex_iam_service_account.instances.id

Затем указываем шаблон виртуалки. Указываем CPU, память, размер диска и т.д.

instance_template {
    platform_id = "standard-v2"
    resources {
      memory = 2
      cores  = 2
    }
    boot_disk {
      mode = "READ_WRITE"
      initialize_params {
        image_id = data.yandex_compute_image.container-optimized-image.id
        size = 10
      }

Указываем, к какому сетевому интерфейсу его подрубить.

}
    network_interface {
      network_id = yandex_vpc_network.internal.id
      subnet_ids = [yandex_vpc_subnet.internal-a.id, yandex_vpc_subnet.internal-b.id, yandex_vpc_subnet.internal-c.id]
      nat = true
    }

Самое интересное — это scale_policy.

Можно задать группу фиксированного размера fixed scale с тремя инстансами A, B, C.

scale_policy {
    fixed_scale {
      size = 3
    }
  }

  allocation_policy {
    zones = ["ru-central1-a", "ru-central1-b", "ru-central1-c"]
  }

Либо использовать auto_scale — тогда группа будет автоматически масштабироваться в зависимости от нагрузки и параметров.

scale_policy {
auto_scale {
    initial_size = 3
    measurment_duration = 60
    cpu_utilization_target = 60
    min_zone_size = 1
    max_size = 6
    warmup_duration = 60
    stabilization_duration = 180
}

Главный параметр, на который надо обратить внимание, — это cpu utilization target. Можно выставить значение, при превышении которого Яндекс.Облако автоматически создаст нам новую виртуалку.

Теперь протестируем автомасштабирование при увеличении нагрузки

Наше приложение принимает различные ивенты, проверяет джейсонку и направляет в кафку.

Перед нашей инстанс-группой стоит load-балансер. Он принимает все запросы, которые приходят на адрес 84.201.147.84 на порту 80, и направляет их на нашу инстанс-группу — на порт 8080. 

У меня есть виртуалка, которая с помощью Yandex.Tank делает тестовую нагрузку. Для теста я установил 20 тысяч запросов в течение 5 минут. 


Итак, нагрузка пошла.

Неужели нельзя обойтись без кафок и рэббитов, когда принимаешь 10 000 ивентов в секунду - 2

Сначала все ноды будут загружены во всех трех зонах (A, B и C), но когда мы превысим нагрузку, Яндекс.Облако должно развернуть дополнительные инстансы.

По логам будет видно, что нагрузка выросла и в каждом регионе количество нод увеличилось до двух. В балансировку добавилась еще одна машина, количество инстансов тоже везде увеличилось.

При этом у меня был интересный момент. Один инстанс, который находится в регионе С, записывал данные (от момента приема данных до записи) за 23 миллисекунды, а у инстанса из региона А было 12,8 миллисекунд. Такое происходит из-за расположения кафки. Кафка находится в регионе А, поэтому в нее записи идут быстрее.

Ставить все инстансы кафки в одном регионе — не надо.

Когда добавилась еще одна машина, новая нагрузка спала, показатель CPU вернулся к норме. Полную аналитику по тестовому запуску можно посмотреть по ссылке: overload.yandex.net/256194.


Как написать приложение для работы с очередями и буферами обмена

Приложение написано на golang. Сначала мы импортируем встроенные модули.

package main

import (
    "encoding/json"
    "flag"
    "io"
    "io/ioutil"
    "log"
    "net/http"
    "strings"

)

Затем подключаем github.com/Shopify/sarama — это библиотека для работы с кафкой.

Прописываем github.com/prometheus/client_golang/prometheus, чтобы метрики передавались в API Metrics.

Также подключаем github.com/streadway/amqp для работы с rabbitmq.

Затем следуют параметры бэкендов, в которые мы будем записывать.

var (
    // Config options
    addr     = flag.String("addr", ":8080", "TCP address to listen to")
    kafka    = flag.String("kafka", "127.0.0.1:9092", "Kafka endpoints")
    enableKafka    = flag.Bool("enable-kafka", false, "Enable Kafka or not")
amqp    = flag.String("amqp", "amqp://guest:guest@127.0.0.1:5672/", "AMQP URI")
enableAmqp    = flag.Bool("enable-amqp", false, "Enable AMQP or not")
sqsUri    = flag.String("sqs-uri", "", "SQS URI")
sqsId    = flag.String("sqs-id", "", SQS Access id")
sqsSecret    = flag.String("sqs-secret", "", "SQS Secret key")
enableSqs    = flag.Bool("enable-sqs", false, "Enable SQS or not")

    

    // Declaring prometheus metrics
    apiDurations = prometheus.NewSummary(
        prometheus.SummaryOpts{
            Name:       "api_durations_seconds",
            Help:       "API duration seconds",
            Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
        },
    )

Адрес кафки — (строка).

Включить кафку или нет — поскольку приложение может писать в несколько разных бэкендов.

В приложении реализована возможность работы с тремя очередям.

Первое — это кафка.

Второе — amqp для рэббита.

И третья очередь — sqs для Яндекс.Кью.

Дальше мы открываем и задаем общие глобальные переменные для работы с нашим бэкендом. Прописываем настройки prometheus для отображения и визуализации.

В main мы включаем кафку, рэббит и создаем очередь с названием Load.

И если у нас включен sqs, мы создаем клиент для Яндекс.Кью.

Дальше наше приложение по http принимает несколько инпоинтов:

/status просто отдает okey, это сигнал для load-балансера, что наше приложение работает.

Если вы кидаете запрос на /post/kafka, ваша джейсонка попадет в кафку. Также работают /post/amqp и /post/sqs.

Как работает кафка

Кафка — простой, некапризный и очень эффективный инструмент. Если вам нужно быстро принять и сохранить много сообщений, кафка к вашим услугам.

Как-то на одном из проектов важно было уложиться в маленький бюджет. И вот представьте, мы берем самые дешевые машины без SSD (а кафка пишет последовательно и читает последовательно, так что можно не тратиться на дорогие диски), ставим кафку и zookeeper. Наше скромное решение на три ноды спокойно выдерживает нагрузку 200 тысяч сообщений в секунду! Кафка — это про «поставил и забыл», за пару лет работы кластер ни разу нас не потревожил. И стоил 120 евро в месяц.

Единственное, что нужно запомнить — кафка очень требовательна к CPU, и ей очень не нравится, когда кто-то рядом поджирает проц. Если у нее будет сосед под боком, она начнет тормозить.

Неужели нельзя обойтись без кафок и рэббитов, когда принимаешь 10 000 ивентов в секунду - 3

Кафка устроена так: у вас есть topic, можно сказать, что это название очереди. Каждый topic бьется на части до 50 partitions. Эти партиции размещаются на разных серверах.

Неужели нельзя обойтись без кафок и рэббитов, когда принимаешь 10 000 ивентов в секунду - 4

Как вы видите на схемке, topic load разбит на 3 партиции. Partition 1 оказывается на Kafka 1, вторая партиция — на кафка 2, третья — на 3. Тем самым нагрузка полностью распределяется. Когда кластер начинает принимать нагрузку, сообщения пишутся в один топик, а кафка раскидывает их по партициям, гоняет их по кругу. В итоге все ноды нагружаются равномерно.

Можно заморочиться и разбить топик на 50 партиций, поставить 50 серверов и расположить на каждом сервере по 1 партиции — нагрузка распределится на 50 нод. И это очень круто.

Партиции могут реплицироваться благодаря zookeeper. Кафке необходимо минимум 3 ноды зукипера. Например, вы хотите, чтобы ваша партиция реплицировались на 2 ноды. Указываете репликейшн фактор 2 и каждая партиция будет закинута 2 раза на рандомные хосты. И если ваша нода упадет, то благодаря зукиперу кафка это увидит: «ага, первая нода в дауне», кафка 2 заберет себе первую партицию.

Как я разворачивал кафку с помощью Terraform

В репозитории у нас есть terraform-файл, он называется kafka.tf .

Вначале мы поднимем 3 зукипера: resource “yandex compute instance” “zookeeper count = 3”.

Потом находим “zookeeper_deploy”, который деплоит наш зукипер. Хорошо, если он будет вынесен на отдельные машины, где кроме него ничего нет. Далее собираем айдишники нод и генерируем файл. Запускаем ansible для настройки зукипера.

Кафку поднимаем аналогично зукиперу и, что важно, после него.

Неужели нельзя обойтись без кафок и рэббитов, когда принимаешь 10 000 ивентов в секунду - 5

Как работает RabbitMQ 

Благодаря тому, что кафка по сути просто сохраняет сообщения на диск и по запросу отдает клиенту с нужного места, она очень и очень шустрая. Производительность рэббита значительно ниже, но он напичкан фичами под завязку! Рэббит пытается сделать очень многое, что естественным образом влечет за собой потребление ресурсов.

Неужели нельзя обойтись без кафок и рэббитов, когда принимаешь 10 000 ивентов в секунду - 6

Рэббит уже не так прост — тут вам и exchanges с роутингом, и куча плагинов для delayed messages, deadletter и прочего хлама. За сообщениями следит сам кролик. Как только консьюмер подтвердил обработку сообщения, оно удаляется. Если консьюмер отвалился посередине — рэббит вернет сообщение в очередь. В общем, хороший комбайн, когда нужно перекидывать сообщения между сервисами. Цена этого — производительность.

Рэббит практически все делает внутри себя. В нем есть много встроенных инструментов, можно подключать разные плагины, много настроек для работы с сообщениями и очередями.

Если вам нужно перекидывать сообщения между сервисами в небольшом количестве — ваш выбор однозначно RabbitMQ. Если вам необходимо быстро сохранять кучу событий — метрики от клиентов, логи, аналитика и т.д. — ваш выбор kafka. Подробнее о сравнении двух инструментов можно прочитать в моей статье.

И еще: рэббиту не нужен зукипер.

Автор: Озеров Василий

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js