Лучше день потерять, но потом за пять минут долететь (с)
Привет коллеги.
Хочу поделиться с вами соображениями о том, чем отличаются фреймворки Hive и Pig, входящие в экосистему Hadoop. По сути, это два очень похожих продукта, цель у которых одна — взять на себя всю техническую реализацию MapReduce, предоставив взамен возможность описывать процесс обработки данных на более абстрактном уровне. В этой статье мы увидим как выглядят выборки в этих двух системах, и попытаемся понять, в каких случаях надо использовать то или иное решение.
Hive
Итак, начнем с Hive. Его основная фишка — это SQL-подобный язык запросов HQL (Hive query language). Он позволяет работать с данными привычным нам способом, так, как если бы мы работали с обычной реляционной базой. Скрипты можно запускать как через консоль, так и с помощью командной строки.
Hive это:
- SQL-подобный язык HQL
- Интерактивная консоль
- Встроенные функции агрегации
- Поддержка пользовательских функций (UDF)
- Данные — как таблица
Hive умеет работать:
- с текстовыми файлами (можно задать разграничительный символ)
- с сжатыми текстовыми файлами (Gzip, Bzip)
- с массивами, словарями, объединениями (union)
- имеет огромное количество встроенных функций для работы с: коллекциями, датами, строками, JSON-ми
- с математическими функциями (округление, логарифмы, корни, тригонометрия)
- с функциями агрегации (sum, min, max, avg...)
- Если всего перечисленного выше не хватило, то можно использовать кастомные функции, а также мэпперы и редьюсеры (python, java)
Простой пример:
--Создадим внешнюю таблицу. (Описание структуры лога)
CREATE EXTERNAL TABLE win_bids_log (
date_field string,
request_id string,
user_ssp_id string,
dsp_id string,
win_price int
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY 't'
LOCATION 'hdfs://inpit/bid-logs';
CREATE EXTERNAL TABLE win_bids_by_dsp (
dsp_id string,
win_bids_cout int,
win_price int
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY 't'
STORED AS TEXTFILE
LOCATION ''hdfs://output/win-bids-by-dsp'';
INSERT OVERWRITE TABLE win_bids_by_dsp
SELECT dsp_id, COUNT(dsp_id), SUM(win_price) FROM win_bids_log GROUP BY dsp_id;
Как видите, всё довольно просто и понятно. Довольно таки приятно писать запросы, на знакомом языке. Но продолжается это счастье до тех пор, пока не приходится столкнуться с более сложными запросами.
Пример посложнее:
INSERT OVERWRITE TABLE movieapp_log_stage
SELECT * FROM (
SELECT custid, movieid,
CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid, time,
CAST((CASE recommended WHEN 'Y' THEN 1 ELSE 0 END) AS INT) recommended, activity,
CAST(null AS INT) rating, price
FROM movieapp_log_avro
WHERE activity IN (2,4,5,11)
UNION ALL
SELECT
m1.custid, m1.movieid,
CASE WHEN m1.genreid > 0 THEN m1.genreid ELSE -1 END genreid, m1.time,
CAST((CASE m1.recommended WHEN 'Y' THEN 1 ELSE 0 END) AS INT) recommended,
m1.activity, m1.rating,
CAST(null as float) price
FROM movieapp_log_avro m1
JOIN (
SELECT custid,movieid,
CASE WHEN genreid > 0 THEN genreid ELSE -1 END genreid,MAX(time) max_time,
activity
FROM movieapp_log_avro
GROUP BY custid, movieid, genreid, activity
) m2
ON (
m1.custid = m2.custid
AND m1.movieid = m2.movieid
AND m1.genreid = m2.genreid
AND m1.time = m2.max_time
AND m1.activity = 1
AND m2.activity = 1
)
) union_result;
Разобраться конечно можно, но всё же стоит признать, что в данном случае определенно не хватает какой-то упорядоченности. Разложить бы это всё по полочкам, да с комментариями. Не так ли?
Итого:
Hive плюсы:
- Старый, добрый SQL — хорош для описания выборок. Да и просто все его знают.
- MapReduce под капотом. Уходит много оверхеда, связанного с обвязкой вокруг MR. Описание моделей данных, входных и выходных форматов, цепочек MR задач.
- Интерактивность. Хорош для анализа данных в разных срезах.
- Быстрота разработки
- Отсутствие зависимостей, компиляции, сборки (всё это скрыто)
Hive Минусы:
- Не всё можно уложить в парадигму HQL
- Это хорошо ложится в голову, при простых выборках. Но с ростом сложности становится всё труднее и труднее их понимать. Особенно если выборку писали не вы
Pig
Поговорим теперь о Pig. Он основан на процедурном языке Pig Latin. Чтобы в нем разобраться нужно потратить какое то время.
Давайте разберемся и походу выясним отличия от Hive
Pig это:
- язык Pig Latin
- Интерактивная консоль
- Встроенные функции агрегации
- Поддержка пользовательских функций (UDF)
- Данные — в виде структур (Tuple, Bag)
Pig умеет работать:
- с текстовыми файлами (можно задать разграничительный символ)
- с сжатыми текстовыми файлами (Gzip, Bzip)
- с массивами, словарями, объединениями (union)
- имеет огромное количество встроенных функций для работы с: датами, строками, структурами
- с математическими функциями (округление, логарифмы, корни, тригонометрия)
- с функциями агрегации (sum, min, max, avg...)
- Если всего перечисленного выше не хватило, то можно использовать кастомные функции (jython, java)
Как видите, Pig умеет всё то же, что и Hive. Отличие лишь в представлении данных и в языке. Но именно это отличие выводит работу с Pig совершенно на другой уровень.
Рассмотрим Pig подробнее.
Данный фреймфорк работает со специальными структурами данных — Tuple и Bag.
- Tuple — упорядоченный набор полей. Структура, к полям которой можно обращаться по индексу и/или имени.
- Bag — коллекция (множество) Tuple.
Pig Latin базовые функции:
- LOAD
- STORE
- GENERATE
- JOIN
- GROUP
- FILTER
- UNION
- DISTINCT
- ORDER
Давайте рассмотрим на примере, как можно трансформировать данные в процессе работы с Pig. Работать будем с log файлом RTB биржи. Данные представлены в следующем виде:
- time — врмемя
- bid_id — идентификатор ставки,
- user_id — идентификатор пользователя,
- dsp_id — идентификатор биддера (игрока)
- bid — ставка
Pig — загрузка данных (LOAD)
Для загрузки используется функция LOAD, также мы указываем разделительный символ 't' и сигнатуру данных (при необходимости можно указывать тип).
--почистим выходную директорию HDFS (Pig поддерживает команды Hadoop)
fs -rm -f -r -skipTrash /data/pig/out
--загрузим данные в переменную 'raw_data'
raw_data = LOAD '/data/pig/example/' USING PigStorage('t') AS (time, bid_id, user_id, dsp_id, bid:int);
На выходе мы получим вот такую структуру (Tuple). В запросах к её полям можно обращаться через точку. Например: raw_data.dsp_id
raw_data -> tuple с именованными полями.
-------------------------------------------------------------------------------------------
time, bid_id, user_id, dsp_id, bid
-------------------------------------------------------------------------------------------
(2014.02.14 14:08:27.711, 56949, 45234534553459, DSP-2, 12)
(2014.02.14 14:08:28.712, 61336, 45221696259999, DSP-1, 56)
(2014.02.14 14:08:29.713, 74685, 45221699381039, DSP-2, 89)
(2014.02.14 14:08:30.714, 56949, 45221695781716, DSP-1, 21)
(2014.02.14 14:08:25.715, 27617, 45221682863705, DSP-3, 22)
Pig — итеративная обработка данных (FOREACH — GENERATE)
FOREACH — GENERATE позволяет итеративно «бежать» по набору данных и применять к каждой записи какие-либо операции, или просто отдать на выход определенные поля, убрав всё не нужное.
--Нормализуем данные. Обрежем timestamp с помощью SUBSTRING
norm_data = FOREACH raw_data GENERATE SUBSTRING(time, 0,10) AS date, dsp_id, bid;
На выходе получаем то же самое множество, но с обрезанной датой, и только двумя полями: dsp_id, bid.
norm_data -> tuple с именованными полями и обрезанной датой
---------------------------------------
date, dsp_id, bid
---------------------------------------
(2014.02.14, DSP-2, 12)
(2014.02.14, DSP-1, 56)
(2014.02.14, DSP-2, 89)
(2014.02.14, DSP-1, 21)
Pig — группировка данных (GROUP)
GROUP — позволяет группировать данные, при этом выдавая на выход нетривиальную структуру.
--Сгруппируем по dsp_id и date
group_norm_data = GROUP norm_data BY (dsp_id, date);
На выходе имеем:
группу в качестве ключа. К ней можно обращаться через префикс group.
и коллекцию агрегатов с префиксом norm_data
group_norm_data -> (группа как ключ) : [ (norm_data), (norm_data) ]
----------------------------------------------------------------------------------
( group), array of norm_data
----------------------------------------------------------------------------------
( (DSP-1, 2014.02.14), {(2014.02.14, DSP-1, 56), (2014.02.14, DSP-1, 21)} )
( (DSP-1, 2014.02.17), {(2014.02.17, DSP-1, 34), (2014.02.17, DSP-1, 24)} )
( (DSP-2, 2014.02.14), {(2014.02.14, DSP-2, 89), (2014.02.14, DSP-2, 12)} )
Pig — развертка агрегатов (FLATTEN)
Иногда необходимо развернуть агрегаты в линейную структуру («выпрямить»).
Для этого существует функция FLATTEN
-- Разворачиваем агрегаты в линейную структуру
ft_group_norm_data = FOREACH group_norm_data GENERATE FLATTEN(group), FLATTEN(norm_data);
Из сложной сгруппированной структуры мы получаем прямолинейный набор Tuples.
ft_group_norm_data -> tuple с именованными полями
----------------------------------------------------------------------
dsp_id, date date dsp_id bid
-----------------------------------------------------------------------
(DSP-1, 2014.02.14, 2014.02.14, DSP-1, 56)
(DSP-1, 2014.02.14, 2014.02.14, DSP-1, 21)
(DSP-1, 2014.02.15, 2014.02.15, DSP-1, 15)
(DSP-1, 2014.02.15, 2014.02.15, DSP-1, 31)
Pig — функции агрегации (SUM)
Давайте что-нибудь посчитаем. Например, сумму дневных ставок, сделанных каждым биддером.
--Вычислим сумму дневных ставок, сделанных каждым биддером
sum_bids_dsp = FOREACH group_norm_data GENERATE group, SUM(norm_data.bid) AS bids_sum;
sum_bids_dsp -> группа : bids_sum
------------------------------------------------------
group, bids_sum
------------------------------------------------------
( (DSP-1, 2014.02.16), 82)
( (DSP-1, 2014.02.17), 58)
( (DSP-2, 2014.02.14), 101)
( (DSP-2, 2014.02.16), 58)
Pig — GROUP ALL
Часто необходимо посчитать количество «записей» в выборке. Просто применить COUNT к выборке — не удастся. Данные надо свернуть в одну группу и уже затем применить функции агрегации.
--Вычислим общую сумму, и количество групп.
--Для этого свернем всё в одну группу.
group_all = GROUP sum_bids_dsp ALL;
На выходе имеем группу — «all» и коллекцию всех предыдущих агрегатов.
( all, { ((DSP-1,2014.02.14),77), ((DSP-1,2014.02.15),67), ((DSP-1,2014.02.16),82),((DSP-1,2014.02.17),58),((DSP-2,2014.02.14),101),((DSP-2,2014.02.16),58),((DSP-2,2014.02.17),123),((DSP-3,2014.02.14),22),((DSP-3,2014.02.15),109),((DSP-3,2014.02.16),136),((DSP-3,2014.02.17),81) } )
теперь вычислим количество и сумму
summary = FOREACH group_all GENERATE COUNT(sum_bids_dsp), SUM(sum_bids_dsp.bids_sum);
Выход
------------------------------------------------------
count, sum
------------------------------------------------------
(11, 914)
По-моему, это то, что нужно. Обработка данных представлена в упорядоченном виде. Всё легко разбивается на шаги. Каждый этап можно снабжать комментариями.
Итого:
Pig плюсы:
- Процедурный подход. Упорядоченность! Язык позволяет разбивать логику на блоки, каждый шаг можно развернуто описывать комментариями.
- Формирование MapReduce под капотом. Уходит много оверхеда, связанного с обвязкой вокруг MR. Описание моделей данных, входных и выходных форматов, цепочек MR задач.
- Интерактивность. Хорош для анализа данных в разных срезах.
- Быстрота разработки. Отсутствие зависимостей, сборки
Pig Минусы:
- Не всё можно уложить в язык Pig Latin
- Pig Latin вместе со структурами данных более сложен, в отличии от HQL
- Для UDF используется Jython. Это может ограничить в использовании некоторых библиотек.
Резюме:
- Hive хорош для небольших и несложных выборок. HQL похож на SQL, поэтому можно очень быстро начать работать с данным фреймворком.
- Pig Требует изучения языка и структур данных. Но зато, разобравшись один раз, вы получаете более мощный инструмент, в котором легче реализовывать сложные и многоступенчатые выборки. Вы получаете простой и упорядоченный код, с доступными и уместными комментариями.
Если вы и ваши коллеги отлично знаете SQL, работаете с ним ежедневно, и вас не смущают зубодробительные запросы, то Hive это отличное решение. Однако, если вы работаете с SQL эпизодично и ваш data workflow не укладывается в простые запросы, то однозначно стоит потратить день и разобраться с Pig. В дальнейшем это может сэкономить много времени вам, и вашим коллегам.
Автор: 2ANikulin