Привет!
Не секрет, что для оценки платежеспособности клиентов банки используют данные из различных источников (кредитное бюро, мобильные операторы и т.д.). Количество внешних партнёров может достигать нескольких десятков, а аналитиков в нашей команде наберётся лишь несколько человек. Возникает задача оптимизации работы небольшой команды и передачи рутинных задач вычислительным системам.
Как данные попадают в банк, и как команда аналитиков следит за этим процессом, разберём в данной статье.
Начнём по порядку.
Нашу распределённую систему на основе Hadoop, и все процессы, связанные с ней, мы коротко называем SmartData. SmartData получает данные по API от внешних агентов. (Причём агентами для неё являются как внешние партнёры, так и внутренние системы банка). Безусловно, было бы полезно собирать некий «актуальный профиль» по каждому клиенту, что мы и делаем. Обновлённые данные от источников попадают в Оперпрофиль. Оперпрофиль реализует идею Customer 360 и хранится в виде таблиц Hbase. Это удобно для дальнейшей работы с клиентом.
Работа с агентами осуществляется непрерывно, и её нужно контролировать. Для быстрой проверки качества взаимодействия и hit rate, а также простоты передачи этой информации другим командам, мы используем визуализацию, например, отчёты в Tableau.
Исходные данные поступают в Kafka, проходят предварительную обработку и помещаются в DataLake, построенный на основе HDFS. Потребовалось придумать решение, как организовать парсинг файлов с логами из HDFS, их обработку и ежедневную выгрузку в аналитические системы и системы визуализации. А ещё совместить это с любовью аналитиков к Python ноутбукам.
Закончим с внутренней кухней и перейдём к практике.
Нашим решением стало использование API Livy. Livy позволяет сабмитить код на кластер прямо из Jupyter. HTTP запрос, содержащий код, написанный на Python (или Scala), и мета-данные, отправляется в Livy. Livy инициирует запуск Spark сессии на кластере, которая управляется менеджером ресурсов Yarn. Для отправки HTTP запросов подойдёт модуль requests. Любители парсить сайты наверняка с ним уже знакомы (а если нет – вот шанс немного узнать про него).
Импортируем необходимые модули и создадим сессию. (Также сразу узнаем адрес нашей сессии, в будущем это пригодится). В параметры передаем данные для авторизации пользователя и название языка скрипта, который будет исполнять кластер.
import json, requests, schedule, time
host = 'http://***:8998'
data = {'kind': 'spark', 'proxyUser': 'user'}
headers = {'Content-Type': 'application/json'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
session_id = r.json().get('id')
print("session_id: " + str(session_id))
session_url = host + r.headers['location']
r = requests.get(session_url, headers=headers)
Ждём, когда статус сессии перейдёт в idle. В случае, если время ожидания превысит установленный timeout – отправляем сообщение об ошибке.
timeout = time.time() + wait_time
sess_state = ['starting', 'success', 'idle']
while(True):
time.sleep(7)
req_st = requests.get(session_url, headers=headers).json().get('state')
if req_st != 'idle' and time.time() > timeout:
requests.delete(session_url, headers=headers)
send_message("Scheduler_error", req_st)
break
if req_st == 'idle':
break
if req_st not in sess_state:
send_message("Scheduler_error", req_st)
break
print("Session_state: ", req_st)
Теперь можно отправлять код в Livy.
statements_url = session_url + '/statements'
data = {'code': '1 + 1'}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
while (requests.get(statement_url, headers=headers).json()['progress'] != 1):
time.sleep(15)
r = requests.get(statement_url, headers=headers).json()['output']
session_url = 'http://***:8998/sessions/' + str(session_id)
В цикле ждём окончания исполнения кода, получаем результат обработки:
r.get('data').get('text/plain')
Метод delete удалит сессию.
requests.delete(session_url, headers=headers)
Для ежедневной выгрузки можно использовать несколько вариантов, про cron на хабре уже писали, а вот про user-friendly модуль schedule – нет. Просто добавлю его в код, объяснений он не потребует. И, для удобства, все выкладки соберу в одном месте.
import json, requests, schedule, time
schedule.every().day.at("16:05").do(job, 300)
while True:
schedule.run_pending()
def job(wait_time):
host = 'http://***:8998'
data = {'kind': 'spark', 'proxyUser': 'user'}
headers = {'Content-Type': 'application/json'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
session_id = r.json().get('id')
print("session_id: " + str(session_id))
session_url = host + r.headers['location']
r = requests.get(session_url, headers=headers)
timeout = time.time() + wait_time
sess_state = ['starting', 'success', 'idle']
while(True):
time.sleep(7)
req_st = requests.get(session_url, headers=headers).json().get('state')
if req_st != 'idle' and time.time() > timeout:
requests.delete(session_url, headers=headers)
break
if req_st == 'idle':
break
if req_st not in sess_state:
send_message("Scheduler_error", req_st)
break
print("Session_state: ", req_st)
statements_url = session_url + '/statements'
data = {'code': '1 + 1'}
r = requests.post(statements_url, data=json.dumps(data),headers=headers)
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
while (requests.get(statement_url, headers=headers).json()['progress'] != 1):
time.sleep(15)
r = requests.get(statement_url, headers=headers).json()['output']
session_url = 'http://***:8998/sessions/' + str(session_id)
print(r.get('data').get('text/plain'))
#requests.delete(session_url, headers=headers)
def send_message(subject, text):
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
me = "my_email_adress"
you = "email_adress"
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = me
msg['To'] = you
text = text
part1 = MIMEText(text, 'plain')
msg.attach(part1)
s = smtplib.SMTP('domain.org')
s.ehlo()
s.starttls()
s.login("user", "password")
s.sendmail(me, you, msg.as_string())
s.quit()
Заключение:
Быть может, это решение не претендует на лучшее, но оно прозрачно для команды аналитиков. Плюсы, которые в нём вижу я:
- возможность использовать для автоматизации привычный Jupyter
- наглядное взаимодействие
- участник команды в праве сам выбрать, каким образом он будет работать с файлами (spark-зоопарк), как следствие, нет необходимости переписывать существующие скрипты
Конечно, при запуске большого количества заданий, придётся следить за освобождающимися ресурсами, настраивать коммуникацию между выгрузками. Эти вопросы решаются в индивидуальном порядке и согласовываются с коллегами.
Будет замечательно, если хотя бы одна команда возьмет это решение на заметку.
Ссылки
Автор: Ксения Пеньевская