Spotify: миграция подсистемы событий в Google Cloud (часть 1)

в 9:19, , рубрики: cloud, GCP, Google, Google Cloud Platform, Pubsub, Spotify, Блог компании Google, высокая производительность, Системы обмена сообщениями, метки:

Всякий раз, когда пользователь выполняет действие в клиенте Spotify – такое как, например, прослушивание песни или поиск исполнителя – небольшое количество информации, событие, отправляется на наши сервера. Доставка событий, процесс безопасной и надежной транспортировки информации от клиентов по всему миру к нашей центральной системе обработки, — интересная задача. В серии этих статей мы рассмотрим некоторые решения, которые реализовали в этой области. Если быть более точными, то мы рассмотрим архитектуру нашей новой системы доставки событий и расскажем, почему решили развернуть ее в Google Cloud.

В этой первой статье мы объясним то, как работает наша текущая система доставки событий и расскажем о некоторых уроках, которые извлекли в ходе работы с ней. В следующей – рассмотрим создание новой системы и то, почему мы выбрали Cloud Pub/Sub в качестве транспортного механизма для всех событий. В третьей, и последней, статье мы объясним, как работаем со всеми событиями с помощью DataFlow, и насколько быстро все это происходит.

image

События, распространяющиеся через нашу систему доставки, имеют множество применений. Большая часть наших решений в продуктовом дизайне основана на результатах A/B тестов, а они, в свою очередь, должны опираться на большие и точные данные. Плейлист Discover Weekly, запущенный в 2015, быстро стал одной из самых используемых функций в Spotify. Он создается на основе данных о проигрывании музыки. Year in music, Spotify Party и множество других функций Spotify также основаны на данных. Кроме того, данные Spotify служат одним из источников для составления топов Billboard.

Наша система доставки сообщений является одной из основополагающих частей инфраструктуры данных Spotify. Ключевое требование к ней – доставка всех данных с предсказуемой задержкой и доступность для наших разработчиков через хорошо описанный интерфейс. Данные об использовании можно описать как набор структурированных событий, сформированный в некий момент времени как ответ на некоторые заранее определенные действия.

Большинство событий, который используются в Spotify, непосредственно генерируются клиентами Spotify как ответ на определенные действия пользователей. Всякий раз, как в клиенте Spotify происходит событие, информация о нем отправляется на один из шлюзов Spotify, который записывает его в системный лог. Там ему присваивается временная отметка, которая используется в системе доставки сообщений. Для того, чтобы гарантировать определенную задержку и законченность доставки сообщения, было решено для события использовать метку лога (syslog timestamp), а не клиента, так как у нас нет контроля над событием до того, как оно попало на наши сервера.

В случае Spotify, все данные надо доставить в центральный Hadoop кластер. Сервера Spotify, на которых мы собираем данные, расположены в нескольких дата-центрах на двух континентах. Полоса пропускания между нашими дата-центрами является дефицитным ресурсом и необходимо относиться к передаче данных с особой тщательностью.

Интерфейс данных определяется местоположением данных в Hadoop и форматом, в котором они хранятся. Все данные, которые доставляются нашим сервисом, записываются в Avro формате в HDFS. Доставленные данные разбиты на разделы (партиции) по 60 минут (час). Это пережиток прошлого, когда первая система доставки сообщений была основана на scp команде и ежечасном копировании syslog файлов со всех серверов на Hadoop. Поскольку все процессы обработки данных сегодня в Spotify завязаны на почасовых данных, этот интерфейс останется с нами и в обозримом будущем.

Большинство процессов работы с данными в Spotify читают данные из часовой сборки лишь один раз. Выходные значения одних процессов могут служить входными данными для других, формируя, таким образом, длинные цепочки преобразований. После того, как процесс обработал данные за час, он уже не проводит никаких проверок в этом исходном часе на изменения. Если данные изменились, единственный способ воспроизвести дальше эти изменения — вручную перезапустить все соответствующие задачи (и относящиеся к ним задачи) для данного конкретного интервала (часа). Это дорогостоящий и трудоемкий процесс, вот почему мы выдвигаем такие требования к службе доставки сообщений и после предоставления часового сета уже не можем дополнять в нем никакие данные. Эта проблема, известная как проблема полноты данных, противопоставляется требованию минимальной задержки при обработке данных. Интересная точка зрения на проблему полноты данных изложена в докладе Dataflow от Google.

Первоначальная система доставки сообщений

Системная архитектура

Наша первоначальная система доставки сообщений была построена поверх Kafka 0.7.

image

В ней система доставки событий выстроена вокруг абстракции почасовых файлов. Она предназначена для потоковой передачи файлов логов, которые содержат события, от сервисных машин к HDFS. После того, как все лог файлы за определенный час переданы на HDFS, они преобразовываются из текста с табуляциями в формат Avro.

Когда система только создавалась, одной из недостающих функций Kafka 0.7 была способность кластера Kafka Broker работать надежным постоянным хранилищем. Это повлияло на принятие важного проектного решения – не поддерживать постоянные состояния между производителем данных, Kafka Syslog Producer и Hadoop. Событие считается надежно сохраненным только тогда, когда оно записано в файл на HDFS.

