Flume — управляем потоками данных. Часть 1

в 15:44, , рубрики: big data, flume, Hadoop, Анализ и проектирование систем, Блог компании DCA (Data-Centric Alliance), разработка, хранение данных

Привет! В этом цикле статей я планирую рассказать о том, как можно организовать сбор и передачу данных с помощью одного из инструментов Hadoop — Apache Flume.

Flume — управляем потоками данных. Часть 1 - 1


Первая статья посвящается основным элементам Flume, их настройкам и способам запуска Flume. На просторах Хабра уже имеется статья о том, как работать с Flume, поэтому некоторые базовые разделы будут во многом схожи с ней.

В продолжении цикла я постараюсь более подробно осветить каждый из компонентов Flume, рассказать о том, как настроить мониторинг для него, написать свою реализацию одного из элементов и многое другое.

1. Что такое Flume?

Flume представляет собой инструмент, позволяющий управлять потоками данных и, в конечном счете, передавать их на некоторый “пункт назначения” (например, в файловую систему или HDFS).

В целом, организация транспортировки данных посредством Flume напоминает создание эдакого “конвейера” или “водопровода”. Этот “конвейер” состоит из различных участков (узлов), на которых и происходит управление потоком данных (фильтрация, разделение потока и т.п.).

Flume является надежным и удобным инструментом для транспортировки данных. Надежность обеспечивается в первую очередь транзакционностью передачи данных. Т.е. при правильной настройке цепочки узлов Flume не может быть ситуации, при которой данные потеряются или будут переданы не полностью. Удобство же заключается в гибкости конфигурации — большинство задач решаются добавлением в конфигурацию нескольких параметров, а более сложные могут быть решены путем создания собственных элементов Flume.

Для начала обозначим основные термины, а затем мы рассмотрим структуру одиночного узла Flume.

2. Основные термины

  • Событие (event) — единица данных с дополнительной мета-информацией. По структуре событие напоминает POST-запрос.
    • Заголовки (headers) — мета-информация, набор пар “ключ”-”значение”.
    • Содержимое (body) — собственно, данные, ради передачи которых всё затевается. Передается как byte[].

  • Клиент (client) — внешний, по отношению к узлу Flume, сервис, поставляющий данные.
  • Источник (source) — отвечает за прием данных. При этом Flume предусматривает два типа источников — EventDrivenSource и PollableSource. В первом случае источник сам решает, когда добавлять события в канал (например, HTTPSource добавляет события по мере получения HTTP-запросов). PollableSource в противовес EventDrivenSource является пассивным — Flume просто периодически опрашивает источник на предмет появления новых событий.
  • Сток (Sink) — компонент, отвечающий за передачу данных на следующий этап обработки. Это может быть другой узел Flume, файловая система, HDFS и т.п.
  • Канал (channel) — компонент, выполняющий роль буфера при транспортировке данных. Канал является пассивным компонентом, он самостоятельно не инициирует никаких действий. Источники добавляют события в канал, в то время как стоки его опустошают.
  • Агент (agent) — процесс, в рамках которого функционируют компоненты Flume (источники, каналы, стоки). JVM Instance, в общем. Один узел может содержать несколько агентов.

3. Структура узла Flume

Правильнее было бы назвать этот подраздел “Структура агента Flume”, т.к. узел Flume может состоять из нескольких агентов. Но в рамках данной статьи все примеры будут приводиться как “один узел — один агент”, поэтому я позволю себе вольность и пока не буду разделять эти понятия.

Рассмотрим несколько конфигураций для различных жизненных случаев.

Простой узел
Под простым узлом я подразумеваю самую минималистичную конфигурацию Flume, которая только может быть: источник &#8594 канал &#8594 сток.

