Меня зовут Дмитрий и я вляпался в Airflow. Цель этой статьи — помочь начинающим пользователям Airflow ознакомиться с таблицами PostgreSQL. Время узнать насколько глубока аирфлоуольная нора.
Общая информация про Airflow
Airflow - это инструмент планирования и управления рабочими процессами, он использует базу данных для хранения метаданных о задачах, DAGs (Directed Acyclic Graphs), запусках и результатах.
База данных в Airflow:
-
SQLAlchemy
: Airflow использует библиотеку SQLAlchemy для взаимодействия с различными системами управления базами данных; -
SQLite
: По умолчанию Airflow использует встроенную базу данных SQLite, которая хранится в файле airflow.db. Этот вариант подходит для маленьких проектов, но не рекомендуется для продуктовой среды; -
Postgres, MySQL, MSSQL
: Для более крупных проектов и продуктовых сред рекомендуется использовать реляционные базы данных (Postgres, MySQL, MSSQL).
Как было сказано выше основной инструмент это SQLAlchemy
посредством которой проиcходит взаимодействие с БД. Рассказывать про то, как её развернуть и подключить я не буду, но есть замечательная статья https://habr.com/ru/articles/860900/. Упомяну, что настройка БД осуществляется в файле ~/airflow/airflow.cfg
в строчке sql_alchemy_conn
, где можно настроить подключение именно на постгрес. Сам постгрес настраивается отдельно.
Метаданные в Airflow:
-
Задачи: Информация о каждой задаче (имя, тип, зависимости, параметры и т.д.);
-
DAGs: Описание графика зависимостей между задачами;
-
Запуски: Информация о каждом запуске DAGs (дата и время запуска, статус и т.д.);
-
Результаты: Информация о результатах выполнения задач (успешно, с ошибкой, время выполнения и т.д.).
Таблицы и их содержимое
Далее коротко расскажу про некоторые таблицы и их поля.
Dag_code
-
fileloc_hash
- хеш дага; -
fileloc
- расположение файла дага в системе; -
last_updated
- дата последнего обновления дага; -
source_code
- весь код дага с управляющими символами.
Из данной таблички можно вытянуть весь код DAG и сохранить в файл для переиспользования или корректировки, а также использовать в системе контроля версий.
Dag_pickle
-
id
- идентификатор дага; -
pickle
- основное поле, хранит сериализованные данные дага; -
created_dttm
- информация о времени создания или обновления записи; -
pickle_hash
- хеш сериализованного объекта дага, позволяет Airflow быстро проверять изменился ли даг с последнего сохранения.
Отслеживать изменения дагов при их выполнении.
Log
-
id
- идентификатор записи; -
dttm
- дата и время записи события; -
dag_id
- идентификатор DAG, к которому относится событие; -
task_id
- идентификатор задачи внутри DAG; -
map_index
- если задач внутри DAG несколько, то можно так получить инстансы задачи; -
event
- тип события (например, success, failed, skipped, и т.д.). Это более высокоуровневая информация, чем подробные сообщения в log. -
execution_date
- дата и время выполнения задачи; -
owner
- автор DAG или группа пользователей, отвечающих за него; -
extra
- дополнительная информация в формате JSON, может выводить автор DAG, хост, полный путь выполнения задачи и т.п.
По необходимости проанализировать события для конкретных пользователей или для определенных задач. Стоит так же отметить, что место хранения логов определяется в вышеупомянтом конфигурационном файле airflow.cfg
в разделе [core], параметр base_log_folder
(если у вас версия 2.0 и выше, то иначе – в разделе [logging]), и по умолчанию Airflow сохраняет логи задач в файлах на локальной файловой системе сервера Airflow. Поэтому если своевременно не ухаживать за своими логами, то есть все шансы потратить кучу времени на очистку в будущем. Либо использовать удаленное хранилище для логов. Но никто не запрещает вам испытать на себе роль Нео. Тук-тук-тук, так сказать.
Slot_pool
-
id
: уникальный идентификатор записи в таблице; -
pool
: имя пула слотов. Это текстовая строка, которая используется для идентификации пула в Airflow; -
slots
: максимальное количество слотов, доступных в пуле. Это целое число, которое определяет, сколько задач могут выполняться одновременно из этого пула; -
description
: Дополнительное описание пула слотов (необязательное поле).
Из этой таблички мы можем узнать сколько можно в целом запускать DAGs одновременно. То есть таким образом мы можем распределять задачи в зависимости от потребностей и потребляемых ресурсов по разных пулам. Нет времени объяснять – просто запускай!
Connection
-
conn_id
: уникальный идентификатор соединения; -
conn_type
: тип соединения. Это определяет, к какой системе подключается соединение Postgres, MySQL, S3, Hive, OpenAI Cloud Storage и т.п.; -
description
: описание соединения. Полезно для документирования и понимания назначения соединения; -
host
: хост или адрес сервера, к которому подключается соединение. Например, это может быть имя хоста базы данных, URL-адрес веб-сервиса или имя файла; -
schema
: схема (или база данных) в пределах сервера, если это применимо. Это поле используется, например, при работе с базами данных, такими как PostgreSQL или MySQL; -
login
: имя пользователя для аутентификации на сервере; -
password
: пароль для аутентификации на сервере. Важно: В производственных средах не рекомендуется хранить пароли в открытом виде в таблице; -
port
: порт сервера, к которому подключается соединение. Обычно используется для баз данных или других сетевых служб; -
Если вы используете шифрование, то могут быть дополнительные поля
is_encrypted
иis_extra_encrypted
. Первое указывает зашифрован ли пароль, а второе зашифрованы ли дополнительные параметры; -
extra
: дополнительные параметры соединения в формате JSON.
Таблица connections в Airflow хранит информацию о соединениях с внешними системами, которые используются DAGs для доступа к данным или выполнения других операций.
Dag
-
dag_id
: уникальный идентификатор DAG. Да, это может быть и текст; -
root_dag_id
: идентификатор корневого DAG, если это под-DAG; -
is_paused
: DAG приостановлен? ЕслиTRUE
, DAG не будет запускаться автоматически; -
is_subdag
: DAG является под-DAGом? -
is_active
: DAG активен? -
last_parsed_time
: время последней успешной парсинга DAG-файла. Когда Airflow загружает DAG из файла, он записывает время в это поле; -
last_pickled
: сериализованный (pickled) объект DAG. Это позволяет Airflow быстро загружать DAG из базы данных, минуя повторный парсинг кода. В современных версиях Airflow это поле используется реже из-за перехода на более эффективные методы сериализации. -
last_expired
: время когда DAG получал сигнал об обновлении; -
scheduler_lock
: время установки блокировки планировщиком. Это предотвращает одновременный доступ к DAG нескольким планировщикам; -
pickle_id
: Идентификатор записи в таблице dag_pickle. -
Fileloc
: путь к файлу DAG в файловой системе; -
processor_subdir
: подкаталог для обработки DAG. -
owners
: владельцы DAG (обычно в формате JSON). -
description
: описание DAG. -
default_view
: предпочтительный способ просмотра DAG в интерфейсе Airflow. -
schedule_interval
: интервал планирования DAG (например, @daily, 0 0 * * *). -
timetable_description
: описание расписания запусков. -
max_active_tasks
: максимальное количество одновременно выполняемых задач в DAG. -
max_active_runs
: максимальное количество одновременно выполняемых запусков DAG. -
has_task_concurrency_limits
: есть ли ограничения на параллелизм задач? -
has_import_errors
: есть ли ошибки импорта в DAG-файле? -
next_dagrun
: планируемое время следующего запуска DAG. -
next_dagrun_data_interval_start
: начало интервала данных для следующего запуска. -
next_dagrun_data_interval_end
: конец интервала данных для следующего запуска. -
next_dagrun_create_after
: время, после которого можно создать следующий запуск DAG.
Таблица dag
(или dag_model
в более новых версиях Airflow) в Airflow хранит метаданные о ваших DAGs (Directed Acyclic Graphs). Это центральное хранилище информации о всех ваших DAG-файлах, их состоянии и истории выполнения.
Какая статья без кода. Посмотрим, сколько активных дагов, на паузе, активных на паузе, всего дагов, и какую долю составляют вышеупомянутые.
WITH dags AS (
SELECT
(SELECT COUNT(dag_id) FROM dag WHERE is_active = True) AS active,
(SELECT COUNT(dag_id) FROM dag WHERE is_paused = True) AS paused,
(SELECT COUNT(dag_id) FROM dag WHERE is_active = True AND is_paused = True) AS a_n_p,
(SELECT COUNT(dag_id) FROM dag) AS all_dags
)
SELECT
active,
paused,
a_n_p,
all_dags,
CAST(active AS FLOAT) * 100 / all_dags AS active_ratio,
CAST(paused AS FLOAT) * 100 / all_dags AS paused_ratio,
CAST(a_n_p AS FLOAT) * 100 / all_dags AS a_n_p_ratio
FROM dags;
Вы сможете это протестить самостоятельно(при некоторых условиях), а у меня получилось следующее:
Активных |
Паузанутых |
Активно паузанутых |
Всего |
Доля активных |
Доля паузанутых |
Доля активно паузанутых |
360 |
278 |
166 |
574 |
45% |
35% |
20% |
И всё в целом хорошо, но как могут быть активно паузанутые?
А это вопрос для самоизучения. Мало ли на практике нужно будет проанализировать DAGs, а тут такая подстава.
Log_template
-
id
: уникальный идентификатор записи лога; -
filename
: путь к файлу лога; -
elasticsearch_id
: ID записи в Elasticsearch (если используется Elasticsearch для хранения логов). Это поле будет NULL, если Elasticsearch не используется; -
created_at
: время создания записи лога.
Таблица log_template
в Airflow используется для хранения шаблонов имен файлов логов. Эти шаблоны определяют, как Airflow формирует имена файлов для логов задач. Она не так часто используется напрямую разработчиками DAG, а больше управляется самой системой Airflow. Так же в таблице может присутствовать поле template, в котором мы можем задать шаблон для записи логов. Как и определить это самостоятельно новым классом/методом и т.п. Это довольно сложный и специфический процесс под конкретные цели и задачи, со своими плюсами и минусами. Главное - оно работает из коробки...
Ab_user
-
id
: уникальный идентификатор пользователя; -
first_name
: имя пользователя; -
last_name
: фамилия пользователя; -
username
: имя пользователя для входа, иногда известно как логин, но это уже совсем другая история; -
password
: хешированный пароль; -
active
: указывает, активен ли аккаунт пользователя; -
email
: адрес электронной почты пользователя; -
last_login
: время последнего(если кому-то это очень важно, то крайнего) входа пользователя; -
login_count
: количество успешных входов пользователя; -
fail_login_count
: количество неудачных попыток входа пользователя; -
created_on
: время создания аккаунта пользователя; -
changed_on
: время последнего изменения данных пользователя; -
created_by_fk
: идентификатор пользователя, который создал данного; -
changed_by_fk
: идентификатор пользователя, который последний раз изменил данные этого пользователя.
Таблица ab_user содержит базовую информацию о пользователях Airflow, необходимую для управления доступом и аутентификации. В зависимости от вашей конфигурации, она может содержать дополнительные поля для более сложного управления доступом и пользовательскими профилями.
В этой статье мы рассмотрели структуру нескольких ключевых таблиц в базе данных Airflow, включая dag, connections, ab_user и таблицы логов. Понимание структуры этих таблиц необходимо для эффективного управления и мониторинга рабочих процессов в Airflow. Более подробную информацию можно найти в официальной документации Airflow.
Автор: LunarBirdMYT