Доброго времени суток! У нас открылось совершенно новое направление обучения — BigData, а это значит, что немного расширяется горизонт материалов, которыми мы будем делиться с вами. Сегодня рассмотрим Luigi, как часть того, что раскрывается на нашем курсе.
Luigi — фреймворк на языке Python для построения сложных последовательностей по выполнению зависимых задач. Довольно большая часть фреймворка направлена на преобразования данных из различных источников (MySql, Mongo, redis, hdfs) и с помощью различных инструментов (от запуска процесса до выполнения задач разных типов на кластере Hadoop). Разработан в компании Spotify и открыт в виде open source инструмента в 2012 году.
Самое главное преимущество фреймворка — возможность выстраивать последовательности зависимых задач. Фреймворк разрешает зависимости, отслеживает граф выполнения, управляет запуском задач, обрабатывает ошибки с возможностью перезапуска нужных задач, распределяет ресурсы рабочих процессов с возможностью параллельной работы независимых частей графа задач.
Для выполнения всех этих задач существуют и другие инструменты. Это Oozie, Pinball, Airflow (находится в статусе инкубации в Apache — проходит различные проверки, недавно вышел обзор на хабре). В данной статье рассмотрим только Luigi.
Установка и документация
Для установки можно воспользоваться командой:
pip install luigi
Документация доступна тут
Задача (Task)
В файле luigi_demo_tasks.py определяем класс, наследуемый от luigi.Task. Добавляем вызов run для возможности запуска из консоли.
from luigi import Task, run
class MyTask(Task):
pass
if __name__ == '__main__':
run()
Запускаем. Дополнительно указываем опцию --local-scheduler, чтобы пока что не обращаться к центральному планировщику задач.
python -m luigi_demo_tasks MyTask --local-scheduler
Примечание. В документации указан другой способ запуска без вызова run и с добавлением директории в PYTHONPATH.
Видим следующий результат:
DEBUG: Checking if MyTask() is complete
/usr/local/lib/python3.4/dist-packages/luigi/worker.py:334: UserWarning: Task MyTask() without outputs has no custom complete() method
is_complete = task.complete()
INFO: Informed scheduler that task MyTask__99914b932b has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 5369] Worker Worker(salt=920153035, workers=1, host=your_host, username=username, pid=5369) running MyTask()
INFO: [pid 5369] Worker Worker(salt=920153035, workers=1, host=your_host username=username, pid=5369) done MyTask()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task MyTask__99914b932b has status DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=920153035, workers=1, host=your_host, username=username, pid=5369) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 ran successfully:
- 1 MyTask()
This progress looks :) because there were no failed tasks or missing external dependencies
===== Luigi Execution Summary =====
В сообщениях видим, что задача MyTask ставится на выполнение и успешно выполняется. Повторный запуск дает точно такой же результат.
Сделаем теперь так, чтобы MyTask выполнял некоторую работу. Для этого переопределим метод run из базового класса:
from luigi import Task, run
class MyTask(Task):
def run(self):
print("Hello world!")
if __name__ == '__main__':
run()
В информации о выполнении задачи увидим следующее:
INFO: [pid 7448] Worker Worker(salt=857719525, workers=1, host=your_host, username=username, pid=7448) running MyTask()
Hello world!
INFO: [pid 7448] Worker Worker(salt=857719525, workers=1, host=your_host, username=username, pid=7448) done MyTask()
Гарантия однократного выполнения задачи
Часто бывает так, что необходимо выполнить некую задачу единожды. Например из-за того, что её выполнение является ресурсоёмким. В Luigi задача считается сделанной, если сгенерирован некий объект (файл на машине, файл в hdfs, артефакт в MySql, таблица в Hive и другие), и можно проверить его существование. Для указания объекта необходимо переопределить в задаче метод output и в нем вернуть любого наследника или наследников класса Target. Для примера будем использовать LocalTarget — файл в локальной файловой системе.
from luigi import Task, run, LocalTarget
class MyTask(Task):
filename = "hello_file.txt"
def run(self):
with open(self.filename, 'w') as f:
f.write("Hello world!")
def output(self):
return LocalTarget(self.filename)
if __name__ == '__main__':
run()
Первый запуск задачи генерирует файл hello_file.txt. Повторный запуск задачи сообщает нам, что все задачи выполнены.
Зависимые задачи
В luigi задачи могут зависеть от других задач. Для указания зависимости от другой задачи необходимо переопределить метод requires. В нем вернуть объект класса любой другой задачи. Определим две зависимые задачи, каждая из которых пишет файл.
from luigi import Task, run, LocalTarget
class MyTaskFirst(Task):
filename = "first.txt"
def run(self):
with open(self.filename, 'w') as f:
f.write("first!")
def output(self):
return LocalTarget(self.filename)
class MyTaskSecond(Task):
filename = "second.txt"
def run(self):
with open(self.filename, 'w') as f:
f.write("second!")
def requires(self):
return MyTaskFirst()
def output(self):
return LocalTarget(self.filename)
if __name__ == '__main__':
run()
Запуск задачи немного изменился. Указываем для запуска самую последнюю задачу, фреймворк сам определит и выполнит все зависимости.
python -m luigi_demo_tasks MyTaskSecond --local-scheduler
Внешние зависимости
Иногда для выполнения задачи необходимы данные, генерируемые внешними системами. Так как в зависимостях в методе requires можно указывать только другие задачи, то нам понадобится задача-обертка для внешних данных. Для примера рассмотрим задачу подсчета частоты каждого символа в файле hello_file.txt:
from collections import defaultdict
from luigi import Task, run, LocalTarget, ExternalTask
class ExternalData(ExternalTask):
def output(self):
return LocalTarget("hello_file.txt")
class TaskWithExternalData(Task):
filename = "char_counts.txt"
def run(self):
frequencies = defaultdict(int)
with open(self.requires().output().path) as f_in:
for line in f_in:
for c in line:
frequencies[c] += 1
with open(self.filename, 'w') as f_out:
for c, count in frequencies.items():
f_out.write('{}t{}n'.format(c, count))
def requires(self):
return ExternalData()
def output(self):
return LocalTarget(self.filename)
if __name__ == '__main__':
run()
Планировщик
Для централизованного выполнения задач можно использовать центральный планировщик. Его основные задачи: обеспечить отсутствие одновременного выполнения одинаковых задач, предоставить визуализацию всех запущенных задач с их зависимостями.
Из документации запуск планировщика:
$ luigid --background --pidfile <PATH_TO_PIDFILE> --logdir <PATH_TO_LOGDIR> --state-path <PATH_TO_STATEFILE>
Адрес планировщика по умолчанию:
localhost:8082/
Запустим предыдущую задачу с использованием планировщика, предварительно удалив файл char_counts.txt:
python -m luigi_demo_tasks TaskWithExternalData
В планировщике увидим обе задачи:
А так же граф зависимостей:
Параллельный запуск задач
Реализуем несколько зависимых задач, зависимость оформим в виде песочных часов: корневая задача типа 1 зависит от десяти одинаковых задач типа 2. Эти 10 задач типа 2 зависят от одной типа 3. она, в свою очередь, зависит от 10 задач типа 4, и все эти 10 зависят от одной задачи типа 5.
Каждая задача пишет в результате работы файл со своим именем и номером, а так же спит 10 секунд. Это нужно для того, чтобы в планировщике можно было проследить порядок выполнения задач. Обратите внимание на то, что в задачу можно передать параметр. Это позволяет запускать разные по сути задачи с одинаковым кодом.
Для реализации будем использовать наследование, так как это позволит сократить код. Запустим одновременно 5 процессов указав опцию --workers=5
python -m luigi_demo_tasks Task1 --Task1-task-index=0 --workers=5
В планировщике обновляя страницу увидим следующую последовательность:
Одновременно выполняется не более пяти задач, а иногда только одна, так как от нее зависят все остальные. При этом выделенные воркеры простаивают.
Запуск задач
Для запуска задач необходимо использовать какой-либо внешний планировщик, например cron. Соответственно необходимо самостоятельно настраивать получение актуального кода для запуска, логирование и конфигурирование всех задач.
Дополнительные возможности
В случае возникновения ошибок в работе luigi может отправить email.
Для каждого набора задач можно указать внешний файл с настройками, настроив перед запуском переменную LUIGI_CONFIG_PATH
.
Каждая задача может быть запущена с некоторым приоритетом, для указания приоритета необходимо в классе указать поле priority.
Реализовано довольно много классов задач, связанных с типичными примерами обработки данных на кластере — hadoop streaming задача на Python, hadoop jar задача, spark задача и другие. При этом часто они требуют существенной доработки.
Возможно выполнение любой задачи в виде запуска консольной команды с отслеживанием процесса выполнения.
Код самой библиотеки чаще всего довольно прост. Для понимания того, как выстроить зависимые задачи, достаточно посмотреть исходный код базовых классов в luigi. У них довольно простой интерфейс и неплохая документация.
Разработан крупной компанией. На данный момент стабильно есть несколько коммитов в мастер каждую неделю. Скорее всего будет и дальше поддерживаться.
Недостатки
Проверка того, выполнена ли задача, происходит только во время построения графа зависимостей. Из-за этого, если запускать выполнение набора задач чаще, чем он весь успевает выполниться, может возникнуть ситуация, когда планировщик запускает две одинаковые задачи.
Нет встроенного планировщика.
Нет способа получить метаинформацию о выполняемых задачах. Нельзя без вспомогательных средств получить документацию по запускаемым задачам, процессам обработки данных. Нет способа, например, получить общий список задач, которые зависят от данной задачи.
Веб-интерфейс планировщика полезен только для того, чтобы увидеть, почему тот или иной набор задач не выполняется. Обычно можно посмотреть на граф зависимостей и увидеть, каких именно данных не хватает.
Не совсем очевидна настройка логирования выполнения задач.
Довольно часто встречается неожиданное поведение.
Поддержка и развитие менее активное, чем, например у Airflow. Для сравнения в luigi и в airflow
Вывод
Luigi хорошо подходит для построения процессов обработки данных. Однако прежде, чем начинать использовать эту библиотеку, есть смысл попробовать реализовать несколько типичных задач на разных фреймворках и выбрать такой, который лучше подойдет именно для ваших задач.
THE END
Как всегда рады мнениям, вопросам и тапкам.
Автор: MaxRokatansky