Проблема с надежным существованием события только внутри Hadoop состоит в том, что кластер Hadoop становится единой точкой сбоя для системы доставки сообщений. Если Hadoop выйдет из строя, то вся система доставки остановится. Чтобы справиться с этим, мы должны убедиться в том, что у нас есть достаточно дискового пространства на всех сервисах, с которых мы собираем события. Когда Hadoop вернется в строй, нам нужно «догнать» его состояние, передав все данные настолько быстро, насколько это возможно. Время восстановления в основном ограничивается пропускной способностью, которую мы можем задействовать между нашими дата-центрами.

Продюсер (Producer) – это демон, который запущен на каждом хосте, с которого мы хотим отправлять события в Hadoop. Он отслеживает лог-файлы и отсылает пакеты логов в Kafka Syslog Consumer. Producer ничего не знает от типе события или свойствах, которые могут у него быть. С его точки зрения, событие это просто набор строк в файле и все строки перенаправляются в одинаковый канал. Это означает, что события всех типов, содержащихся в одном лог-файле, также передаются через один канал. В такой системе, топики Kafka используются в качестве каналов для передачи событий. После того, как Продюсер отправляет логи к Потребителю (Consumer), ему надо дождаться подтверждения (ACK), что Consumer успешно сохранил строки лога в HDFS. Только после того, как продюсер получает ACK для отправленных логов, он считает, что они надежно сохранены и переходит к передачи других записей.

Для событий, чтобы попасть от Producer к Consumer, нужно пройти Kafka Brokers и затем Kafka Groupers. Kafka Brokers — стандартный компонент Kafka, а Kafka Groupers — это компонент, написанный нами. Groupers обрабатывает все потоки событий от локальных дата-центров и затем публикует их снова сжатыми, эффективно сгруппированными в одном топике, которая затем вытягивается Consumer.

Задача Extract, Transform and Load (ETL) используется для преобразования данных из простого формата с разделением табуляцией в Avro формат. Этот процесс — обычная Hadoop MapReduce работа, внедренная с использованием фреймворка Crunch, который работает с почасовыми наборами. Прежде, чем начать работу с определенным часом, ему надо убедиться, что все файлы полностью переданы.

Все Продюсеры постоянно отправляют контрольные метки, которые могу содержать end-of-file маркеры. Эти маркеры отправляются лишь единожды, когда Producer пришел к выводу, что весь файл был надежно сохранен на Hadoop. Монитор состояния (или «живучести») постоянно опрашивает наши системы обнаружения сервисов во всех дата-центрах о том, какие сервис-машины работали в определенный час. Чтобы проверить, все ли файлы были окончательно переданы за этот час, ETL сравнивает информацию о серверах, от которых ему стоит ожидать данные, с end-of-file маркерами. Если ETL определяет расхождение и неполную передачу данных, то он задерживает обработку данных для определенно часа.

Для того, чтобы иметь возможность максимально использовать имеющиеся маперы и редюсеры, ETL, который является обычной задачей Hadoop MapReduce, надо знать, как шардить входные данные. Маперы и редюсеры рассчитываются на основе размера входных данных. Оптимальный шардинг рассчитывается на основе количества событий, непрерывно поступающих от Consumer-ов.

Уроки

Одной из основных проблем, связанных с такой конструкции является то, что локальные Продюсеры должны убедиться в том, что данные сохранились в HDFS в центральной локации до того, как их можно счесть надежно доставленными. Это означает, что Producer сервера на западном побережье США должен знать, когда данные запишутся на диск в Лондоне. Большую часть времени это работает просто прекрасно, но если передача данных замедлится, то это вызовет задержки в доставке, от которых потом будет трудно избавиться.

Сравните это с вариантов, когда точка обслуживания находится в локальном дата-центре. Это упрощает конструкцию Продюсера, так как, обычно, сеть между хостами в центре обработки данных очень надежна.

Абстрагируясь от проблем, мы были вполне довольны системой, которая может надежно доставлять более 700,000 событий в секунду со всего света. Редизайн системы также дал нам возможность улучшить процесс разработки программного обеспечения.

Отправляя все события вместе через один канал, мы теряли гибкость управления потоками событий с разным качеством обслуживания (QoS). Это также ограничивало работу в реальном времени, так как любой процесс, работающий в реальном времени, должен был передавать свои данные через единственный канал, в котором идет весь поток, и отфильтровывать из него только нужное.

Передача неструктурированных данных добавляет ненужную задержку, так как требует дополнительного ETL-преобразования. В настоящее время, ETL работа добавляет около 30 минут задержки в доставку события. Если бы данные пересылались в формате Avro, то они сразу же были доступны при записи на HDFS.

Необходимость отправителю отслеживать завершение часа тоже вызывало проблемы. Например, если машина умирает, то не может послать сообщение о конце файла. Если end-of-file маркер теряется, то мы будем ждать вечно до тех пор, пока этот процесс не прервут вручную. По мере роста числа машин, эта проблема становится все более актуальной.

Следующие шаги

Количество доставленных сообщений в Spotify постоянно увеличивается. В результате повышенных нагрузок, мы стали испытывать все больше проблем. Со временем, количество отключений стало тревожить нас. Мы поняли, что ни мы, ни система, больше не в состоянии справляться с повышенной нагрузкой. Как раз в следущей статье расскажем, о том, как решили изменить нашу систему.
Spotify: миграция подсистемы событий в Google Cloud (часть 1) - 3
Количество сообщений, обрабатываемых нашей системой, в определенный момент времени.

Автор: Google

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js