Как контролировать семейный бюджет?
У меня всегда были сложности точно следовать бюджету, особенно сейчас, когда все покупки проходят по кредитной карте. Причина проста — перед глазами нет пачки денег, которая постепенно сокращается, и в какой-то момент ты понимаешь, что тратить больше нечего. Если большая часть покупок оплачивается кредитной картой, то единственный способ узнать, сколько cредств осталось или сколько потрачено, это зайти в Интернет-банк или мобильный банк, или же использовать финансовые агрегаторы, например Mint, в которые тоже надо заходить и проверять баланс. Это возможно, но на это требуется дисциплина, а когда с той же карточки платишь не только ты, то установить её сложно.
Я подумал, что меня устроит вариант, если каждый день мне будет приходить уведомление о том, сколько денег у меня ещё осталось в этом месяце. То есть я бы устанавливал бюджет на месяц, и что-то бы считало мои траты и каждый день посылало отчёт о состоянии бюджета.
Самый очевидный вариант это использовать API банка или ходить в его Интернет-банк программно каким-нибудь headless-браузром. К сожалению, доступ к API моего банка платный, а ходить в Интернет-банк проблемно из-за двухфакторной авторизации. Однако, есть ещё один вариант. Почти все банки сегодня присылают оповещения на каждую транзакцию, информируя когда, сколько и в каком месте была совершена транзакция. Именно та информация, которая нужна для ведения бюджета. Осталось придумать, как её обрабатывать.
Мой банк может отправлять оповещения на мобильный телефон и на электронную почту. Вариант с мобильным телефоном не рассматривался ввиду сложности обработки смс-сообщений. Вариант с электронной почтой же выглядит очень заманчиво, программную обработку электронный писем можно было сделать и десятки лет назад. Но сейчас у меня дома только не всегда включённый ноутбук, а значит автоматизировать бюджет мы будем где-то в облаке, например, AWS.
Что нам понадобится в AWS?
В AWS есть множество сервисов, но нам нужно всего три: чтобы получать и отправлять электронные письма — SES, чтобы их обрабатывать — Lambda, и чтобы хранить результат DynamoDB. Плюс ещё пара вспомогательных для связки — SNS, Kinesis, CloudWatch. Это не единственный вариант обработки сообщений: вместо Lambda можно использовать EC2, вместо DynamoDB хранить данные можно в RDS (MySQL, PostgreSQL, Oracle, …), а можно и вообще написать простенький скрипт на своём маленьком сервере на перле и BerkleyDB.
Как выглядит вся обработка в общем? Приходит письмо о транзакции, мы записываем дату, сумму и место платежа в БД, и раз в день отправляем письмо с остатком для данного месяца. Вся архитектура чуть сложнее и выглядит следующим образом:
- Письмо приходит в SES.
- SES отправляет письмо в SNS топик.
- Lambda-функция ProcessCharge запускается по приходу письма по SNS, парсит письмо и записывает данные о транзакции в DynamoDB таблицу Transactions.
- Lambda-функция UpdateSummary срабатывает как триггер после записи в таблицу Transactions и обновляет данные о текущем состоянии бюджета в таблице Summary.
Рассмотрим эти шаги более подробно.
Получение письма
Simple Email Service, он же SES, это сервис для приёма и отправки писем. При получении письма можно указать, какое действие должно быть выполнено: сохранить письмо в S3, вызвать Lambda-функцию, послать письмо в SNS и другие. Для получения писем необходимо привязать свой домен, а именно указать SES сервера в MX записи домена. Своего домена у меня на тот момент не было, и я решил, что это хороший повод его зарегистрировать, воспользовавшись ещё одним AWS сервисом Route 53. Захостил я его тоже там же, в Route 53.
При привязки домена к SES требуется его проверка. Для этого SES просит добавить некоторые записи в DNS зону (MX и TXT), а затем проверяет их наличие. Если домен
aws> ses describe-receipt-rule --rule-set-name "ccalerts" --rule-name "ccalert"
{
"Rule": {
"Name": "ccalert",
"Recipients": [
"ccalert@=censored=”
],
"Enabled": true,
"ScanEnabled": true,
"Actions": [
{
"SNSAction": {
"TopicArn": "arn:aws:sns:us-west-2:=censored=:ccalerts",
"Encoding": "UTF-8"
}
}
],
"TlsPolicy": "Optional"
}
}
Обработка письма
Когда новое письмо публикуется в SNS-топик, вызывается Lambda-функция ProcessCharge. Ей нужно сделать два действия — распарсить письмо и сохранить данные в БД.
from __future__ import print_function
import json
import re
import uuid
from datetime import datetime
import boto3
def lambda_handler(event, context):
message = json.loads(event['Records'][0]['Sns']['Message'])
print("Processing email {}".format(message['mail']))
content = message['content']
trn = parse_content(content)
if trn is not None:
print("Transaction: %s" % trn)
process_transaction(trn)
За парсинг отвечает метод parse_content():
def parse_content(content):
content = content.replace("=rn", "")
match = re.search(r'A charge of ($USD) (d+.d+) at (.+?) has been authorized on (d+/d+/d+ d+:d+:d+ S{2} S+?).', content, re.M)
if match:
print("Matched %s" % match.group(0))
date = match.group(3)
# replace time zone with hour offset because Python can't parse it
date = date.replace("EDT", "-0400")
date = date.replace("EST", "-0500")
dt = datetime.strptime(date, "%m/%d/%Y %I:%M:%S %p %z")
return {'billed': match.group(1), 'merchant': match.group(2), 'datetime': dt.isoformat()}
else:
print("Didn't match")
return None
В нём мы убираем ненужные символы и с помощью регулярного выражения проверяем, содержит ли письмо информацию о транзакции, и если содержит, разбиваем её на части. Искомый текст выглядит следующим образом:
A charge of ($USD) 100.00 at Amazon.com has been authorized on 07/19/2017 1:55:52 PM EDT.
К сожалению, стандартная библиотека Питона знает мало часовых поясов, и EDT (Eastern Daylight Time) не среди них. Поэтому мы заменяем EDT на числовое обозначение -0400, и делаем такое же для основного часового пояса, EST. После этого мы можем распарсить дату и время транзакции, и преобразовать его в стандартный формат ISO 8601, поддерживаемый DynamoDB.
Метод возвращает хэш-таблицу с суммой транзакции, названием магазина и датой со временем. Эти данные передаются в метод process_transaction:
def process_transaction(trn):
ddb = boto3.client('dynamodb')
trn_id = uuid.uuid4().hex
ddb.put_item(
TableName='Transactions',
Item={
'id': {'S': trn_id},
'datetime': {'S': trn['datetime']},
'merchant': {'S': trn['merchant']},
'billed': {'N': trn['billed']}
})
В нём мы сохраняем данные в таблицу Transactions, генерируя уникальный идентификатор транзакции.
Обновление бюджета
Я бы хотел остановиться здесь подробнее, а именно на моменте как отслеживается состояние бюджета. Определим для себя несколько значений:
- budget — размер бюджета на месяц;
- total — сумма трат за месяц;
- available — остаток, (buget — total);
В любой момент времени мы хотим знать все эти значения. Это можно сделать двумя способами:
- Каждый раз, когда надо узнать состояние бюджета, транзакции суммируются чтобы получить total, затем available = (budget — total).
- Каждый раз, когда записывается новая транзакция, обновляется total. Когда надо узнать состояние бюджета, делается available = (budget — total).
Оба подхода имеют плюсы и минусы, и выбор сильно зависит от требований и ограничений системы. Первый подход хорош тем, что он не денормализует данные, храня отдельно сумму транзакций. С другой стороны, с ним сумму надо считать при каждом запросе. Для моих объёмов это не будет проблемой, но в моём случае у меня есть ограничение, вызванное DynamoDB. Чтобы посчитать сумму N транзакций, надо прочитать N записей, а значит потратить N read capacity units. Очевидно, это не очень масштабируемое решение, которое будет вызывать сложности (или высокую стоимость) даже при нескольких десятках транзакций.
При использовании второго подхода, total обновляется после каждой транзакции и всегда актуально, что позволяет избежать суммирования всех транзакций. Мне этот подход показался более рациональным в моём случае. Реализовать его, опять же, можно по-разному:
- Обновлять total после записи каждой транзакции в той же Lambda-функции ProcessCharge.
- Обновлять total в триггере после добавления нового элемента в таблицу Transactions.
Обновление в триггере более практично, в том числе с точки зрения многопоточности, поэтому я создал Lambda-функцию UpdateSummary:
from __future__ import print_function
from datetime import datetime
import boto3
def lambda_handler(event, context):
for record in event['Records']:
if record['eventName'] != 'INSERT':
print("Unsupported event {}".format(record))
return
trn = record['dynamodb']['NewImage']
print(trn)
process_transaction(trn)
Нас интересуют только события о добавлении элементов в таблицу, все остальные игнорируются.
def process_transaction(trn):
period = get_period(trn)
if period is None:
return
billed = trn['billed']['N']
# update total for current period
update_total(period, billed)
print("Transaction processed")
В process_transaction() мы вычисляем период, в виде год-месяц, к которому относится транзакция, и вызываем метод обновления total.
def get_period(trn):
try:
# python cannot parse -04:00, it needs -0400
dt = trn['datetime']['S'].replace("-04:00", "-0400")
dt = dt.replace("-05:00", "-0500")
dt = dt.replace("-07:00", "-0700")
dt = datetime.strptime(dt, "%Y-%m-%dT%H:%M:%S%z")
return dt.strftime("%Y-%m")
except ValueError as err:
print("Cannot parse date {}: {}".format(trn['datetime']['S'], err))
return None
Этот код весьма далёк от совершенства, и в этом сыграла роль интересная особенность Питона, что он не может распарсить дату/время с часовым поясом в формате -HH:MM, который соответствует стандарту ISO 8601, и которую сам же Питон и сгенерировал (код выше, в методе parse_content()). Поэтому нужные мне часовые пояса я просто заменяю на понимаемый им формат -HHMM. Можно было воспользоваться сторонней библиотекой и сделать это более красиво, оставлю это на будущее. Возможно, ещё сказывается моё плохое знание Питона — этот проект мой первый опыт разработки на нём.
Обновление total:
def update_total(period, billed):
ddb = boto3.client('dynamodb')
response = load_summary(ddb, period)
print("Summary: {}".format(response))
if 'Item' not in response:
create_summary(ddb, period, billed)
else:
total = response['Item']['total']['N']
update_summary(ddb, period, total, billed)
В этом методе мы загружаем сводку (Summary) за текущий период с помощью метода load_summary(), total в котором нам надо обновить. Если сводки ещё не существует, мы создаём её в методе create_summary(), если существует, обновляем в update_summary().
def load_summary(ddb, period):
print("Loading summary for period {}".format(period))
return ddb.get_item(
TableName = 'Summary',
Key = {
'period': {'S': period}
},
ConsistentRead = True
)
Так как обновление сводки может производиться из нескольких потоков, мы используем консистентное чтение, которое дороже, но гарантирует, что мы получим последнее записанное значение.
def create_summary(ddb, period, total):
print("Creating summary for period {} with total {}".format(period, total))
ddb.put_item(
TableName = 'Summary',
Item = {
'period': {'S': period},
'total': {'N': total},
'budget': {'N': "0"}
},
ConditionExpression = 'attribute_not_exists(period)'
)
При создании новой сводки, по той же причине возможной записи из нескольких потоков, используется условная запись, ConditionExpression = 'attribute_not_exists(period)', которая сохранит новую сводку только в случае, если она не существует. Таким образом, если кто-то успел создать сводку в промежутке, когда мы попробовали её загрузить в load_summary() и её не было, и когда мы попытались её создать в create_summary(), наш вызов put_item() завершится исключением и вся Lambda-функция будет перезапущена.
def update_summary(ddb, period, total, billed):
print("Updating summary for period {} with total {} for billed {}".format(period, total, billed))
ddb.update_item(
TableName = 'Summary',
Key = {
'period': {'S': period}
},
UpdateExpression = 'SET #total = #total + :billed',
ConditionExpression = '#total = :total',
ExpressionAttributeValues = {
':billed': {'N': billed},
':total': {'N': total}
},
# total is a reserved word so we create an alias #total to use it in expression
ExpressionAttributeNames = {
'#total': 'total'
}
)
Обновления значения total в сводке производится внутри DynamoDB:
UpdateExpression = 'SET #total = #total + :billed'
Скорее всего, этого достаточно для безопасного обновления, однако я решил поступить консервативно и добавил условие, что запись должна произойти, только если сводку не успели обновить в другом потоке, и она до сих пор содержит значение, которое есть у нас:
ConditionExpression = '#total = :total',
Так как total является ключевым словом для DynamoDB, чтобы использовать его в выражениях DynamoDB надо создать синоним:
ExpressionAttributeNames = {
'#total': 'total'
}
На этом процесс обработки транзакций и обновления бюджета завершён:
period | budget | total |
---|---|---|
2017-07 | 1000 | 500 |
Отправка уведомления о состоянии бюджета
Последняя часть системы — уведомление о состояние бюджета. Как я писал в самом начале, мне достаточно получать уведомление раз в день, что я и реализовал. Однако ничего не мешает уведомлять после каждой транзакции, или после каких-то пороговых значений расходов / остатка. Архитектура отправки электронного письма с уведомлением достаточно проста и выглядит так:
- Таймер CloudWatch Timer срабатывает раз в день и вызывает Lambda-функцию DailyNotification.
- DailyNotification загружает данные из DynamoDB таблицы Summary и вызывает SES для отправки письма.
from __future__ import print_function
from datetime import date
import boto3
def lambda_handler(event, context):
ddb = boto3.client('dynamodb')
current_date = date.today()
print("Preparing daily notification for {}".format(current_date.isoformat()))
period = current_date.strftime("%Y-%m")
response = load_summary(ddb, period)
print("Summary: {}".format(response))
if 'Item' not in response:
print("No summary available for period {}".format(period))
return
summary = response['Item']
total = summary['total']['N']
budget = summary['budget']['N']
send_email(total, budget)
def load_summary(ddb, period):
print("Loading summary for period {}".format(period))
return ddb.get_item(
TableName = 'Summary',
Key = {
'period': {'S': period}
},
ConsistentRead = True
)
Сперва мы пытаемся загрузить сводку для текущего периода, и если её нет, то заканчиваем работу. Если есть — готовим и отправляем письмо:
def send_email(total, budget):
sender = "Our Budget <ccalert@==censored==>"
recipients = [“==censored==“]
charset = "UTF-8"
available = float(budget) - float(total)
today = date.today().strftime("%Y-%m-%d")
message = '''
As of {0}, available funds are ${1:.2f}. This month budget is ${2:.2f}, spendings so far totals ${3:.2f}.
More details coming soon!'''
subject = "How are we doing?"
textbody = message.format(today, float(available), float(budget), float(total))
print("Sending email: {}".format(textbody))
client = boto3.client('ses', region_name = 'us-west-2')
try:
response = client.send_email(
Destination = {
'ToAddresses': recipients
},
Message = {
'Body': {
'Text': {
'Charset': charset,
'Data': textbody,
},
},
'Subject': {
'Charset': charset,
'Data': subject,
},
},
Source = sender,
)
# Display an error if something goes wrong.
except Exception as e:
print("Couldn't send email: {}".format(e))
else:
print("Email sent!")
Итог
На этом всё. Сейчас, после каждой транзакции, пришедшее письмо обрабатывается и обновляется бюджет, и раз в день посылается письмо с уведомлением о состоянии бюджета. У меня ещё есть планы по добавлению функциональности, например, классификация расходов по категориям, и включение списка последних транзакций в уведомление, если получится что-то стоящее — поделюсь в другой статье. Если есть какие-то вопросы, комментарии или правки — жду в комментариях.
Автор: kazaand