Вступление
В последнее время проект Apache Spark привлекает к себе огромное внимание, про него написано большое количество маленьких практических статей, он стал частью Hadoop 2.0. Плюс он быстро оброс дополнительными фреймворками, такими, как Spark Streaming, SparkML, Spark SQL, GraphX, а кроме этих «официальных» фреймворков появилось море проектов — различные коннекторы, алгоритмы, библиотеки и так далее. Достаточно быстро и уверенно разобраться в этом зоопарке при отсутсвие серьезной документации, особенно учитывая факт того, что Spark содержит всякие базовые кусочки других проектов Беркли (например BlinkDB) — дело непростое. Поэтому решил написать эту статью, чтобы немножко облегчить жизнь занятым людям.
Небольшая предыстория:
Spark — проект лаборатории UC Berkeley, который зародился примерно в 2009г. Основатели Спарка — известные ученые из области баз данных, и по философии своей Spark в каком-то роде ответ на MapReduce. Сейчас Spark находится под «крышей» Apache, но идеологи и основные разработчики — те же люди.
Spoiler: Spark в 2-х словах
Spark можно описать одной фразой так — это внутренности движка массивно-параллельной СУБД. То есть Spark не продвигает свое хранилище, а живет сверх других (HDFS — распределенная файловая система Hadoop File System, HBase, JDBC, Cassandra,… ). Правда стоит сразу отметить проект IndexedRDD — key/value хранилище для Spark, которое наверное скоро будет интегрировано в проект.Также Spark не заботится о транзакциях, но в остальном это именно движок MPP DBMS.
RDD — основная концепция Spark
Ключ к пониманию Spark — это RDD: Resilient Distributed Dataset. По сути это надежная распределенная таблица (на самом деле RDD содержит произвольную коллекцию, но удобнее всего работать с кортежами, как в реляционной таблице). RDD может быть полностью виртуальной и просто знать, как она породилась, чтобы, например, в случае сбоя узла, восстановиться. А может быть и материализована — распределенно, в памяти или на диске (или в памяти с вытеснением на диск). Также, внутри, RDD разбита на партиции — это минимальный объем RDD, который будет обработан каждым рабочим узлом.
Все интересное, что происходит в Spark, происходит через операции над RDD. То есть обычно приложения для Spark выглядит так — создаем RDD (например достаем данные из HDFS), мусолим его (map, reduce, join, groupBy, aggregate, reduce, ...), что-нибудь делаем с результатом — например кидаем обратно в HDFS.
Ну и уже исходя из этого понимания следует Spark рассматривать как параллельную среду для сложных аналитических банч заданий, где есть мастер, который координирует задание, и куча рабочих узлов, которые участвуют в выполнении.
Давайте рассмотрим такое простое приложение в деталях (напишем его на Scala — вот и повод изучить этот модный язык):
Пример Spark приложения (не все включено, например include)
Мы отдельно разберем, что происходит на каждом шаге.
def main(args: Array[String]){
// Инициализация, не особо интересно
val conf = new SparkConf().setAppName(appName).setMaster(master)
val sc = new SparkContext(conf)
// Прочитаем данные из HDFS, сразу получив RDD
val myRDD = sc.textFile("hdfs://mydata.txt")
// Из текстового файла мы получаем строки. Не слишком интересные данные.
// Мы из этих строк сделаем кортежи, где первый элемент (сделаем его потом
// ключем) - первое "слово" строки
val afterSplitRDD = myRDD.map( x => ( x.split(" ")( 0 ), x ) )
// Сделаем группировку по ключу: ключ - первый элемент кортежа
val groupByRDD = afterSplitRDD.groupByKey( x=>x._1 )
// Посчитаем кол-во элементов в каждой группе
val resultRDD = groupByRDD.map( x => ( x._1, x._2.length ))
// Теперь можно записать результат обратно на HDFS
resultRDD.saveAsTextFile("hdfs://myoutput.txt")
}
А что же там происходит?
Теперь пробежимся по этой программе и посмотрим что происходит.
Ну во-первых программа запускается на мастере кластера, и прежде чем пойдет какая-нибудь параллельная обработка данные есть возможность что-то поделать спокойно в одном потоке. Далее — как уже наверное заметно — каждая операция над RDD создает другой RDD (кроме saveAsTextFile). При этом RDD все создаются лениво, только когда мы просим или записать в файл, или например выгрузить в память на мастер — начинается выполнение. То есть выполнение происходит как в плане запроса, конвеером, где элемент конвеера — это партиция.
Что происходит с самой первой RDD, которую мы сделали из файла HDFS? Spark хорошо синтегрирован с Hadoop, поэтому на каждом рабочем узле будет закачиваться свое подмножество данных, и закачиваться будет по партициям (которые в случае HDFS совпадают с блоками). То есть все узлы закачали первый блок, и пошло выполнение дальше по плану.
После чтения с диска у нас map — он выполняется тривиально на каждом рабочем узле.
Дальше идет groupBy. Это уже не простая конвеерная операция, а настоящая распределенная группировка. По хорошему, лучше этот оператор избегать, так как пока он реализован не слишком умно — плохо отслеживает локальность данных и по производительности будет сравним с распределенной сортировкой. Ну это уже информация к размышлению.
Давайте задумаемся о состоянии дел в момент выполнения groupBy. Все RDD до этого были конвеерными, то есть они ничего нигде не сохраняли. В случае сбоя, они опять бы вытащили недостающие данные из HDFS и пропустили через конвеер. Но groupBy нарушает конвеерность и в результате мы получим закэшированный RDD. В случае потери теперь мы вынуждены будем переделать все RDD до groupBy полностью.
Чтобы избежать ситуации, когда из-за сбоев в сложном приложении для Spark приходится пересчитывать весь конвеер, Spark позволяет пользователю контролировать кэширование оператором persist. Он умеет кэшировать в память (в этом случае идет пересчет при потере данных в памяти — она может случится при переполнении кэша), на диск (не всегда достаточно быстро), или в память с выбросом на диск в случае переполнения кэша.
После, у нас опять map и запись в HDFS.
Ну вот, теперь более менее понятно что происходит внутри Spark на простом уровне.
А как же подробности?
Например хочется знать как именно работает операция groupBy. Или операция reduceByKey, и почему она намного эфективнее, чем groupBy. Или как работает join и leftOuterJoin. К сожалению большинство подробностей пока легче всего узнать только из исходников Spark или задав вопрос на их mailing list (кстати, рекомендую подписаться на него, если будете что-то серьезное или нестандартное делать на Spark).
Еще хуже с понимаем, что творится в различных коннекторах к Spark. И насколько ими вообще можно пользоваться. Например нам на время пришлось отказаться от идеи интегрироваться с Cassandra из-за их непонятной поддержки коннектора к Spark. Но надежда есть что документация качественная в скором будущем появится.
А что у нас интересного есть сверху Spark?
- SparkSQL: SQL движок сверху Spark. Как мы видели уже, в Sparke уже практически все для этого есть, кроме хранилища, индексов и своей статистики. Это серьезно затрудняет оптимизацию, но команда SparkSQL утверждает, что они пилят свой новый фреймворк оптимизации, а также AMP LAB (лаборатория, откуда вырос Spark) не собирается отказывать и от проекта Shark — полное замещение Apache HIVE
- Spark MLib: Это по сути замещение Apache Mahaout, только намного серьезнее. Помимо эффективного параллельного машинного обучения (не только средствами RDD, но и дополнительными примитивами) SparkML еще намного качественнее работает с локальными данными, используя пакет нативной линейной алгебры Breeze, который притянет к вам в кластер Фортрановский код. Ну и очень хорошо продуманный API. Простой пример: параллельно обучаемся на кластере с кросс-валидацией.
- BlinkDB: Очень интересный проект — неточные SQL запросы сверху больших объемов данных. Хотим подсчитать average по какому-нибудь полю, но хочется сделать это не дольше чем за 5 секунд (потеряв в точности) — пожалуйста. Хотим результат с погрешностью не больше заданной — тоже годится. Кстати куски этой BlinkDB можно встретить внутри Spark (это можно рассматривать как отдельный квест).
- Ну и много-много всего сейчас пишется сверху Spark, я только самые интересные с моей точки зрения проекты перечислил
Автор: PavelVelikhov