Такую конфигурацию можно использовать для простых целей — например, узел является замыкающим в цепочке узлов нашего «водопровода» и выполняет всего одну роль: принимает данные и записывает их в файл (непосредственно записью занимается сток). Или же узел является промежуточным и просто передает данные дальше (иногда это полезно делать для обеспечения отказоустойчивости — например, развернуть такой узел на машине с Flume-клиентом, чтобы избежать потери данных при проблемах с сетью).

Flume — управляем потоками данных. Часть 1 - 2

Делитель
Более сложный пример, который может быть использован для разделения данных. Здесь ситуация немного другая по сравнению с одиночным стоком: наш канал опустошают два стока. Это приводит к тому, что поступающие события делятся между двумя стоками (не дублируются, а именно делятся). Такую конфигурацию можно использовать, чтобы разделить нагрузку между несколькими машинами. При этом, если одна из конечных машин выйдет из строя и привязанный к ней сток не сможет отправлять на нее события, другие стоки продолжат работу в штатном режиме. Естественно, что при этом работающей машине придется отдуваться за двоих.

Flume — управляем потоками данных. Часть 1 - 3

Примечание: Flume располагает более тонкими инструментами для балансировки нагрузки между стоками, для этого используются Flume Sink Processor’ы. О них речь пойдет в следующих частях цикла.

Дубликатор
Такой узел Flume позволяет отправлять одни и те же события на несколько стоков. Может возникнуть вопрос — а зачем два канала, разве не может канал дублировать события сразу на два стока? Ответ — нет, поскольку не «канал раздает события», а «сток опустошает канал». Даже если бы такой механизм и существовал, то выход из строя одного из стоков привел бы к неработоспособности других (т.к. каналу бы пришлось работать по принципу “либо все смогли, либо никто”). Это объясняется тем, что при сбое на уровне стока отсылаемая пачка событий не исчезает «в никуда», а остается лежать в канале. Ибо транзакция.

Flume — управляем потоками данных. Часть 1 - 4

Примечание: в данном примере используется безусловное дублирование — т.е. в оба канала копируется все подряд. Flume позволяет не дублировать, а разделять события по некоторым условиям — для этого используется Flume Channel Selector. О нем речь также пойдет в следующих статьях цикла.

Универсальный приемник
Еще один полезный вариант конфигурации — с несколькими источниками. Крайне полезная конфигурация, когда необходимо “слить воедино” однотипные данные, полученные различными способами.

Flume — управляем потоками данных. Часть 1 - 5

Резюме:

  • Узел может иметь в своем составе множество источников, каналов и стоков.
  • Один источник может складывать события в несколько каналов (дублировать или распределять по некоторому правилу).
  • Несколько источников складывать события в один канал.
  • Один сток может работать только с одним каналом.
  • Несколько стоков могут забирать события из одного канала (равномерно или по некоторому правилу балансировки).

4. Конфигурация и запуск узла Flume

Думаю, пришло время практических примеров. Стандартный пакет Flume содержит множество реализаций источников/каналов/стоков для разных случаев жизни — описание по их настройке можно найти здесь. В рамках этой статьи я ограничусь самыми простыми реализациями компонентов:

  • Memchannel (канал, использующий оперативную память для хранения событий).
  • NetCat Source.
  • Logger Sink (сток, выводящий события в консоль).

Пожалуй так выглядит самая простая конфигурация для узла Flume:

### ==================== Компоненты узла ==================== ###
# Перечисляем все основные компоненты, из которых будет состоять наш узел: источники, каналы и узлы

# <agent>.sources - имена источников, разделенные пробелом (в этом примере один источник: my_source)
my_agent.sources = my_source
# <agent>.channels - аналогично указываем имена каналов
my_agent.channels = my_channel
# <agent>.sinks - для стоков то же самое
my_agent.sinks = my_sink

### ==================== Источник my_source ================== ###
# Тип источника - netcat (источники из стандартной поставки Flume имеют зарезервированные имена-псевдонимы,
# в общем случае здесь можно указать полное имя класса источника, в т.ч., вашего собственного)
my_agent.sources.my_source.type = netcat
# Указываем, куда биндить наш исчтоник
my_agent.sources.my_source.bind = 0.0.0.0
my_agent.sources.my_source.port = 11111
# Указываем источнику канал (или список каналов, через пробел), куда отправлять полученные события
my_agent.sources.my_source.channels  = my_channel

