Здравствуйте, коллеги! Напоминаем, что не так давно у нас вышла книга о Spark, а прямо сейчас проходит последнюю корректуру книга о Kafka.
Надеемся, эти книги окажутся достаточно успешными для продолжения темы — например, для перевода и издания литературы по Spark Streaming. Перевод об интеграции этой технологии с Kafka мы и хотели вам сегодня предложить
1. Обоснование
Apache Kafka + Spark Streaming – одна из наилучших комбинаций для создания приложений, работающих в реальном времени. В этой статье мы подробно обсудим детали такой интеграции. Кроме того, мы рассмотрим пример со Spark Streaming-Kafka. Затем обсудим «подход с получателем» и вариант непосредственной интеграции Kafka и Spark Streaming. Итак, приступим к интеграции Kafka и Spark Streaming.
2. Интеграция Kafka и Spark Streaming
При интеграции Apache Kafka и Spark Streaming возможны два подхода к конфигурации Spark Streaming для получения данных из Kafka – т.е. два подхода к интеграции Kafka и Spark Streaming. Во-первых, можно использовать Получатели и высокоуровневый API Kafka. Второй (более новый) подход – это работа без Получателей. Для обоих подходов существуют разные модели программирования, отличающиеся, например, по части производительности и семантических гарантий.
Рассмотрим эти подходы подробнее
a. Подход на основе получателей
В данном случае прием данных обеспечивает Получатель. Итак, воспользовавшись высокоуровневым API потребления, предусмотренным в Kafka, мы реализуем Получатель. Далее полученные данные хранятся в Исполнителях Spark. Затем в Kafka – Spark Streaming запускаются задания, в рамках которых обрабатываются данные.
Однако, при использовании такого подхода сохраняется риск потери данных в случае отказа (при конфигурации, задаваемой по умолчанию). Следовательно, потребуется дополнительно включить в Kafka – Spark Streaming журнал опережающей записи, чтобы исключить потери данных. Таким образом, все данные, полученные от Kafka, синхронно сохраняются в журнале опережающей записи в распределенной файловой системе. Именно поэтому даже после отказа системы все данные можно восстановить.
Далее рассмотрим, как использовать в приложении с Kafka – Spark Streaming такой подход с применением получателей.
i. Связывание
Теперь свяжем наше потоковое приложение со следующим артефактом для приложений Scala/Java, воспользуемся при этом определениями проекта для SBT/Maven.
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0
Однако, при развертывании нашего приложения нам придется добавить вышеупомянутую библиотеку и ее зависимости, это понадобится для Python-приложений.
ii. Программирование
Далее создадим входной поток DStream
, импортировав KafkaUtils
в код потокового приложения:
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
Кроме того, пользуясь вариантами createStream, можно задать классы-ключи и классы-значения, а также соответствующие классы для их декодирования.
iii. Развертывание
Как и в случае с любым приложением Spark, для запуска используется команда spark-submit. Однако, детали немного отличаются в приложениях на Scala/Java и в приложениях на Python.
Более того, при помощи –packages
можно добавить spark-streaming-Kafka-0-8_2.11
и его зависимости непосредственно к spark-submit
, это пригодится для приложений на Python, где невозможно управлять проектами при помощи SBT/Maven.
./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 ...
Также можно загрузить JAR-архив артефакта Maven spark-streaming-Kafka-0-8-assembly
из репозитория Maven. Затем добавить его к spark-submit
с –jars
.
b. Прямой подход (без получателей)
После подхода с использованием получателей был выработан более новый подход – «прямой». Он дает надежные сквозные гарантии. В таком случае мы периодически запрашиваем Kafka о смещениях вычитанных данных (offsets) по каждому топику/секции, а не организуем доставку данных через получателей. Кроме того, определяется размер считываемого фрагмента, это нужно для правильной обработки каждого пакета. Наконец, для считывания диапазонов с данными из Kafka с заданными смещениями используется простой потребляющий API, особенно когда запускаются задания по обработке данных. Весь процесс напоминает считывание файлов из файловой системы.
Примечание: Данная возможность появилась в Spark 1.3 для Scala и Java API, а также в Spark 1.4 для Python API.
Теперь давайте обсудим, как применить этот подход в нашем потоковом приложении.
Об API потребления (Consumer API) подробнее рассказано по следующей ссылке:
Apache Kafka Consumer | Examples of Kafka Consumer
i. Связывание
Правда, такой подход поддерживается лишь в приложениях на Scala/Java. Имея следующий артефакт, скомпонуйте проект SBT/Maven.
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.0
ii. Программирование
Далее импортируйте KafkaUtils и создайте входной DStream
в коде потокового приложения:
import org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])
В параметрах Kafka потребуется указать либо metadata.broker.list
, либо bootstrap.servers
. Следовательно, по умолчанию мы будем потреблять данные, начиная с последнего смещения в каждой секции Kafka. Однако, если вы хотите, чтобы считывание началось с самого маленького фрагмента, то в параметрах Kafka нужно задать конфигурационную опцию auto.offset.reset
.
Более того, работая с вариантами KafkaUtils.createDirectStream
, можно начать считывание с произвольного смещения. Затем сделаем следующее, что позволит нам получить доступ к фрагментам Kafka, потребленным в каждом пакете.
// Храним ссылку на актуальные диапазоны фрагментов, чтобы ее могли использовать и последующие потоки
var offsetRanges = Array.empty[OffsetRange]
directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
...
}.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
...
}
Если мы хотим организовать мониторинг Kafka на основе Zookeeper при помощи специальных инструментов, то можем сами обновлять Zookeeper с их помощью.
iii. Развертывание
Процесс развертывания в данном случае напоминает процесс развертывания в варианте с получателем.
3. Преимущества прямого подхода
Второй подход к интеграции Spark Streaming с Kafka выигрывает у первого по следующим причинам:
a. Упрощенный параллелизм
В данном случае не требуется создавать множество входных потоков Kafka и объединять их. Однако, Kafka – Spark Streaming создаст столько RDD-сегментов, сколько будет сегментов Kafka на потребление. Все эти данные Kafka будут считываться параллельно. Следовательно, можно сказать, что у нас будет соответствие «один к одному» между сегментами Kafka и RDD, а такая модель получается понятнее и проще в настройке.
b. Эффективность
Чтобы полностью исключить потери данных при первом подходе, информацию требовалось хранить в журнале опережающей записи, а затем их реплицировать. На самом деле, это неэффективно, поскольку данные реплицируются дважды: в первый раз самим Kafka, а во второй – журналом опережающей записи. При втором подходе данная проблема устраняется, поскольку получателя нет, а, значит, не нужен и журнал опережающей записи. Если у нас предусмотрено достаточно долгое хранение данных в Kafka, то восстанавливать сообщения можно прямо из Kafka.
с. Семантика Exactly-Once
В принципе, мы использовали высокоуровневый API Kafka при первом подходе, чтобы хранить в Zookeeper потребленные считываемые фрагменты. Однако, именно так принято потреблять данные из Kafka. Пусть при этом и надежно исключаются потери данных, существует небольшая вероятность, что при некоторых отказах отдельные записи могут быть потреблены дважды. Все дело в несогласованности между механизмом надежной передачи данных в Kafka – Spark Streaming и считыванием фрагментов, происходящим в Zookeeper. Следовательно, при втором подходе мы применяем простой Kafka API, не требующий прибегать к Zookeeper. Здесь считываемые фрагменты отслеживаются в Kafka – Spark Streaming, для этого используются контрольные точки. В таком случае устраняется несогласованность между Spark Streaming и Zookeeper/Kafka.
Следовательно, даже в случае отказов, Spark Streaming получает каждую запись строго однократно. Здесь нужно гарантировать, чтобы наша операция вывода, при которой данные сохраняются во внешнем хранилище, была либо идемпотентной, либо атомарной транзакцией, в которой сохранялись бы и результаты, и смещения. Именно так достигается семантика exactly-once при выводе наших результатов.
Хотя, здесь есть один недостаток: смещения в Zookeeper не обновляются. Поэтому мониторинговые инструменты Kafka на основе Zookeeper не позволяют отслеживать прогресс.
Однако, мы все равно можем обращаться к смещениям, если обработка устроена таким методом – обращаемся к каждому пакету и обновляем Zookeeper сами.
Вот и все, что мы хотели рассказать об интеграции Apache Kafka и Spark Streaming. Надеемся, вам понравилось.
Автор: ph_piter