В данной статье речь пойдет об использовании открытой платформы Apache Flink для обнаружения цепочки последовательности событий. Статья подойдет как для начинающих разработчиков в области обработки потоковых данных, так и для тех, кто желает познакомиться с Apache Flink.
Ни для кого не секрет, что на данный момент существуют различные подходы к обработке, хранению, фильтрации и анализу больших данных. В отдельный класс можно выделить системы, построенные на событийной архитектуре (Event-Driven Architecture). Данные системы призваны решать различные задачи, в том числе в режимах близких к реальному времени. Одной из таких задач является обнаружение (детектирование, идентификация) сложных цепочек связанных событий на больших входных потоках данных (FlinkCEP — Pattern Detection). Обычно, данная задача, решается системами комплексной обработки событий (CEP), которые должны обрабатывать сотни, а порой и тысячи определенных пользователем шаблонов на входном потоке данных в поисках определенного события, аномалий, системах мошенничества и даже предсказании будущего на основе текущих событий. В статье речь пойдет о библиотеке FlinkCep Apache Flink, которая позволяет решать подобные проблемы.
Данный механизм позволяет найти объект по запросу (шаблону) в бесконечном потоке входных данных. Объектом характеризуется все что угодно, чему можно задать поведение, либо определить последовательность событий. Шаблоном выступает набор неких действий, которые может осуществлять объект. Как только данные удовлетворят заданному шаблону, то система идентифицирует объект, как искомый, и выдает результат. Следственно, место для использования механизма шаблонов — это идентификация определенной модели поведения объекта.
Преимущества такого подхода заключается в том, что данные обрабатываются незамедлительно! А теперь давайте рассмотрим подробнее теорию и примеры.
Что же такое цепочка событий?
В качестве очень простого примера цепочки событий можно взять входной поток данных действий пользователя на сайте, например, по подбору машин. Сначала наш пользователь открыл личный кабинет, затем выбрал комплектацию машины, затем посмотрел ставки по кредиту. Цепочку из трех действий получили следующую:
Три действия модели поведения пользователя на сайте. При входном потоке данных, нам необходимо выявить пользователей, которые сначала подобрали себе комплектацию машины, а затем посмотрели ставки по кредиту. Допустим, это позволит нам отправить им выгодное предложение по кредиту и увеличит конверсию по нашему банковскому продукту.
Можно привести бытовой пример цепочки событий. Отслеживать нестабильность температуры в котле по показателям датчика. В объекте температура сначала увеличилась, затем уменьшилась, затем опять увеличилась в рамках определенного временного интервала. Тогда цепочка событий на изменение температуры может выглядеть следующим образом:
Как и в примере выше, здесь отображается определенная модель поведения объекта, которую мы можем найти во входном потоке данных. При выявлении данной цепочки мы можем сигнализировать диспетчеру о том, что обнаружены неполадки в системе.
Используя Flink Pattern CEP можно выявлять любые цепочки событий, как в реальном времени, так и при обработке имеющихся массивов данных.
Pattern CEP (Шаблон), называется заранее известная модель поведения объекта. В более простом случае, с температурой, мы знаем, что у нас есть три действия. которые необходимо описать в шаблоне — поднялась, уменьшилась и опять поднялась, предположим, за 5 минут. Если представить это на псевдоязыке, то будет выглядеть примерно так:
PatternTempUp()_THEN_PatternTempDown()_THEN_PatternTempUp()_Within(5 min)
Шаблон нам известен, поток событий дан, Flink все найдет!
А что, если мы хотим усложнить задачу и на основе уже известного шаблона задать еще один? Мы знаем механизм выявления нестабильности температуры, но также хотим контролировать, например, изменение давления в объекте. И с данной проблемой FlinkСep благополучно справляется. Он позволяет делать сложные подзапросы на основе простых и создавать тем самым последовательность уже самих шаблонов. Для более легкого восприятия, мы будем называть одиночные шаблоны — простыми, а сложные — последовательностью шаблонов.
Все хорошее начинается со слова «begin»
Основной класс для объявления последовательности событий — класс Pattern, который содержит в себе статический метод begin.
.....
/**
* Начало новой последовательности шаблонов. Устанавливается базовый тип шаблона и имя инициализации шаблона
* @param name Имя стартового шаблона новой последовательности шаблонов
* @param <X> Базовый тип событий шаблонов
* @return Первый шаблон последовательности шаблонов
*/
public static <X> Pattern<X, X> begin(final String name) {
return new Pattern<>(name, null, ConsumingStrategy.STRICT, AfterMatchSkipStrategy.noSkip());}
.......
Соответственно, точкой входа является метод begin класса Pattern, неважно, простой у вас шаблон или последовательность шаблонов, старт начинается c begin. Пусть у нас событие будет связано с поведением клиента на сайте, следовательно класс, который представляет данное событие — ClientEvent.
var patternFirst = Pattern.<ClientEvent>begin("First_Pattern_sequence")
Важное правило, которое необходимо соблюдать — это каждый шаблон должен иметь имя и это имя должно быть уникально в рамках одного Flink-обработчика! Как мы видели выше, имя передается в аргументы метода begin.
Средства Pattern API
Шаблоны могут иметь одиночный или циклический характер. Мы можем как угодно настроить порядок срабатывания шаблона. Всего один раз, не чаще двух раз в час, два и более раз и любые другие возможные варианты. Данная настройка называется квантификатором («Quantifiers») и имеет следующие типы:
Имя метода | Описание | Пример |
---|---|---|
Times(value) | Ожидаемое число вхождениий шаблона. Value — заданное количество раз | Pattern.times(3) — ожидается 3 вхождения |
Times(#FromValue, #ToValue) | Вхождений шаблона может быть в диапазоне от #FromValue до #ToValue | Pattern.times(0,3) — ожидается от 0 до 3 вхождений |
Optional() | Указывает, что этот шаблон является необязательным, то есть он может вообще не появляться | Pattern.times(3).optional() — ожидается либо 0 либо 3 вхождений |
Greedy() | Вхождений шаблона должно быть как можно больше | Pattern.times(3,5).greedy() — ожидается 3-5 вхождений и повторяем как можно больше. Pattern.times(2,4).optional().greedy() — ожидается 0, 2-4 вхождений и повторяем как можно больше |
OneOrMore() | Указывает на наличие хотя бы одного вхождения. Примечание. Может использоваться только с until() или within() | Pattern.oneOrMore() — ожидается хотя бы одно вхождение. Pattern.oneOrMore().optional() — ожидается 0 или более вхождений |
Самое важное — это задать условия по которым определяется модель поведения. Очевидно, что используются для этого операторы — условия, которые позволяют учитывать любые операции сравнения. В таблице ниже представлены данные операторы:
Операции шаблона | Описание |
---|---|
Where(condition) | Определяет условия, которые должны быть применены к событию. В методе where используются любые операции сравнения, результатом которых является тип boolean. |
|
|
Or(condition) | Обычный "или", который добавляет еще одно условие к поиску. В методе or используются любые операции сравнения, результатом которых является тип boolean. Или условие в методе "where" либо в "or". |
|
|
Until(condition) | Определяет условие остановки поиска. В методе until используются любые операции сравнения, результатом которых является тип boolean. Когда необходимое условие выполнено, больше событий принято не будет. Данное условие применимо, только с операцией oneOrMore(). |
|
|
Subtype(SubClass) | Определяет подтип условия. Событие соответствует шаблону, если имеет данный тип. |
|
Формирование шаблона состоит из трех действий: создание шаблона, указание ему условий поиска и срабатывания. Но как же сами шаблоны объединить в цепочку действий? Для этого существуют операторы, которые позволяют устанавливать взаимосвязи между шаблонами, переиспользовать уже созданные, указывать время жизни и формировать поиск для каждого события. Данные операторы бывают трех типов:
- Strict Contiguity — строгая связь, ожидается, что все совпадающие события должны выполниться строго друг за другом;
- Relaxed Contiguity — слабая связь, допускает игнорирование несовпадающих событий, которые появляются между совпадающими;
- Non-Deterministic Relaxed Contiguity — еще более слабая связь, позволяет дополнительные совпадения, которые могут пропускать некоторые совпадающие события.
Операции шаблона | Описание |
---|---|
Begin(#name) | Определяет старт шаблона на основе имени шаблона
|
Begin(#pattern) | Определяет старт шаблона на основе другого шаблона (вложенная структура)
|
Next(#name) | Добавляет новый шаблон на основе имени. Событие должно непосредственно следовать за предыдущим найденным событием (тип strict contiguity)
|
Next(#pattern) | Добавляет новый шаблон на основе другого шаблона. Последовательность событий должна непосредственно следовать за предыдущим найденным событием (тип strict contiguity)
|
FollowedBy(#name) | Добавляет новый шаблон на основе имени. События могут происходить между совпадающим событием и предыдущим найденным событием (тип relaxed contiguity)
|
FollowedBy(#pattern) | Добавляет новый шаблон на основе шаблона. События могут происходить между последовательностью совпадающих событий и предыдущим найденным событием (тип relaxed contiguity)
|
FollowedByAny(#name) | Добавляет новый шаблон на основе имени. События могут происходить между совпадающим событием и предыдущим найденным событием, а также другие совпадения будут представлены для каждого совпадающего события(тип non-deterministic relaxed contiguity)
|
FollowedByAny (#pattern) | Добавляет новый шаблон на основе шаблона. События могут происходить между совпадающим событием и предыдущим найденным событием, а также другие совпадения будут представлены для каждого совпадающего события(тип non-deterministic relaxed contiguity)
|
NotNext(#name) | Добавляет новый отрицательный шаблон. Событие (отрицательное) должно непосредственно следовать за предыдущим найденным событием (тип strict contiguity)
|
NotFollowedBy(#name) | Добавляет новый отрицательный шаблон. Частично совпадающая последовательность событий будет отброшена, даже если между совпадающим (отрицательным) событием и предыдущим совпадающим событием происходят другие события (тип relaxed contiguity)
|
Within (time) | Определяет максимальный интервал времени, за который последовательность событий должны быть сформирована. Если существует незавершенная последовательность событий, она отбрасывается
|
Теория – это, конечно, хорошо, но нагляднее будет рассмотреть поведение системы на конкретном простеньком примере.
Пример использования Flink Pattern CEP
Создаем проект на Java + Maven. В зависимости добавляем Flink-streaming-java и Flink-Cep.
<properties>
<flink.version>1.9.1</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
Создаем класс события (Event), для более легкого восприятия примера, класс будет содержать всего одно поле — id.
@Data
@AllArgsConstructor
public class Event {
private Integer id;
}
Далее добавим простой входной источник данных с циклом и случайными значениями для id от 1 до 3.
public class MonitoringEvent implements SourceFunction<Event> {
@Override
public void run(SourceContext sourceContext) {
Random random1 = new Random();
for (int i = 0; i < 10; i++) {
Event Event = new Event(random1.nextInt(3));
System.out.println("Source " + Event);
sourceContext.collect(Event);
}
}
@Override
public void cancel() {}
}
Создадим три примера на условия Strict Contiguity, Relaxed Contiguity и Non-Deterministic Relaxed Contiguity. С условием, что во входном потоке данных ищется сначала событие с id=1, а затем с id=2.
- Strict Contiguity
Ищется строго во входном потоке данных условие — сначала id равен 1, затем должно быть событие с id равным 2. В случае, если имеется между двумя данными событиями появляются другие, то последовательность сбрасывается и начинает ждать заново событие с id равным 1.Пример логов:
Source Event(id=1) Source Event(id=1) Source Event(id=2) Source Event(id=2) Source Event(id=2) Source Event(id=2) Source Event(id=1) Source Event(id=1) Source Event(id=0) Source Event(id=1) result > <u>{first=[Event(id=1)], next=[Event(id=2)]}</u>
Код шаблона для Strict ContiguityStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<Event> inputEventStream = env.addSource(new MonitoringEvent()) .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()); Pattern<Event, ?> warningPattern = Pattern.<Event>begin("first").where(new IterativeCondition<Event>() { private static final long serialVersionUID = 2392863109523984059L; @Override public boolean filter(Event value, Context<Event> ctx) throws Exception { return value.getId() == 1; } }).next("next").where(new IterativeCondition<Event>() { private static final long serialVersionUID = 2392863109523984059L; @Override public boolean filter(Event value, Context<Event> ctx) throws Exception { return value.getId() == 2; } }); PatternStream<Event> idEventStream = CEP.pattern(inputEventStream, warningPattern); DataStream<Map<String, List<Event>>> warnings = idEventStream.select(new PatternSelectFunction<Event, Map<String, List<Event>>>() { @Override public Map<String, List<Event>> select(Map<String, List<Event>> pattern) throws Exception { return pattern; } }); warnings.print(); env.execute("monitoring job");
- Relaxed Contiguity
Ищется во входном потоке данных условие — сначала id равен 1, затем должно быть событие с id равным 2. В случае, если имеется между двумя данными событиями появляются другие, то последовательность не сбрасывается и считает несколько совпадений с текущим событием.После нахождения последовательность сбрасывается.Пример логов:
Source Event(id=0) Source Event(id=0) Source Event(id=1) Source Event(id=1) Source Event(id=2) Source Event(id=2) Source Event(id=1) Source Event(id=1) Source Event(id=0) Source Event(id=0) result > <u>{first=[Event(id=1)], followedBy=[Event(id=2)]}</u> result > <u>{first=[Event(id=1)], followedBy=[Event(id=2)]}</u>
Код шаблона для Relaxed ContiguityStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<Event> inputEventStream = env.addSource(new MonitoringEvent()) .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()); Pattern<Event, ?> warningPattern = Pattern.<Event>begin("first").where(new IterativeCondition<Event>() { private static final long serialVersionUID = 2392863109523984059L; @Override public boolean filter(Event value, Context<Event> ctx) throws Exception { return value.getId() == 1; } }).followedBy("followedBy").where(new IterativeCondition<Event>() { private static final long serialVersionUID = 2392863109523984059L; @Override public boolean filter(Event value, Context<Event> ctx) throws Exception { return value.getId() == 2; } }); PatternStream<Event> idEventStream = CEP.pattern(inputEventStream, warningPattern); DataStream<Map<String, List<Event>>> warnings = idEventStream.select(new PatternSelectFunction<Event, Map<String, List<Event>>>() { @Override public Map<String, List<Event>> select(Map<String, List<Event>> pattern) throws Exception { return pattern; } }); warnings.print(); env.execute("monitoring job");
- Non-Deterministic Relaxed Contiguity
Ищется во входном потоке данных условие — сначала id равен 1, затем должно быть событие с id равным 2. В случае, если имеется между двумя данными событиями появляются другие, то последовательность не сбрасывается и считает несколько совпадений для каждого события. Результатом будет количество совпадений для каждого значения в потоке данных.Пример логов:
Source Event(id=0) Source Event(id=1) Source Event(id=1) Source Event(id=1) Source Event(id=1) Source Event(id=1) Source Event(id=2) Source Event(id=0) Source Event(id=1) Source Event(id=0) result > <u>{first=[Event(id=1)], followedByAny=[Event(id=2)]}</u> result > <u>{first=[Event(id=1)], followedByAny=[Event(id=2)]}</u> result ><u> {first=[Event(id=1)], followedByAny=[Event(id=2)]}</u> result ><u> {first=[Event(id=1)], followedByAny=[Event(id=2)]}</u> result ><u> {first=[Event(id=1)], followedByAny=[Event(id=2)]}</u>
Код шаблона для Non-Deterministic Relaxed ContiguityStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); DataStream<Event> inputEventStream = env.addSource(new MonitoringEvent()) .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>()); Pattern<Event, ?> warningPattern = Pattern.<Event>begin("first").where(new IterativeCondition<Event>() { private static final long serialVersionUID = 2392863109523984059L; @Override public boolean filter(Event value, Context<Event> ctx) throws Exception { return value.getId() == 1; } }).followedByAny("followedByAny").where(new IterativeCondition<Event>() { private static final long serialVersionUID = 2392863109523984059L; @Override public boolean filter(Event value, Context<Event> ctx) throws Exception { return value.getId() == 2; } }); PatternStream<Event> idEventStream = CEP.pattern(inputEventStream, warningPattern); DataStream<Map<String, List<Event>>> warnings = idEventStream.select(new PatternSelectFunction<Event, Map<String, List<Event>>>() { @Override public Map<String, List<Event>> select(Map<String, List<Event>> pattern) throws Exception { return pattern; } }); warnings.print(); env.execute("monitoring job");
Стратегии пропуска AfterMatchSkipStrategy
Одной из важных характеристик настройки совпадений событий по шаблонам, является стратегия пропуска. Чтобы контролировать, сколько и как совпадения будут назначены событию, необходимо указать стратегию пропуска при создании шаблона в аргументы метода (то есть в begin).
Пусть имеется входной поток данных типа String, содержащий следующие значения: a1, a2, b3, a4, b5, b6, a7 и шаблоном:
Pattern<EventSkipStrategy, ?> warningPattern = Pattern.<EventSkipStrategy>begin("first")
.where(new IterativeCondition<EventSkipStrategy>() {
@Override
public boolean filter(EventSkipStrategy value, Context<EventSkipStrategy> ctx) {
return value.getId().contains("a");
}
}).times(2).next("next").where(new IterativeCondition<EventSkipStrategy>() {
@Override
public boolean filter(EventSkipStrategy value, Context<EventSkipStrategy> ctx) {
return value.getId().contains("b");
}
});
По шаблону необходимо найти 2 события подряд, которые содержат символ а, затем строго следом должно идти событие, которое содержит символ b. Существует пять типов стратегий пропуска. Рассмотрим их подробно:
- NO_SKIP: будут выбраны все возможные совпадения.Является стратегией по умолчанию для всех шаблонов. Для исходного входного потока получаем следующий результат:
{first=[Event(id=a1), Event(id=a2)], next=[Event(id=b3)]} {first=[Event(id=a2), Event(id=a4)], next=[Event(id=b5)]}
Все возможные совпадения для каждого элемента входного потока.
- SKIP_TO_NEXT: отбрасывает каждое частичное совпадение, которое началось с того же события, с которого совпадение было начато. Старт шаблона выглядит следующим образом:
Pattern.<Event>begin("first", AfterMatchSkipStrategy.skipToNext())
Для исходного входного потока получаем следующий результат:
{first=[Event(id=a1), Event(id=a2)], next=[Event(id=b3)]} {first=[Event(id=a2), Event(id=a4)], next=[Event(id=b5)]}
Видим, что результат работы SKIP_TO_NEXT идентичен NO_SKIP. Для лучшего понимания разницы между ними, будет нагляднее изменить входной поток данных и добавить количество вхождений для b: a1,a2, b3, b4, b5.
Код шаблонаPattern<Event, ?> warningPattern = Pattern.<Event>begin("first") .where(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context<Event> ctx) throws Exception { return value.getId().contains("a"); } }).times(2).next("next").where(new IterativeCondition<Event>() { @Override public boolean filter(Event value, Context<Event> ctx) throws Exception { return value.getId().contains("b"); } }).oneOrMore();
Стратегия пропуска Результат работы шаблона Описание NO_SKIP a1,a2,b3
a1,a2,b3,b4
a1,a2,b3,b4,b5После того, как нашлось совпадение b3, процесс не отбрасывает результаты, а продолжает сопоставлять совпадения SKIP_TO_NEXT a1,a2,b3
После того, как нашлось совпадение b3, процесс отбрасывает результаты и события b4 и b5 уже не попадут в накопление - SKIP_PAST_LAST_EVENT: отбрасывает каждое частичное совпадение, начала или конца потока входных данных. Старт шаблона выглядит следующим образом:
Pattern.<Event>begin("first", AfterMatchSkipStrategy.skipPastLastEvent())
Для исходного входного потока получаем следующий результат:
{first=[Event(id=a1), Event(id=a2)], next=[Event(id=b3)]}
Соответственно отбрасывает последние данные a4, так как не удовлетворяют условиям поиска.
- SKIP_TO_FIRST: отбрасывает каждое частичное совпадение, которое началось после начала совпадения, но до того, как произошло первое событие от имени указанного шаблона. В отличие от других стратегий пропуска, данная настройка требует указать имя шаблона по которому будет осуществлено дальнейшее совпадение.
Совпадение по последнему событию от шаблона с именем 'first'
{first=[Event(id=a1), Event(id=a2)], next=[Event(id=b3)]} {first=[Event(id=a2), Event(id=a4)], next=[Event(id=b6)]}
Совпадение по последнему событию от шаблона c именем 'next'{first=[Event(id=a1), Event(id=a2)], next=[Event(id=b3)]}
- SKIP_TO_LAST: отбрасывает каждое частичное совпадение, которое началось после начала совпадения, но до того, как произошло последнее событие от имени указанного шаблона. Данная настройка также требует указать имя шаблона по которому будет осуществлено дальнейшее совпадение.
Для входного потока: ab1, ab2, a3, ab4, ab5, b6, ab7 получаем следующие результаты:
Совпадение по последнему событию от шаблона с именем 'first'{first=[Event(id=ab2), Event(id=a3)], next=[Event(id=ab4)]} {first=[Event(id=a3), Event(id=ab4)], next=[Event(id=ab5)]} {first=[Event(id=ab4), Event(id=ab5)], next=[Event(id=b6)]}
Совпадение по последнему событию от шаблона c именем 'next'{first=[Event(id=ab2), Event(id=a3)], next=[Event(id=ab4)]} {first=[Event(id=ab4), Event(id=ab5)], next=[Event(id=b6)]}
Заключение
Pattern FlinkCep позволяет с легкостью анализировать потоки событий и детектировать нужные данные. Он является мощным инструментом анализа потоковых событий и выявления нужных данных по объектам мониторинга, при этом обладает простым, хорошо документированным API и не требует написания большого количества кода. Диапазон применения данной библиотеки может быть очень обширным, начиная от антифрода и до анализа логов в системе.
P.S.Ссылка на гитхаб с примерами, используемыми в статье.
Рабочий пример об изменении температуры от разработчиков Apache Flink.
Автор: Анна