### ==================== Канал my_channel ================== ###
# Используем тип канала из пакета Flume - memory (как и с источником, здесь можно казать свой класс), который хранит события в памяти
my_agent.channels.my_channel.type = memory
# Вместимость канала, кол_во событий
my_agent.channels.my_channel.capacity = 10000
# Число событий в одной транзакции (как на добавление, так и на "вытягивание")
my_agent.channels.my_channel.transactionCapacity = 100

### ==================== Сток my_sink ================== ###
# Тип стока - логгер, пишуший события в консоль (и здесь также можно указать свой класс)
my_agent.sinks.my_sink.type = logger
# Из какого канала будем забирать события
my_agent.sinks.my_sink.channel = my_channel
# Настройка исключительно для стока типа logger - сколько первых байт тела события выводить в консоль
my_agent.sinks.my_sink.maxBytesToLog = 256

Осталось теперь запустить узел с нашей конфигурацией. Сделать это можно двумя способами:

  1. На кластере Hadoop, через Cloudera Manager (в этой статье есть подробное описание того, как это сделать).
  2. Как Java-сервис, используя библиотеки Flume.

Поскольку процесс запуска Flume средствами Cloudera Manager освещен достаточно подробно, рассмотрим второй вариант — с помощью Java.

Прежде всего необходимо добавить зависимости Flume к нашему проекту. Для этого добавим в pom.xml репозиторий Clodera и два артефакта Flume — ng-sdk и ng-node.

<repositories>     
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    <repositories>
 
    <dependencies>        
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-sdk</artifactId>
            <version>1.5.0-cdh5.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-node</artifactId>
            <version>1.5.0-cdh5.3.0</version>
        </dependency>
    </dependencies>

После этого создадим класс с точкой входа:

package ru.flume.samples;

import org.apache.flume.node.Application;

public class FlumeLauncher {
    
    public static void main(String[] args) {  
        // файл с конфигурацией Log4j я позволю себе указать прямо здесь
        System.setProperty("log4j.configuration", "file:/flume/config/log4j.properties");          
        
        // Запускаем Flume с параметрами:
        Application.main(new String[]{
            "-f", "/flume/config/sample.conf", // путь до файла с конфигурацией
            "-n", "my-agent"                   // имя агента
        });   
    }    
}

Читатели, знакомые с Java, заметят, что можно вообще не создавать этот класс, а просто скопировать необходимые зависимости для Flume в отдельную папку и запустить Java с нужными аргументами командной строки. Но это уже дело вкуса — я предпочитаю, чтобы Maven сам подтягивал все необходимые зависимости, в том числе и разработанные нами компоненты Flume, и аккуратно всё это заворачивал в deb-пакет.

Если все пути указаны верно, а конфигурация не содержит ошибок, мы увидим в консоле вот такой лог от Flume.

Вывод Flume, если всё получилось

