На первый взгляд, в этой истории есть всё, чтобы заслужить статус романтичного поста накануне 8 марта: самолёты, любовь, чуточку шпионажа и, наконец, котик (точнее, кошка). Трудно представить, что всё это имеет самое непосредственное отношение к Kafka, KSQL и эксперименту «как в домашних условиях с помощью информационных технологий найти самый шумный самолёт». Трудно, но придётся: именно такой эксперимент провёл Саймон Обьюри, а мы перевели статью его авторства с описанием всех подробностей процесса.
Наша новая кошка по имени Снежок просыпается рано. Ее будят звуки самолетов, пролетающих над нашим домом. А что если бы я, используя Apache Kafka, KSQL и Raspberry Pi, смог определить, какой именно самолет не дает моей кошке спать? Хорошо бы еще создать занятную панель слежения, на которую кошка могла бы переключить свое внимание — и дать мне ещё немножко поспать.
В общих чертах
Переносим самолеты с неба в графики с помощью Kafka и KSQL
Самолеты определяют свое местоположение с помощью GPS приемников. Бортовой передатчик периодически сообщает локацию, идентификационный номер, высоту и скорость корабля, используя короткие радиопередачи. Эти передачи вещательного автоматического зависимого наблюдения (АЗН-В) являются по сути пакетами данных, открытыми для доступа с наземных станций.
Один микрокомпьютер, такой как Raspberry Pi, и несколько вспомогательных компонентов — это все, что требуется для получения сообщений бортовых передатчиков самолетов, снующих над моим домом.
Бортовые передачи самолетов выглядят как запутанный клубок сообщений и требуют систематизации. Распознать эти хаотичные потоки данных — это все равно что подслушать беседу на шумной вечеринке. Поэтому, чтобы найти самолет, который тревожит мою кошку, я решил использовать сочетание Kafka и KSQL.
Разбуженная кошка и Raspberry Pi
Сбор показаний АЗН-В с помощью Raspberry Pi
Для сбора бортовых передач я использовал Raspberry Pi и RTL2832U — USB-модем, изначально продававшийся для просмотра цифрового ТВ на компьютере. На Raspberry Pi я установил dump1090 — программу, которая получает данные с АЗН-В через RTL2832U с помощью небольшой антенны.
Мой программный радиоприемник из Raspberry Pi и RTL2832U
Преобразуем сигналы АЗН-В в темы Kafka
Теперь, когда я получил поток необработанных сигналов АЗН-В, нам следует обратить внимание на трафик. Raspberry Pi не имеет достаточной мощности для серьезных вычислений, поэтому мне придется передать обработку данных моему локальному кластеру на Kafka.
Получаемые сообщения делятся либо на сообщения о локации, либо на сообщения об идентификации борта. Локация будет иметь выглядеть как борт 7c6db8 летит на высоте 6,250 футов в координате -33.8,151.0. Сообщение об идентификации борта будет выглядеть как борт 7c451c совершает полет по маршруту QJE1726.
Небольшой скрипт на Python, работающий на Raspberry Pi, разделяет входящие сообщения АЗН-В. Я использовал прокси-сервер Confluent Rest Proxy для распределения данных с Raspberry Pi в темы location-topic и ident-topic на Kafka. Прокси-сервер предоставляет RESTful интерфейс для кластера Kafka, позволяя легко создавать сообщения путем простого REST-вызова на Pi.
Я хотел понять, какие самолеты летали над моей крышей и по каким маршрутам. База данных OpenFlights позволяет сопоставить код авиаборта, например 7C6DB8, присвоенный Международной организацией гражданской авиации (ИКАО), с типом самолета — в нашем случае «Боинг-737». Я загрузил данные моего картографирования в тему icao-to-aircraft.
KSQL предоставляет “SQL-движок”, который дает возможность обработки данных в реальном времени по темам Apache Kafka. Например, чтобы найти бортовой код 7C6DB8, мы можем написать запрос следующим образом:
CREATE TABLE icao_to_aircraft WITH (KAFKA_TOPIC='ICAO_TO_AIRCRAFT_REKEY', VALUE_FORMAT='AVRO', KEY='ICAO');
ksql> SELECT manufacturer, aircraft, registration
FROM icao_to_aircraft
WHERE icao = '7C6DB8';
Boeing | B738 | VH-VYI
Аналогично, в тему callsign-details я загрузил позывные (т. е. QFA563, это рейс авиакомпании Qantas из Брисбена в Сидней).
CREATE TABLE callsign_details WITH (KAFKA_TOPIC='CALLSIGN_DETAILS_REKEY', VALUE_FORMAT='AVRO', KEY='CALLSIGN');
ksql> SELECT operatorname, fromairport, toairport
FROM callsign_details
WHERE callsign = 'QFA563';
Qantas | Brisbane | Sydney
Теперь давайте взглянем поток данных location-topic. Мы можем наблюдать постоянный поток входящих сообщений о местоположении пролетающего самолета.
kafka-avro-console-consumer --bootstrap-server localhost:9092 --property --topic location-topic
{"ico":"7C6DB8","height":"6250","location":"-33.807724,151.091495"}
Запрос на KSQL будет выглядеть так:
ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yyyy-MM-dd HH:mm:ss'),
ico, height, location
FROM location_stream
WHERE ico = '7C6DB8';
2018-09-19 07:13:33 | 7C6DB8 | 6250.0 | -33.807724,151.091495
KSQL: гармонизация потоков...
Настоящая ценность KSQL лежит в возможности объединения входящих потоков данных о местоположении с исходными данными тем (см. 03_ksql.sql) — то есть, добавлении полезных сведений к необработанному потоку данных. Это очень похоже на left join в традиционной базе данных. Результатом является еще одна тема Kafka, произведенная без единой строчки кода Java!
CREATE STREAM location_and_details_stream AS
SELECT l.ico, l.height, l.location, t.aircraft
FROM location_stream l
LEFT JOIN icao_to_aircraft t ON l.ico = t.icao;
К тому же вы получаете запрос KSQL. Поток данных будет выглядеть так:
ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss')
, manufacturer
, aircraft
, registration
, height
, location
FROM location_and_details_stream;
18-09-27 09:53:28 | Boeing | B738 | VH-YIA | 7225 | -33.821,151.052
18-09-27 09:53:31 | Boeing | B738 | VH-YIA | 7375 | -33.819,151.049
18-09-27 09:53:32 | Boeing | B738 | VH-YIA | 7425 | -33.818,151.048
Помимо этого, мы можем объединить входящий поток callsign с фиксированной темой callsign_details:
CREATE STREAM ident_callsign_stream AS
SELECT i.ico
, c.operatorname
, c.callsign
, c.fromairport
, c.toairport
FROM ident_stream i
LEFT JOIN callsign_details c ON i.indentification = c.callsign;
ksql> SELECT TIMESTAMPTOSTRING(rowtime, 'yy-MM-dd HH:mm:ss')
, operatorname
, callsign
, fromairport
, toairport
FROM ident_callsign_stream ;
18-09-27 13:33:19 | Qantas | QFA926 | Sydney | Cairns
18-09-27 13:44:11 | China Eastern | CES777 | Kunming | Sydney
18-09-27 14:00:54 | Air New Zealand | ANZ110 | Sydney | Auckland
Теперь у нас есть две информативные темы:
1. location_and_details_stream, которая обеспечивает поток обновленной информации о местоположении и скорости самолета;
2. ident_callsign_stream, которая описывает подробности рейса, в том числе авиакомпанию и пункт назначения.
С этими постоянно обновляемыми темами мы можем создать несколько отличных обзорных панелей. Я использовал Kafka Connect, чтобы выгрузить темы Kafka, заполняемые KSQL, в Elasticsearch (полные скрипты здесь).
Обзорная панель Kibana
Вот пример обзорной панели, демонстрирующей местоположение самолета на карте. Кроме того, вы можете увидеть диаграмму по авиакомпаниям, график высоты полета и облака слов по основным пунктам назначения. Тепловая карта показывает районы сосредоточения самолетов, т.е. с наивысшим уровнем шума.
Назад, к кошке
Сегодня моя кошка разбудила меня в районе 6 часов утра. Может ли KSQL помочь мне найти тот самолет, который пролетал в это время над моим домом на высоте меньше 3,500 футов?
select timestamptostring(rowtime, 'yyyy-MM-dd HH:mm:ss')
, manufacturer
, aircraft
, registration
, height
from location_and_details_stream
where height < 3500 and rowtime > stringtotimestamp('18-09-27 06:10', 'yy-MM-dd HH:mm') and rowtime < stringtotimestamp('18-09-27 06:20', 'yy-MM-dd HH:mm');
2018-09-27 06:15:39 | Airbus | A388 | A6-EOD | 2100.0
2018-09-27 06:15:58 | Airbus | A388 | A6-EOD | 3050.0
Потрясающе! Я могу определить самолет, оказавшийся над моей крышей в 6:15 утра. Оказывается, Снежка разбудил Airbus А380 (огромный самолет, кстати), летевший в Дубай.
Всего пара выходных дней, и у вас есть система потоковой обработки с KSQL. Которая, к тому же, позволяет быстро найти интересные события данных. Хотя Снежок может отнестись к ним скептически.
Автор: Анастасия Овсянникова