В этой статье я расскажу о Twitter Scalding – фреймворке для описания процесса обработки данных в Apache Hadoop. Я начну издалека, с истории фреймворков поверх Hadoop. Потом дам обзор возможностей Scalding. В завершение покажу примеры кода, доступные для понимания тем, кто знает Java, но почти не знаком со Scala.
Интересно? Поехали!
Проще, чем MapReduce
Когда парадигма MapReduce только зарождалась, это был прорывной шаг к упрощению разработки распределенных вычислений. Однако вскоре пришло осознание, что писать вручную mapper’ы и reducer’ы весьма утомительно. Для ускорения разработки появились высокоуровневые надстройки над Map/Reduce – Pig, Hive и Cascading, а потом и другие. Остановимся подробнее на последнем.
Cascading это Java-фреймворк для описания процесса обработки данных, т.н. workflow. После описания Cascading анализирует workflow подобно анализаторам запроса в СУБД, строит план выполнения в виде последовательности map/reduce задач и отправляет их на кластер Hadoop, самостоятельно управляя их запуском и промежуточными данными. К сожалению, Cascading оперирует достаточно низкоуровневыми абстракциями, поэтому по популярности он долгое время проигрывал другим механизмам обработки данных.
Из этой ситуации нашелся удачный выход. Twitter адаптировал Cascading под свои нужды и обернул его абстракции в традиционные для Scala средства. Так родился Scalding – Scala фреймворк поверх Cascading’а. Здесь можно сделать лирическое отступление и поговорить про Scala.
Итак, попытка применить стандартные подходы Scala к описанию workflow обработки данных увенчалась успехом, и у Scalding есть шанс догнать по популярности Hive, Pig и их многочисленные новомодные аналоги. Он получился настолько хорош, что ради него даже имеет смысл выучить Scala, чем мы сейчас и займемся.
Введение в Scalding
Сейчас я осознанно пропущу все, что касается внутреннего устройства Scalding и Cascading. Будем считать, что это черный ящик с симпатичным интерфейсом, который считает для нас какие-то данные. Если все сложится, потом будет еще одна статья о внутреннем устройстве этих фреймворков.
Кортеж (tuple) — это несколько объектов, скрепленных вместе. Классический пример кортежа — Pair, Triple и т.д. В Scala они — часть языка. Кортежи пишутся в скобках через запятую.
Дженерики пишутся в квадратных скобках, а не в угловых.
val longConstant: Long = 0L // final long longConstant = 0L;
var list: List[Int] // List<Integer> list;
(String, Int) // Pair<String, Integer>
Плоские операции
Основное понятие в Scalding – это Pipe. Pipe – это конвейер, из которого навстречу программисту летят данные. По сути, это похоже на Stream из Java 8. Первая реализация Pipe не поддерживала типизацию, но это длилось недолго. Любители строгой типизации придумали TypedPipe – конвейер с объектами строго заданного типа, generic, в терминах Java.
Для TypedPipe определены некоторые стандартные операции над потоком – map, flatMap, filter, limit и другие. Это все плоские операции над потоком, теоретически, их можно эффективно выполнять с неограниченным параллелизмом и на любом объеме данных.
Данные в TypedPipe должны откуда-то читаться. Для этого в Scalding’е есть Source – источник данных. Единственное его предназначение – породить Pipe или TypedPipe. Имеется несколько готовых источников, большинство из них – чтение из файлов в различных форматах, но есть также возможность чтения из произвольного итератора (а значит, и из коллекции в памяти) и, конечно, возможность определять свои источники. Что важно, один и тот же код Cascading и Scalding работает как на кластере Hadoop, так и на локальных данных, и это очень удобно для тестирования.
Когда все операции проделаны, наступает время сохранять данные. За запись на диск в Scalding отвечает Sink – завершающая часть конвейера. Sink'и аналогичны Source’ам, часто это один и тот же класс, реализующий два интерфейса.
Группирующие операции
MapReduce дает нам возможность проводить реорганизацию потока, представляемого TypedPipe. В первую очередь, это операция группировки groupBy, которая сгруппировывает записи со всего потока по ключу, аналог GROUP BY в SQL. После группировки TypedPipe[V] принимает особую форму Grouped[K, V], над которым становятся доступны дополнительные операции.
Во-первых, с помощью методов mapGroup и mapValuesStream можно получить элементы Grouped[K, V] в виде пары из ключа K, по которому шла группировка, и итератора по всем значениям V, которые к этом ключу подошли. К итератору по значениям применимы любые функции коллекций Scala. Но обычно этого даже не требуется, т.к. у Grouped есть много функций-шорткатов, которые покрывают наиболее популярные кейсы.
Во-вторых, Grouped можно отсортировать операцией sortBy. После этого к нему также применимы mapGroup, mapValuesStream и все их производные.
В-третьих, Grouped[K, V1] можно объединять (join) с другим Grouped[K, V2]. Здесь работают те же правила, что в реляционных базах данных – доступны leftJoin, rightJoin, join (inner), outerJoin (full). На выходе получается Grouped[K, (V1, V2)].
Стоит отметить, что когда несгруппированный поток содержит пары TypedPipe[(K, V)], к нему можно применить операцию hashJoin. Она аналогична обычному Grouped.join, но делается в памяти. Это хорошо работает для обогащения данных из маленьких справочников, но для больших таблиц может привести к OOM.
Grouped можно преобразовать обратно в TypedPipe операциями toTypedPipe, keys или values. Первая сохранит и ключ, и значение, остальные вернут что-то одно.
Scalding by example
Теперь, после обзора основных возможностей фреймворка давайте посмотрим, как это работает, на примере.
Допустим, мы RTB площадка, и у нас есть история кликов наших пользователей по урлам на наблюдаемых сайтах. История представлена в огромном TSV файле с тремя колонками – URL, Timestamp и UserId.
Также у нас есть разметка сайтов по тематикам. Сайтов у нас не очень много, максимум, тысячи. Вся разметка помещается в небольшом TSV файле с колонками – Domain и Topic.
Мы хотим понять, как часто пользователь переключается между тематиками. Для этого нам нужно оставить в истории кликов только те события, когда пользователь переходит с сайта одной тематики на сайт другой.
Напишем код, который будет делать за нас эту трансформацию. Инфраструктуру запуска рассматривать не будем. Если интересно, полный код примера доступен на github.
В Scala можно задавать алиасы для типов. Это удобно, т.к. позволит различать нам один String от другого в декларациях типов.
type Domain = String
type UserId = String
type Topic = String
type Url = String
type Timestamp = Long
Теперь объявим классы из доменной модели:
case class Click(url: Url, ts: Timestamp, userId: UserId)
case class SiteInfo(domain: Domain, topic: Topic)
Case class в Scala – это удобный способ описания классов для неизменяемых значений. Из него автоматически генерируется конструктор, геттеры и прочий однотипный код.
Прочтем таблицу с кликами:
val clicksPipe: TypedPipe[Click] =
TypedPipe.from(TypedTsv[(Url, Timestamp, UserId)](pathToClicks))
.map(tuple => Click.tupled(tuple))
Здесь мы объявили источник – типизированный TSV с колонками типа (String, Long, UserId). Потом мы обернули этот источник в TypedPipe. Далее, для удобства, мы сконвертировали кортежи из трех колонок (Url, Timestamp, UserId) в объекты класса Click.
Получился TypedPipe[Click].
Оставим от урлов только домены.
def url2domain(url: Url): Domain = {
return new URL(url).getHost
}
val domainsPipe: TypedPipe[Click] =
clicksPipe
.map(click => click.copy(url = url2domain(click.url)))
Прочтем справочник, где домены разделены по тематикам, и сразу сгруппируем его в виде, пригодном для hashJoin.
val sitesGroupByDomain: Grouped[Domain, SiteInfo] =
TypedPipe.from(TypedTsv[(Domain, Topic)](pathToSites))
.map(tuple => SiteInfo.tupled(tuple))
.groupBy(siteInfo => siteInfo.domain)
Добавим к потоку кликов информацию о тематиках сайтов. Для этого сджойним поток кликов со справочником доменов.
val clicksWithSiteInfo: TypedPipe[(Domain, (Click, SiteInfo))] =
domainsPipe
.map(click => (click.url, click))
.hashJoin(sitesGroupByDomain)
Сгруппируем поток кликов по пользователям и отсортируем по таймстемпу клика. Кроме того, нас больше не интересует информация о домене, достаточно только информации о тематике сайта. Для этого введем вспомогательный класс, отражающий активный интерес пользователя к тематике в момент времени.
case class TopicActivity(topic: Topic, ts: Timestamp, userId: UserId)
val topicActivityStreamPerUser: SortedGrouped[UserId, TopicActivity] =
clicksWithSiteInfo
.map(tuple => {
val (domain, (click, siteInfo)) = tuple
TopicActivity(siteInfo.topic, click.ts, click.userId)
})
.groupBy(activity => activity.userId)
.sortBy(activity => activity.ts)
Самый сложный момент – в потоке пользовательских активностей нужно отловить моменты переключения тематик. Для отлавливания переключений напишем функцию на Scala в Java-стиле. Она накапливает результат в ArrayBuffer (аналог ArrayList), что потенциально может привести к OOM на очень длинных историях.
def topicSwitches(userId: UserId, activities: Iterator[TopicActivity]): Iterator[TopicActivity] = {
val result = ArrayBuffer[TopicActivity]()
var firstTs = 0l
var lastTopic = null.asInstanceOf[Topic]
for (activity <- activities) {
if (firstTs == 0l || lastTopic != activity.topic) {
result.append(activity)
firstTs = activity.ts
lastTopic = activity.topic
}
}
result.toIterator
}
val firstTopicActivitiesPipe: TypedPipe[TopicActivity] =
topicActivityStreamPerUser
.mapGroup((userId, activities) => topicSwitches(userId, activities))
.values
В потоке остались только первые активности каждого интереса. По ним можно проследить, как менялся фокус интереса пользователя в течение времени. Осталось записать результат в файл.
firstTopicActivitiesPipe
.map(activity => (activity.topic, activity.ts, activity.userId))
.write(TypedTsv(args.required("output")))
Вот и все. Мы описали нетривиальную трансформацию данных буквально в 40 строк.
Итоговый код в scala-way
Если следовать каноничному scala-way, то код получится еще короче. Кроме того, можно переписать функцию поиска переключений между тематиками с итеративного подхода на функциональный, убрав использование буфера. Теперь процесс не упадет даже на бесконечном входе. Теоретически…
def topicSwitches(userId: UserId, activities: Iterator[TopicActivity]): Iterator[TopicActivity] = {
activities.scanLeft(Helper())((helper, activity) => {
if (helper.topic.isEmpty || helper.topic.get != activity.topic) {
Helper(Some(activity.topic), activity.ts, true)
} else {
Helper(helper.topic, helper.firstTs, false)
}
}).filter(_.firstVisit).map(helper => TopicActivity(helper.topic.get, helper.firstTs, userId))
}
TypedPipe.from(TypedTsv[(Url, Timestamp, UserId)](pathToClicks))
.map(tuple => Click.tupled(tuple))
.map(click => click.copy(url = new URL(click.url).getHost))
.map(click => (click.url, click))
.hashJoin(
TypedPipe.from(TypedTsv[(Domain, Topic)](pathToSites))
.map(tuple => SiteInfo.tupled(tuple))
.groupBy(_.domain)
)
.map({case (_, (click, siteInfo)) => TopicActivity(siteInfo.topic, click.ts, click.userId)})
.groupBy(_.userId)
.sortBy(_.ts)
.mapGroup(topicSwitches)
.values
.write(TypedTsv(outputPath))
В следующих статьях я разберу вопросы организации кода поточной обработки данных и их тестирования. И, под конец, расскажу, как все это работает изнутри.
Ресурсы
Полный код примера на github
Scalding Wiki
Книга “Programming MapReduce with Scalding”
Автор: fediq