INFO  main conf.FlumeConfiguration - Processing:my-sink
INFO  main conf.FlumeConfiguration - Added sinks: my-sink Agent: my-agent
INFO  main conf.FlumeConfiguration - Processing:my-sink
INFO  main conf.FlumeConfiguration - Processing:my-sink
INFO  main conf.FlumeConfiguration - Post-validation flume configuration contains configuration for agents: [my-agent]
INFO  main node.AbstractConfigurationProvider - Creating channels
INFO  main channel.DefaultChannelFactory - Creating instance of channel my-channel type memory
INFO  main node.AbstractConfigurationProvider - Created channel my-channel
INFO  main source.DefaultSourceFactory - Creating instance of source my-source, type netcat
INFO  main sink.DefaultSinkFactory - Creating instance of sink: my-sink, type: logger
INFO  main node.AbstractConfigurationProvider - Channel my-channel connected to [my-source, my-sink]
INFO  main node.Application - Starting new configuration:
{
    sourceRunners:{
        my-source=EventDrivenSourceRunner: { 
            source:org.apache.flume.source.NetcatSource{
                name:my-source,
                state:IDLE
            }
        }
    } 
    sinkRunners:{
        my-sink=SinkRunner: { 
            policy:org.apache.flume.sink.DefaultSinkProcessor@77f03bb1 counterGroup:{ name:null counters:{} } 
        }
    }            
    channels:{
        my-channel=org.apache.flume.channel.MemoryChannel{
            name: my-channel
        }
    }
}
INFO  main node.Application - Starting Channel my-channel
INFO  main node.Application - Waiting for channel: my-channel to start. Sleeping for 500 ms
INFO  lifecycleSupervisor-1-0 instrumentation.MonitoredCounterGroup - Monitored counter group for type: CHANNEL, name: my-channel: Successfully registered new MBean.
INFO  lifecycleSupervisor-1-0 instrumentation.MonitoredCounterGroup - Component type: CHANNEL, name: my-channel started
INFO  main node.Application - Starting Sink my-sink
INFO  main node.Application - Starting Source my-source
INFO  lifecycleSupervisor-1-1 source.NetcatSource - Source starting
INFO  lifecycleSupervisor-1-1 source.NetcatSource - Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/0:0:0:0:0:0:0:0:11111]

Чтобы убедиться, что всё работает корректно, отправим нашему NetCat-источнику небольшой тестовый файл test.txt, в котором содержится 4 строки:

Message 1
Message 2
Message 3

Важно, чтобы файл оканчивался переносом строки. Для NetCat-источника он является разделителем событий. Если мы не добавим в конец файла этот перенос строки, то источник будет считать, что последнее событие пришло не полностью. В результате этого он будет упорно ждать разделителя, который, естественно, никогда не придет. Итак, выполняем команду:

nc 127.0.0.1 11111 < test.txt

В результате этого NetCat должен вывести на экран три сообщения «ОК», как подтверждение того, что все строки файла благополучно отправлены и получены источником Flume. В это же время, сток должен вывести в консоль вот такие сообщения:

sink.LoggerSink - Event: { headers:{} body: 4D 65 73 73 61 67 65 20 31 0D                   Message 1. }
sink.LoggerSink - Event: { headers:{} body: 4D 65 73 73 61 67 65 20 32 0D                   Message 2. }
sink.LoggerSink - Event: { headers:{} body: 4D 65 73 73 61 67 65 20 33 0D                   Message 3. }

Примечание: Flume при запуске регистрирует свой shutdownHook, поэтому нет необходимости вручную высвобождать какие-либо ресурсы (соединения, открытые файлы и т.п.) — все компоненты узла самостоятельно завершат работу вместе с JVM.

5. Цепочка узлов Flume

Итак, мы разобрались, как настроить и запустить одиночный узел Flume. Однако для управления потоками данных одного узла явно маловато. Попробуем построить небольшую цепочку из трех узлов, выполняющих задачу деления (по сути — балансировка): первый узел Flume принимает информацию от клиента и отправляет события на два других узла. При этом события не дублируются на втором и третьем узлах, а равномерно распределяются между ними.

Flume — управляем потоками данных. Часть 1 - 6

Соответственно, для такой схемы необходимо несколько конфигураций (для каждого узла — своя).

Конфигурация для узла 1 (node1.conf)


node1.sources = my-source
node1.channels = my-channel
# Теперь здесь 2 стока:
node1.sinks = my-sink1 my-sink2

node1.sources.my-source.type = netcat
node1.sources.my-source.bind = 0.0.0.0
node1.sources.my-source.port = 11111
node1.sources.my-source.channels  = my-channel

node1.channels.my-channel.type = memory
node1.channels.my-channel.capacity = 10000
node1.channels.my-channel.transactionCapacity = 100

