— Мы получаем больше миллиона твитов в день, и наш сервер просто не успевает их обрабатывать. Поэтому мы хотим установить на кластер Hadoop и распределить обработку.
Речь шла о вычислительно тяжёлом сентиментном анализе, поэтому я мог поверить, что у одного сервера действительно не хватает CPU, чтобы справиться с большим потоком твитов.
— А что вы собираетесь делать с уже обработанными данными?
— Скорее всего, мы будем складывать их в MySQL, как делали это раньше, или даже удалять.
— Тогда вам определённо не нужен Hadoop.
Мой бывший коллега был далеко не первым, кто говорил про распределённые вычисления на Hadoop. И каждый раз я видел полное непонимание того, зачем была придумана и разработана эта платформа.
Вообще говоря, при обработке данных встречается несколько типичных ситуаций, и для каждой из них существуют свои подходы и свои инструменты. Рассмотрим основные из них.
Точечная обработка активных данных
Самый частый сценарий, с которым нам всем приходится сталкиваться, это хранение «активных» данных — информации о пользователях, списка товаров, комментариев к статьям и т.д. — всего, что может часто меняться. В этом случае мы хотим иметь возможность обрабатывать данные точечно — извлекать нужный объект по индексу, обрабатывать его и загружать обратно. Именно такой функционал предоставляет большинство СУБД (причём как реляционных, так и NoSQL).
При масштабировании основная проблема здесь возникает с максимальным объёмом данных, хранимым в базе. Однако, даже если используемая СУБД не поддерживает распределённую структуру, проблема легко решается за счёт партиционирования на уровне приложения.
Потоковая обработка в реальном времени
Иногда, тем не менее, упор делается не на хранении, а на обработке данных. Именно с такой ситуацией столкнулся мой бывший коллега, занимающийся сентиментным анализом. Ему нужно было в реальном времени получать твиты, анализировать их и выводить результат на динамически генерируемом графике. Миллион ежедневных твитов не был проблемой — в конце концов, это не больше 140 миллионов символов или 280Мб при использовании кодировки UTF16. Проблемой был анализ этих твитов в реальном времени.
К счастью, большинство потоковых алгоритмов (будь то сентиментный анализ твитов, сбор куммулятивной статистики или онлайн машинное обучение) использует для своей работы небольшие, независимые друг от друга кусочки данных. Это позволяет легко распараллелить обработку, просто добавив больше вычислительных узлов и поставив перед ними балансировщик нагрузки.
В простейшем случае в качестве балансировщика может выступать брокер сообщений (такой как RabbitMQ или ZeroMQ), в более сложных можно использовать готовые фреймворки для потоковой обработки, такие как Storm. При этом основной код, непосредственно выполняющий обработку данных, практически не меняется по сравнению с односерверной версией.
Пакетная обработка исторических данных
Кроме активных и потоковых, существует также ещё один важный тип данных — исторические, т.е. те, которые были однажды сгенерированы и уже вряд ли когда-либо изменятся. Сюда входят журналы событий, финансовые показатели, индексы документов за определённый день, да и вообще любые данные, привязанные к некоторому моменту в прошлом. Чаще всего такие данные накапливаются в большом количестве и затем используются аналитиками для решения бизнес-задач. Отличительной чертой здесь является то, что на момент обработки необходимые данные уже собраны и разложены на множестве серверов (если данные умещаются на один сервер, то они просто не настолько большие).
Представим, что у нас есть данные обо всех покупках в крупной сети супермаркетов за последние пол года. Мы хотим проанализировать эти данные: посчитать среднюю эффективность каждого супермаркета, эффект от проведённых акций, корреляцию между купленными товарами и множество других метрик. Как организовать работу с данными таким образом, чтобы вычисление этих параметров занимало резонное время?
Мы можем загрузить все данные в распределённую базу Oracle и работать с ними так же, как с активными. Но в этом случае сервер приложений будет последовательно забирать данные из базы и последовательно обрабатывать каждую запись, что крайне неэффективно.
Мы также можем настроить конвеер для потоковой обработки, распределяя нагрузку между серверами приложений. Но рано или поздно мы упрёмся в канал связи между обрабатывающими узлами и узлами данных.
Единственный способ распределить нагрузку и не переполнить канал связи — минимизировать передвижение данных между узлами. А для этого необходимо максимум вычислений производить локально на тех машинах, где лежат обрабатываемые данные. Именно принцип локальности данных лежит в основе парадигмы MapReduce и всего Hadoop.
Подробней о MapReduce
Принято считать, что каждое задание MapReduce состоит из двух фаз — фазы map и фазы reduce. На самом деле всё несколько сложней: сначала данные разбиваются на сплиты (splits), затем над каждым из них выполняется функция map, затем результаты сортируются, затем комбинируются, затем снова сортируются и наконец передаются функции reduce. Однако именно map и reduce описывают основную идею парадигмы.
На этапе применения функции map выполняется вся работа, которую можно выполнить локально. Например, локально можно посчитать количество слов в строке или вычислить метрику одной записи в CSV файле, или проанализировать твит и т.д. Map обеспечивает прозрачное распараллеливание и минимальную нагрузку на канал связи.
Но одного локального map для большинства задач недостаточно: чтобы посчитать количество слов во всём тексте, нужно сложить их количество в каждой строке, а чтобы посчитать среднее значение метрики, нужно собрать вместе результаты по каждой записи из CSV файла. Именно это и является задачей функции reduce. При этом количество данных, передаваемых по сети, значительно снижается (немало в это помогает и функция комбинирования, которая выступает как локальный reduce).
Так почему же мой коллега был не прав, собираясь использовать Hadoop для сентиментного анализа твитов? Ведь, как уже было сказано выше, на фазе map можно проанализировать каждый твит по отдельности, полностью проигнорировав фазу reduce! Всё дело в инфраструктуре. Во-первых, твиты всё равно сначала придётся транспортировать к вычислительным узлам, а это означает потерю преимущества локальности данных. Во-вторых, Hadoop плохо подходит для онлайн обработки: работа ведётся с пакетами данных, а значит твиты придётся сначала собрать, а только затем запустить задание MapReduce. Даже если настроить задачу map на постоянное и бесконечное считывание твитов из источника, Hadoop через какое-то время убьёт всё задание как сбойное. Ну и, в-третьих, если вы услышите, что Hadoop быстрый, помните, что производительность достигается за счёт минимизации передвижения данных, в то время как сами задания MapReduce, особенно на небольших объёмах данных, могут выполняться довольно долго за счёт накладных расходов (запуск JVM, выполнение резервных заданий, запись промежуточных результатов на диск и т.д.).
Поэтому используйте правильные инструменты для правильных задач, и будет вам счатье!
Автор: ffriend