Для того чтобы мы могли считать статистику, наш сайт в своей работе генерирует огромное количество событий. Например, при отправке сообщения другому пользователю, при заходе пользователя на сайт, при смене местоположения и т.д. События представляют из себя строку в формате JSON или GPB (Google Protocol Buffers) и содержат время отправки, идентификатор пользователя, тип события, а также поля, относящиеся непосредственно к самому событию (например, координаты пользователя).
Каждую секунду генерируются сотни тысяч событий, и нам нужны инструменты, чтобы их собирать и анализировать эффективно и с минимальной задержкой. Мы рассматривали несколько существующих решений для этой задачи и до недавнего времени использовали демон под названием Scribe от Facebook. Он в целом нас устраивал и позволял делать все, что нам нужно. Однако в какой-то момент Facebook забросил свою разработку, и при некоторых условиях Scribe начал у нас падать (например, при перегрузке upstream-серверов). Самостоятельно устранить причину падений демона у нас не получилось, поэтому мы начали искать альтернативу.
Наши требования к системе доставки событий были следующие:
- наличие локального (прокси) демона;
- сохранение событий на диск в случае недоступности принимающего сервера;
- возможность маршрутизации событий по категориям;
- шардирование потоков данных по хешу (от user_id или другого) и round-robin;
- запись событий в файлы на принимающей стороне (scribe-like);
- нормальная работа в условиях высокой latency сети (доставка событий между ДЦ);
- масштабируемость приема и отсылки до миллиона событий в секунду;
- легкость эксплуатации, адекватное потребление ресурсов.
Мы рассматривали следующие варианты:
- Apache Flume: нестабилен, теряет события при падении, если не использовать Spooling Directory Source, который имеет очень неудобный API;
- FluentD: слишком низкая производительность, в остальном очень хорош;
- Apache Kafka: нет локального агента (см. issues.apache.org/jira/browse/KAFKA-1955).
К сожалению, ни один из этих вариантов не решает все наши проблемы, поэтому мы решили написать свою систему и назвали ее Live Streaming Daemon (LSD).
Что умел Scribe?
Чтобы понять, что делает LSD и зачем он нужен, давайте сначала подробнее рассмотрим фичи, которые мы использовали в scribe.
Наличие локального демона
Scribe работает по архитектуре «клиент-сервер», где клиентами называются машины, которые генерируют события, а серверами — машины, которые их получают. Чтобы экономить ресурсы и уметь буферизовать на диск события в случае проблем с доставкой, Scribe предлагать запускать инстансы клиента на каждой машине, на которой генерируются события. Приложение, генерирующее события, соединяется с локальным клиентом через unix или tcp socket и посылает в него события через протокол Apache Thrift. Предполагается, что локальный прокси будет всегда доступен и будет отвечать за небольшое время.
В целом в большинстве случаев все так и работает, однако иногда локальный инстанс Scribe может начать отвечать дольше обычного или вообще аварийно завершиться. Поэтому у нас был механизм, который сохраняет событие в локальные файлы вместо Scribe, если он недоступен. Отдельным cron-скриптом мы потом отправляли эти события в Scribe, когда он поднимался.
Возможность маршрутизации событий по категориям
Категорией события называется, по сути, имя директории, в которую будет записано то или иное событие на принимающем сервере. Разные типы событий имеет смысл класть в различные категории, поскольку обработчик для них может отличаться. В Scribe предусмотрена возможность посылать разные категории на разные сервера и задается она маской имени категории, например debug_*
.
В нашей конфигурации все события по умолчанию доставляются в европейский ДЦ. Если нужно доставить событие в пределах ДЦ, мы посылаем событие, которое имеет префикс local_
, или же, если мы хотим доставить событие в определенный ДЦ, мы добавляем префикс с именем этого ДЦ. В конфигурации демона прописаны разные маршруты для этих категорий, и они доставляются туда, куда нужно. При доставке в удаленные ДЦ могут использоваться промежуточные узлы для буферизации событий.
Шардирование потоков данных
Иногда бывает удобно доставлять данные, которые относятся к конкретному пользователю, на один и тот же сервер. В некоторых случаях это позволяет значительно улучшить производительность обработки за счет кеширования данных пользователя на небольшое время.
Как правило, данные распределяются просто по алгоритму round-robin, то есть каждый следующий кусок данных посылается на следующий сервер из списка, и так по кругу. У Scribe есть недостаток при работе в обоих режимах: демон «запоминает» сервер, на который нужно доставить конкретное событие, и при недоступности одного из принимающих серверов события будут копиться на диске и никуда не доставляться, даже если остальные сервера доступны и способны принять и обработать весь поток событий.
Запись событий в файлы на принимающей стороне
На принимающей стороне (т.е. на стороне сервера) все события пишутся в файлы вида <имя_категории>/<имя_категории>-<дата>_<счетчик>
, а также создается симлинк вида <имя_категории>/<имя_категории>_current
на последний файл в категории. Файлы ротируются на основании прошедшего времени (например, 60 секунд) или объема (например, 10 Мб) в зависимости от того, что случится раньше.
Если категория называется, к примеру, error_log, то иерархия файлов и директорий будет следующая:
/var/scribe/error_log/ |-- error_log-2016-09-13_004742 |-- error_log-2016-09-13_004743 |-- error_log-2016-09-13_004744 `-- error_log_current -> error_log-2016-09-13_004744
Запись осуществляется всегда в последний файл. В предыдущие файлы сервер не пишет, их можно свободно читать и удалять после того, как файл полностью обработан.
Нормальная работа в условиях высокой latency сети
Клиент Scribe отправляет данные небольшими пачками и ждет подтверждения с удаленной стороны перед тем, как отправить новую пачку. Это очень плохо работает, например, в случае пересылки событий через Атлантический океан, где задержка передачи данных составляет примерно 125 мс. Если максимальный размер пачки, к примеру, составляет 0,1 Мб, то за одну секунду таким способом можно передать лишь 0,1 Мб / 0,125 с = 0,8 Мб/с. Это ограничение можно обойти, если не ждать подтверждения для каждой пачки, а отправлять события в потоковом режиме.
Что предлагает LSD?
В целом основных претензий к Scribe у нас было всего две:
- Нестабильность и потеря данных при падении демона.
- При падении принимающего сервера трафик не перераспределяется между оставшимися серверами автоматически, требуется ручное вмешательство.
LSD решает эти две проблемы и удовлетворяет нашим требованиям по доставке событий, о которых мы говорили в начале.
Защита от потери данных при падении демона
Не бывает софта без ошибок, поэтому вместо того, чтобы постараться сделать LSD «неубиваемым» и всегда отвечающим за адекватное время, было решено пойти другим путем: клиенты будут всегда писать события в файлы, а LSD-клиент будет эти файлы читать и доставлять на нужные машины. Этот способ удобен еще и тем, что не требуется драйверов Thrift, Protocol Buffers и т.д., события можно отправлять хоть из shell-скрипта.
Чтобы отправить событие, нужно записать строку с этим событием в конец файла вида <category>/<filename>.log, где <category> — имя категории события. В качестве <filename> может служить любая монотонно возрастающая строка, основанная на текущей дате и времени. Такой формат выбран не случайно и позволяет пересылать на другие сервера события, доставленные с помощью LSD или Scribe. В качестве <filename> мы рекомендуем использовать дату и время в формате YYYYMMDDHHII (например, 201609131714). При выборе такого формата файлы создаются максимум раз в минуту и их имена монотонно возрастают.
Если нужно отправлять события размером больше 4 Кб ( stackoverflow.com/questions/1154446/is-file-append-atomic-in-unix ) из нескольких процессов, то нужно брать файловую блокировку перед записью события в файл, чтобы строки не перемешивались. Можно добавлять суффикс _big к имени файла и писать большие события в отдельный файл, чтобы не брать блокировку для маленьких событий.
Также поддерживается plain-формат вида <category>.log, и, в таком случае, создание поддиректории не требуется. Такой формат удобно использовать при отправке событий из shell-скриптов и для сбора логов.
Автоматическое перераспределение потока событий
Если падает один из серверов, на которые доставляются события, то они автоматически перераспределяются по оставшимся серверам. Если же один из серверов работает медленнее остальных, то в этом случае ему просто достанется такой поток событий, который он в состоянии принять.
Это также означает, что однократная доставка не гарантируется, поскольку недоступность сервера определяется на основании таймаута. Возможна ситуация, когда события могут успешно доставляться на сервер, но подтверждения об этом приходить не будут, или же будут приходить с большим опозданием. В таком случае LSD-клиент заново пошлет пачку событий, подтверждение для которой не пришло за таймаут (по умолчанию 30 секунд).
Доставка событий в режиме реального времени
Поскольку мы выбрали имя Live Streaming Daemon, нужно соответствовать :). Когда хватает пропускной способности сети и производительности сервера на принимающей стороне, доставка событий осуществляется в режиме реального времени — никаких искусственных задержек при доставке не вносится. Это удобно, если вы доставляете логи или создаете много промежуточных узлов для пересылки событий. С другой стороны, доставка в режиме реального времени требует большего количества ресурсов, чем если бы события накапливались и отправлялись раз в несколько секунд (с такими настройками мы использовали Scribe). Поэтому потребление CPU у LSD в среднем несколько выше, чем у Scribe, однако разница не очень значительна.
Производительность
К сожалению, мы не смогли измерить производительность Scribe на нашем потоке событий для внутренней системы аналитики UDS, поскольку scribe-клиенты падали под нагрузкой (про UDS не так давно рассказывал Александр Крашенников www.percona.com/blog/2016/08/29/percona-live-europe-featured-talk-with-alexander-krasheninnikov-processing-11-billion-events-a-day-with-spark-in-badoo ).
Один LSD-сервер легко справляется с потоком событий в 2 гигабита/с (400k событий/с), поступающим c тысяч серверов. Соответственно, чтобы принять поток в 1 миллион событий в секунду, нужно всего 3 сервера, при этом каждый из серверов должен быть оснащен двумя гигабитными сетевыми картами.
Open-source
Исходные коды LSD находятся на GitHub: github.com/badoo/lsd (для установки наберите команду go get github.com/badoo/lsd).
Демон работает под Linux и macOS, но для промышленного использования рекомендуется использовать Linux.
Помимо LSD, у нас есть большое количество других проектов, выложенных в open source, посмотреть и изучить которые вы можете в нашем техблоге: tech.badoo.com/ru/open-source
Юрий Насретдинов, старший разработчик, Badoo
Автор: Badoo