# Оба стока делаем с типом avro, они будут опустошать наш единственный канал вдвоем
# Хосты принимающих узлов я оставлю локальными, чтобы всю эту цепочку можно было
# попробовать запустить на одной машине
node1.sinks.my-sink1.type = avro
node1.sinks.my-sink1.channel = my-channel
node1.sinks.my-sink1.hostname = 127.0.0.1
node1.sinks.my-sink1.port = 11112
node1.sinks.my-sink1.batch-size = 100

node1.sinks.my-sink2.type = avro
node1.sinks.my-sink2.channel = my-channel
node1.sinks.my-sink2.hostname = 127.0.0.1
node1.sinks.my-sink2.port = 11113
node1.sinks.my-sink2.batch-size = 100
Конфигурация для узла 2 (node2.conf)

node2.sources = my-source
node2.channels = my-channel
node2.sinks = my-sink

# Поскольку на узле 1 сток имеет тип avro, здесь мы указываем источник типа avro
node2.sources.my-source.type = avro
node2.sources.my-source.bind = 0.0.0.0
node2.sources.my-source.port = 11112
node2.sources.my-source.channels  = my-channel

node2.channels.my-channel.type = memory
node2.channels.my-channel.capacity = 10000
node2.channels.my-channel.transactionCapacity = 100

node2.sinks.my-sink.type = logger
node2.sinks.my-sink.channel = my-channel
node2.sinks.my-sink.maxBytesToLog = 256
Конфигурация для узла 3 (node3.conf)


node3.sources = my-source
node3.channels = my-channel
node3.sinks = my-sink

# Поскольку на узле 1 сток имеет тип avro, здесь мы указываем источник типа avro
node3.sources.my-source.type = avro
node3.sources.my-source.bind = 0.0.0.0
node3.sources.my-source.port = 11113
node3.sources.my-source.channels  = my-channel

node3.channels.my-channel.type = memory
node3.channels.my-channel.capacity = 10000
node3.channels.my-channel.transactionCapacity = 100

node3.sinks.my-sink.type = logger
node3.sinks.my-sink.channel = my-channel
node3.sinks.my-sink.maxBytesToLog = 256

Конфигурации для узлов 2 и 3 в данном примере идентичны, отличаются только номерами портов. Также для связи между узлами здесь используются стандартные компоненты Flume: Avro источник и Avro сток. Подробнее они будут описаны в следующих статьях, пока же нам достаточно того, что Avro Sink может отправлять по сети события, а Avro Source может их принимать.

Соответственно, запускаться каждый из узлов должен в отдельном процессе, а параметры запуска будут выглядеть следующим образом:

Application.main(new String[]{"-f", "/flume/config/node1.conf", "-n", "node1"});  
    // для других узлов по аналогии:
    //Application.main(new String[]{"-f", "/flume/config/node2.conf", "-n", "node2"});  
    //Application.main(new String[]{"-f", "/flume/config/node3.conf", "-n", "node3"});

Можно убедиться в работоспособности этой конфигурации, скормив первому узлу текстовый файл с сотней строк (маленькие порции данных могут пачкой отправиться на один из узлов и желаемого эффекта разделения данных мы не увидим).

Заключение

Эта статья является ознакомительной, приведенные здесь примеры конфигурации узлов Flume могут пригодиться лишь для отладки или знакомства с этим инструментом. В реальных проектах топология Flume выходит далеко за рамки одного-двух узлов, а конфигурации компонентов являются куда более сложными.

В следующей статье:

  • Использование заголовков и канальных селекторов (Channel Selector).
  • «Боевые» компоненты Flume:
    • Avro Source;
    • File Channel;
    • Avro Sink;
    • HDFS Sink;
    • File Roll Sink.
  • Мониторинг состояния узла Flume.

Использованные источники и полезные ссылки

Автор: DCA (Data-Centric Alliance)

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js