Мне давно хотелось изложить свои впечатления об Apache Spark, и тут как раз попалась на глаза вот эта статья от сотрудника Pivotal Robert Bennett: thenewstack.io/the-good-bad-and-ugly-apache-spark-for-data-science-work, опубликованная совсем недавно, 26 июня 2018.
Это не будет перевод, а скорее все-таки мои впечатления и комментарии на тему.
Что делает Spark популярным?
Цитата:
It’s easy to see why Apache Spark is so popular. It does in-memory, distributed and iterative computation, which is particularly useful when working with machine learning algorithms. Other tools might require writing intermediate results to disk and reading them back into memory, which can make using iterative algorithms painfully slow.
Начнем с того, что это все по большей части не совсем правда. In memory? Ну да, Spark будет стараться, но то, что написано тут про other tools, точно также будет иметь место. В конце концов, память, ядра процессора и сеть — ресурсы ограниченные, так что рано или поздно любой инструмент упирается в их пределы.
В некотором смысле Spark ни разу не более in-memory, чем любой классический map-reduce. Так или иначе данные все равно должны либо оказаться на диске (кроме всего прочего, это позволит спокойнее пережить ошибки, и не начинать вычисления с самого начала), либо переданы по сети (shuffle и другие процессы). Я уже не говорю, что вам как программисту мало что помешает выполнить persist, и сохранить промежуточные результаты на диск, если вы вдруг захотите. Захотите ли вы их сохранить в память, если данных скажем терабайт? Сомневаюсь.
Я бы скорее сказал, что в отличие от других инструментов (под которыми обычно понимается классический map-reduce), Spark позволяет чуть меньше думать об оптимальном использовании ресурсов, и больше оптимизирует это использование сам. А уж окончательное быстродействие, в конечном счете, скорее зависит от прямизны рук того, кто пишет программу.
Далее автор перечисляет такие качества Spark, которые кажутся ему наилучшими:
Привлекательный API и ленивое выполнение (Appealing APIs and Lazy Execution)
В целом я с этим согласен. Spark как средство разработки намного удобнее классического map-reduce, и несколько удобнее инструментов типа Apache Crunch и других инструментов из условного «второго» поколения. А также несколько более гибкий, чем например Hive, и не ограничен SQL как таковым.
Ленивое же исполнение — это не всегда хорошо. Иногда было бы лучше, если бы скажем различия в схемах Hive и DataSet диагностировались не тогда, когда все данные уже были обработаны, а чуть раньше, и падало все не через пару часов/сутки, а при запуске.
Простота преобразования (Easy Conversion)
Тут автор в основном имел в виду преобразования между структурами Spark и Python/Pandas. Я от этого далек, поэтому не стану высказываться. Возможно про pySpark расскажу чуть ниже.
Простота трансформаций (Easy Transformations)
Another asset of Spark is the “map-side join” broadcast method. This method speeds up joins significantly when one of the tables is smaller than the other and can fit in its entirety on individual machines. The smaller one gets sent to all nodes so the data from the bigger table doesn’t need to be moved around. This also helps mitigate problems from skew. If the big table has a lot of skew on the join keys, it will try to send a large amount of data from the big table to a small number of nodes to perform the join and overwhelm those nodes.
Не знаю, что у них там в питоне, но в наших краях map-side join несложно делается либо голыми руками, либо любым из инструментов типа Crunсh. Не вижу в этом каких-то особых преимуществ, это многие умеют, вот и Hive например. При де-факто отсутствии индексов в экосистеме Hadoop map side join пожалуй один из основных инструментов оптимизации join вообще.
API же для трансформации достаточно удобен, хотя и неоднороден. Скажем, «старый» RDD API, будучи наверное немного более гибким, дает в тоже время больший простор для совершения ошибки, особенно если вы работаете не на уровне классов фиксированной структуры (Java Beans), а Row и с гибкой структурой данных. Расхождение между реальной и ожидаемой Spark схемами — вполне обычное при этом дело.
Что же до API DataSet, то я бы сказал что он очень хорош. После некоторой практики вполне можно писать все на нем также легко, как на SQL, дополняя его своими UDF, и добиваясь большей гибкости. Сами UDF при этом пишутся проще, чем для Hive, и какие-то сложности возникают разве что при возврате из них сложных структур данных (массивы, map, struct), да и то в Java и скорее потому, что ожидаются структуры для Scala.
Скажем, мне удавалось достаточно легко использовать в виде UDF такую штуку, как Java порт pymorphy2. Или геокодер. В сущности, все что нужно — это правильно инициализировать свою UDF, помня про особенности сериализации Spark.
А вот API Spark ML, с другой стороны, выглядит так, как будто его проектировали совсем другие люди. Это не значит, что он плохой — он просто иной.
Open Source Community
Spark has a massive open-source community behind it. The community improves the core software and contributes practical add-on packages. For example, a team has developed a natural language processing library for Spark. Previously, a user would either have to use other software or rely on slow user-defined functions to leverage Python packages such as Natural Language Toolkit.
Тут в общем добавить нечего. Сообщество реально большое, квалифицированное и дружелюбное. Пишется огромное число расширений для Spark.
Очередной пассаж про slow UDF оставим на совести питониста — Scala/Java UDF вовсе не такие медленные, и при этом весьма удобные.
Что я бы добавил от себя:
Разработка на разных языках
Наверное, одной из причин популярности является и поддержка нескольких языков разработки (Scala, Java, Python и R). По большому счету, API для разных языков примерно одинаково удобен, но я бы не назвал эту поддержку идеальной. Скажем, запуская свое Spark приложение, вы сразу выбираете между Java/Scala и Python, и не можете комбинировать языки в рамках одного запуска. Таким образом, интеграция между частями приложения на pySpark (на котором часто пишут ML или NLP части), и Java/Scala реально возможна только через файлы/базы данных. Ну или что-то типа Kafka, REST и пр. варианты.
Streaming
Spark Streaming (не путать с Hadoop Streaming, который совсем для другого), это еще одна привлекательная часть возможностей Spark. Если описать ее в одном предложении, то это обработка данных потоковых, приходящих скажем из Kafka, ZeroMQ и т.п. теми же средствами, что и данные, взятые из базы.
Вся прелесть именно в том, что средства именно те же самые, т.е. вам практически не придется ничего менять в программе, чтобы начать обрабатывать данные из Kafka. Ни map reduce, ни Crunch, ни Cascading какой-нибудь вам такой фокус провернуть не позволят.
Недостатки
У каждого свои недостатки (с). С какими проблемами вы можете столкнуться при работе со Spark?
Cluster Management
Spark is notoriously difficult to tune and maintain. That means ensuring top performance so that it doesn’t buckle under heavy data science workloads is challenging. If your cluster isn’t expertly managed, this can negate “the Good” as we described above. Jobs failing with out-of-memory errors is very common and having many concurrent users makes resource management even more challenging.
А разве кто-то обещал? Собственно, я уже писал выше, что все замечательно и просто может быть ровно в одном случае — если у вас либо задачка не очень большая, либо ресурсов сколько угодно — или иными словами, задача не слишком сложная.
В остальных же случаях, каковых очевидно большинство, Spark приложения нужно тюнить, настраивать и поддерживать.
Do you go with fixed or dynamic memory allocation? How many of your cluster’s cores do you allow Spark to use? How much memory does each executor get? How many partitions should Spark use when it shuffles data? Getting all these settings right for data science workloads is difficult.
Скажем, казалось бы сравнительно простая задача выбора числа executors. В принципе, зная кое-что про свои данные, можно это число спокойно рассчитать. Но в условиях, когда ресурсы используете не только вы, все становится намного веселее. Если же ваш процесс включает еще и обращения к другим приложениям, то…
Например, у меня есть приложение, частью функциональности которого является обратное геокодирование. И им занимается отдельный сервер ArcGIS. При этом ArcGIS имеет в своем распоряжении всего 4 ядра, а кластер Hadoop, где выполняется Spark, имеет десятки узлов, в итоге если мы просто выделяем Spark всего-то 8 executors, то кривая загрузки процессора ArcGIS подскакивает до 100%, где и остается на пару часов работы приложения. А если мы перекладываем эту задачу на Spark (переписав предварительно код приложения), то время работы сокращается на пару порядков — благодаря тому, что мы можем использовать ресурсы кластера и для этой задачи тоже.
То есть, у нас зачастую имеется бутылочное горлышко, где либо выделен фиксированный объем ресурсов, либо управление этими ресурсами осуществляется другим способом (на который Spark не может повлиять). Соответственно, ожидать от Spark, что он оптимизирует использование этих ресурсов, было бы наивно.
Отладка (Debugging)
Это чистая правда. Ожидаемая, впрочем. Мы имеем распределенную параллельную систему, отладка и мониторинг которой представляют собой нетривиальную задачу. SparkUI в какой-то степени решает вопросы наблюдения, а Spark Metrics — измерения производительности, но попробуйте, скажем, подключиться к исполняемому приложению отладчиком — вы не знаете ни хост, где оно работает, ни порт, который окажется свободным для подключения. Те же метрики, которые для обычного приложения могут быть легко получены например из JMX, в случае приложения распределенного должны передаваться по сети, и только потом могут быть собраны. Да, с этим все относительно плохо.
Плохое быстродействие UDF в PySpark (Slowness of PySpark UDFs)
Ну что я тут могу сказать? За что боролись — на то и напоролись (с). Насколько я понимаю, UDF на питоне приводит к тому, что происходит двойное преобразование данных между приложением и UDF. Просто потому, что питон все-таки чуждый язык для экосистемы JVM, на которой работает Spark, и UDF исполняется вне ее.
Тут можно посоветовать только одно — не пишите на питоне, пишите на Scala/Java. Понятно, что не всегда этому совету хочется и можно следовать, но боюсь что решить эту проблему глобально сможет разве что Graal, когда его версию питона доведут до промышленного уровня.
Сложно гарантировать максимальный уровень параллелизма (Hard-to-Guarantee Maximal Parallelism)
One of Spark’s key value propositions is distributed computation, yet it can be difficult to ensure Spark parallelizes computations as much as possible. Spark tries to elastically scale how many executors a job uses based on the job’s needs, but it often fails to scale up on its own. So if you set the minimum number of executors too low, your job may not utilize more executors when it needs them. Also, Spark divides RDDs (Resilient Distributed Dataset)/DataFrames into partitions, which is the smallest unit of work that an executor takes on. If you set too few partitions, then there may not be enough chunks of work for all the executors to work on. Also, fewer partitions means larger partitions, which can cause executors to run out of memory.
Если бы все было так просто. Начнем с простого — параметры для запуска следует тюнить для каждого конкретного кластера. Prod кластер может иметь на порядок больше узлов, и в разы больше памяти, доступной на каждом. Настройки для Dev кластера вероятно будут занижены при запуске на Prod. Все это еще больше усложняется, если начать учитывать текущую загрузку кластера задачами. В общем виде эта задача выделения ресурсов кластера является задачей оптимизации, достаточно нетривиальной, и не имеет единственного верного решения.
Если партиций мало, то параллелизм недостаточен. А если их слишком много — то размер каждой может оказаться ниже, чем некоторый условный нижний предел, вроде размера блока HDFS. Поскольку каждая задача — это ресурсы, затраченные на ее запуск, очевидно есть нижний предел размеров задачи, ниже которого опускаться не стоит, потому что накладные расходы растут быстрее, чем производительность.
Простой пример — приложение, которому нужен некоторый значительный объем справочников. Если в случае «обычной» задачи map-reduce на Hadoop мы как правило доставляем код к данным, т.е. копируем наше приложение + части Spark на узлы кластера, где лежит наш файл (файлы), то справочники — это уже похоже на map side join, и их нужно доставлять вместе с кодом. И вот внезапно размер данных, доставляемых на каждый узел вырос на пару порядков — было например 10 мегабайт (небольшое Spark приложение, без самого Spark), стало например 20 гигабайт (вполне реальный случай, справочники, нужные для нормализации адресов, телефонов и пр. данных вполне тянут на такой объем). Ну и вот она — цена излишнего параллелизма, налицо.
Возможно существует некоторое естественное число партиций, которое определяется числом блоков, на которые разбит наш входной файл, с учетом коэффициента репликации. Вероятно, что это число близко к оптимальному с точки зрения чтения данных. То есть, если у нас в файле три блока, и каждый блок имеет копии на 2 узлах кластера, то мы естественным образом можем параллельно вести обработку в 6 потоков, обрабатывая каждую реплику на своем узле. Разумеется, Spark учитывает эти параметры, когда динамически распределяет ресурсы.
К сожалению или к счастью, Spark не является планировщиком ресурсов кластера. Им является например Yarn. Так что у Spark просто может быть недостаточно информации, чтобы оптимально планировать использование всех ресурсов.
Не слишком хорошая интеграция с Hive
С одной стороны, Spark отлично работает с данными и метаданными Hive. Я бы сказал, что большая часть приложений, что мне попадались, как раз этим и занимается. Но не обходится без досадных проблем. Скажем, если вы попробуете воспользоваться в Spark его средствами partitionBy и bucketBy, очень велика вероятность, что Hive результаты вашей работы не увидит. При этом все что вы получите — это невнятный warning где-нибудь в логах.
Совместимость
К сожалению, мой опыт говорит на эту тему скорее плохое. Мы натыкались на множественные проблемы при попытках запускать приложения на кластерах, где версия Spark отличалась от ожидаемой. При разработке на Spark 2.2.0 проблемы были как при запуске на 2.1, так и на 2.3.
Скажем, в нашем случае Spark почему-то не мог найти при запуске на версии 2.3 один из codecs (а именно snappy). Это не слишком серьезная проблема, если вам нужно записать данные (вы можете указать кодек при записи, и выбрать любой, включая не упакованные данные), но если вам нужно прочитать что-то, что упаковано snappy, то вам явно не повезло.
Возможно, какие-то из проблем были вызваны ошибками в установщике, но сильно от этого не легче. Все-таки, мне кажется что миграция между минорными версиями должна была бы быть более плавной.
Ну и увы, но Spark не предполагает штатную параллельную установку на один кластер двух разных версий одной линейки (те же 2.2 и 2.3).
Ужасные стороны
API Awkwardness
Since much of the Spark API is so elegant, the inelegant parts really stand out. For example, we consider accessing array elements to be an ugly part of Spark life.
Не сказал бы, что работа с массивами так уж ужасна. Некоторые неудобства приносит тот факт, что Spark API изначально сделан на Scala, а там своя структура коллекций, которую работая из Java приходится приводить к скаловской. А так, если вы способны написать UDF, то вы способны делать с массивами все что угодно. А, ну да — в питоне же все плохо с UDF, все время забываю.
Не очень удобно и не слишком эффективно — да, возможно. Это пытается решить новая на сегодня версия Spark 2.4, где введены новые функции высшего порядка для работы со сложными структурами (что позволит избежать применения explode/collect).
На мой взгляд, намного более неудобной стороной API является то, что глядя на код, далеко не всегда очевидно, какая именно часть будет выполняться на driver, а какие — на других узлах. При этом механизм распространения кода по узлам предполагает его сериализацию (тем или иным способом), и тот код, который работает на executors, должен быть сериализуемым. Разбираясь с ошибками сериализации вы сможете узнать много нового и интересного о своем коде :).
Classloaders
К сожалению, вопрос изоляции кода приложения от кода Spark решен недостаточно хорошо. Впрочем, тоже самое относится и к классическим map-reduce приложениям Hadoop. При этом код Hadoop использует некоторые древние версии такой библиотеки, как Google Guava, да и другие библиотеки далеко не новы, прямо скажем. Если вспомнить, что авторы Guava любят вносить в свой API обратную несовместимость, удаляя deprecated методы, то мы получаем совершенно дурацкую картину — вы пишете свой код под Guava свежей версии, запускаете, и оно падает — либо потому, что реально вы работаете с версией Guava из Hadoop (намного более старой), и ваш код не находит методов из новой версии, либо Hadoop падает, потому что несовместим с версией новой. Это достаточно типичная, к сожалению проблема, с которой сталкивается наверное каждый второй разработчик. Библиотека Apache Http Components — еще один пример подобной проблемы.
SQL без bind variables
Увы, но типичный код для выполнения запроса на спарке выглядит вот так:
val sqlDF = spark.sql(«SELECT * FROM people WHERE id= 1»)
В API не предусмотрено варианта для выполнения запроса id=? и подстановкой параметров при каждом выполнении. Ну ладно, допустим проблема SQL-injection авторов не беспокоит, но параметры в запрос должны подставить разработчики, соответственно, замена спецсимволов — целиком на нас с вами. Объективности ради, этим же страдает и Hive, где тоже нельзя определить запрос с параметрами.
Впрочем, что еще смешнее, для JDBC источников формально нельзя даже написать запрос — можно только указать таблицу, но не колонки. Неформально можно оказывается написать вместо таблицы что-то типа (select a, b, c from d) t, но будет ли это работать во всех случаях — никто вам точно не расскажет.
Lack of Maturity and Feature Completeness
Мда. Чужая голова — потемки.
Another example feature gap is difficulty creating sequential unique record identifiers with Spark. A sequential, unique index column is helpful for some types of analysis. According to the documentation, “monotonically_increasing_id()” generates a unique ID for each row, but does not guarantee that the IDs are consecutive. If consecutive IDs are important to you, then you may need to use Spark’s older RDD format.
Не понимаю таких претензий. Исходники же доступны, и вполне можно заглянуть, и хотя бы прочитать комментарии:
Returns monotonically increasing 64-bit integers.
*
* The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive.
* The current implementation puts the partition ID in the upper 31 bits, and the lower 33 bits
* represent the record number within each partition. The assumption is that the data frame has
* less than 1 billion partitions, and each partition has less than 8 billion records.
*
Ну т.е., эта функция просто берет номер партиции, и добавляет к нему счетчик. Естественно, вам не гарантируют, что никто не будет вызывать ее между вашими двумя последовательными вызовами. Одно Spark приложение — это потенциально множество JVM, работающих на разных узлах кластера, и вероятно множество потоков выполнения внутри одной JVM.
А теперь пусть автор подумает еще немного, и попробует придумать способ генерировать в этой параллельной и распределенной системе именно такие id, какие ему хочется, не создав при этом единой точки генерации (которая будет заведомо бутылочным горлышком), и без блокировок (которые будут тем же самым).
Что мы ждем от Spark 2.4
Уже упомянутые функции высшего порядка
Это реально хорошо. Главное чтобы работало.
По сути, это набор встроенных функций для работы с массивами или map-ами, а также возможность выполнять трансформации над ними при помощи собственных функций (лябмд).
вот тут можно посмотреть некоторые примеры использования.
Новый режим исполнения
Это так называемый barier планировщик и режим выполнения. Авторы предназначают его для задач машинного обучения, но набор таких задач конечно несколько шире. По сути, это такие задачи, которые не являются обычными для Spark map-reduce. Насколько я это понял — это по большей части компоненты обмена сообщениями, которые запускаются один раз, или в случае аварийного их завершения.
Если API для поддержки таких задач будет удобным — то потребность в нем точно есть. Скажем, в нашей компании такие компоненты оформляются как Yarn-приложения, и работают от Spark несколько отдельно. Более тесная и удобная интеграция в рамках Spark была бы нелишней.
Улучшенная поддержка Avro
Поддержка Avro в общем и так была неплохой. Поддержаны некоторые дополнительные типы данных, а именно так называемые «логические типы» (по сути — некие производные типы), куда входят Decimal, Date, Time, Duration и другие.
Я, откровенно говоря, больше жду когда авторы Hive (ну и Spark заодно) научатся лучше поддерживать паркет, создавая таблицы на основе его схемы. Это и сейчас можно, но с Avro это выглядит и работает удобнее.
Поддержка Scala 2.12 (экспериментальная)
Казалось бы, мне, как в основном Java программисту, это безразлично, однако же в рамках этого проекта обещали улучшить взаимодействие с Java 8, например сериализацию лямбд, что было бы очень и очень приятно.
Автор: sshikov