Насколько сложно построить полноценный сервис email-маркетинга? Что для этого нужно предусмотреть? Какие подводные камни могут встретиться на пути пытливых умов разработчиков?
Давайте попробуем разобраться вместе. В рамках нескольких статей я расскажу о том, как я уже больше года делаю свой собственный сервис email-рассылок, какие уроки для себя извлек и что планирую со всем этим делать дальше.
Сразу оговорюсь, что в статье рассмотрена только техническая сторона вопроса.
Первую часть можно прочитать здесь.
Кратко о себе
Я пишу на Python вот уже 5 лет, в основном использую Django, PostgreSQL, умею готовить JavaScript на уровне jQuery + KnockoutJS, иногда пишу на React. В свободное от основной работы время занимаюсь фрилансом на UpWork и собственными интернет-проектами, об одном из которых сейчас и планирую рассказать. Занимаюсь я этим проектом уже около полутора лет.
О чем эта статья?
В первой части я вкратце рассказал о том, какие технологии я использую в качестве основных в своем продукте (Python, Django, PostgreSQL), как работает трекинг писем, а так же привел немного статистики о работе сервиса.
В этой части я расскажу:
1. Как у меня получилось избавить пользователей от боли с помощью асинхронных задач. Рассмотрим одновременное выполнение нескольких задач с помощью Celery.
2. Как я организовал работу с таблицами, содержащими несколько миллионов записей в условиях очень ограниченного по ресурсам сервера. Расскажу, почему иногда денормализованные таблицы — это хорошо.
3. Как я сумел быстро и с минимальными временными затратами мигрировать часть функциональности монолитного Django проекта в отдельные микросервисы и какие технологии я для этого использовал.
Итак, приступим.
Асинхронные задачи в python проектах, или почему ваш пользователь не должен ждать
Рано или поздно все взрослеющие проекты сталкиваются с проблемами производительности, особенно если мы говорим о web-приложениях, обрабатывающих запросы от браузеров пользователей в синхронном режиме.
Простейший пример
Допустим, вы используете типичный для python-проекта стек технологий: свежую сборку uwsgi, nginx в качестве веб-сервера + ваше python-приложение. Nginx отдает статические файлы и пробрасывает корень на uwsgi. В конфигурации uwsgi у вас есть N worker-ов.
Это все будет работать хорошо ровно до тех пор, пока вы не столкнетесь с большим количеством «тяжелых» запросов (загрузка больших файлов, например), которые потребуют на обработку значительное время, близкое или большее чем harakiri uwsgi. После этого воркеры uwsgi не будут успевать обрабатывать запросы от пользователей, а пользователи начнут получать от nginx-а 502 ошибки и ругаться матом в отзывах о вашем «сервисе».
Конечно, можно прибегать к решениям типа tornado, asyncio + aiohttp, но это немного про другое, ведь мы уже имеем синхронное Django приложение, которое очень жалко выбрасывать переписывать.
Пытливый читатель скажет, что можно комбинировать Django с aiohttp с помощью использования нескольких upstream для nginx, но сейчас я хочу рассмотреть более стандартный подход.
Сегодня наиболее стандартным подходом в синхронных python проектах является использование библиотеки для обработки асинхронных задач Celery.
Вся информация ниже справедлива для версии 3.x, т.к. я использую именно её. В 4 версии Celery много всего поменялось.
Для тех читателей, кто еще незнаком с этой библиотекой краткая ремарка о том, что она умеет:
1. Вы сможете создавать задачи (которые представляют собой обычные функции), которые Celery будет асинхронно выполнять (или запускать на выполнение в нужное вам время с помощью beat).
Пример простейшей задачи, которая отправляет некое системное email письмо:
from celery import current_app
...
@current_app.task()
def send_service_message(subject, recipients, template, html_template, context):
html_mail(
subject=subject,
template=template,
html_template=html_template,
recipients=recipients,
context=context
)
Вызов этой задачи осуществляется следующим образом:
from app.message.tasks import send_service_message
...
send_service_message.delay(
_(u'%s: новый подписчик' % subscriber.list.project.domain),
[user.email],
'',
'site/subscriber/email/subscription_notification.html',
context
)
2. Если вы используете Django, вы получите достаточно удобный интерфейс для управления этими задачами, который позволит определять, например, параметры запуска для каждой задачи в runtime.
3. Вся эта система чаще всего работает на основе RabbitMQ (используется в качестве брокера сообщений) и, чаще всего, управляется с помощью supervisord. Также в качестве брокера может использоваться БД (самый медленный вариант, но вполне сгодится для разработки), Redis (авторы Celery не советуют его использовать, поскольку протокол очень тяжело сопровождать), Amazon SQS.
Пример использования в моем проекте
Мой проект предоставляет пользователям сервисы email-рассылок и транзакционной почты.
Соответственно, потенциально мы имеем 2 типа асинхронных задач:
1. Длинные задачи. Рассылки могут быть и по 20-25 тысяч писем, выполняться в течении 6-7 часов из-за необходимых таймаутов при отправки писем.
2. Короткие задачи. Пользователь зарегистрировался на сайте, бекенд которого отправил в API моего сервиса запрос на отправку письма с подтверждением регистрации и уже через несколько секунд (а лучше — быстрее) пользователь должен увидеть это письмо в своем почтовом ящике.
Если забить на планирование и запускать все задачи в рамках одной очереди (т.е. по умолчанию), все короткие задачи, при наличии уже выполняющейся длинной задачи, будут ждать окончания её выполнения.
Понятно, что получение транзакционных писем спустя 6-7 часов, это не вариант, так что приходим к использованию нескольких очередей для параллельной обработки задач.
Для этого конфигурируем сами очереди в нашем проекте:
# settings.py
...
CELERY_QUEUES = (
Queue('celery', Exchange('celery'), routing_key='celery'),
Queue('bulk', Exchange('bulk'), routing_key='bulk'),
Queue('transactional', Exchange('transactional'), routing_key='transactional'),
)
CELERY_ROUTES = {
'app.campaign.tasks.start_campaign': {
'queue': 'bulk'
},
'app.message.tasks.send_service_message': {
'queue': 'transactional'
},
'app.message.api.tasks.send_message': {
'queue': 'transactional'
}
}
В своем проекте я использую три очереди:
1. Transactional — очередь для отправки транзакционных писем
2. Bulk — очередь для отправки рассылок
3. Celery — очередь для остальных задач.
Соответственно, в supervisor.d конфигурация принимает примерно следующий вид:
[program:celery]
command=<PROJECT_ROOT>/venv/bin/celery worker -A conf.celery -l INFO --concurrency=4 --pidfile=/var/run/celery/celery.pid -Q celery
...
[program:bulk]
command=<PROJECT_ROOT>/venv/bin/celery worker -A conf.celery -l INFO --concurrency=4 --pidfile=/var/run/celery/bulk.pid -Q bulk
...
[program:transactional]
command=<PROJECT_ROOT>/venv/bin/celery worker -A conf.celery -l INFO --concurrency=4 --pidfile=/var/run/celery/transactional.pid -Q transactional
...
Понятно, что многие опции конфигурации здесь опущены. Суть в том, что на каждую очередь необходимо запустить свой процесс в supervisor.
Постановка задачи в конкретную очередь происходит следующим образом:
send_message.apply_async(kwargs={'mailer': self, 'message': message}, queue='transactional')
В итоге, получаем одновременную обработку нескольких задач, пользователи вроде как радуются, сервис работает достаточно быстро, воркеры uwsgi свободны.
Отслеживание состояния задач
Посмотреть состояние задач достаточно просто:
celery -A conf.celery inspect scheduled
celery -A conf.celery inspect active
celery -A conf.celery inspect reserved
Так же для более удобного мониторинга можно использовать решение Flower.
Хочу отметить, что я не считаю себя супер-экспертом в Celery и буду рад, если читатели поделятся своим опытом её использования.
Денормализация больших таблиц — за и против
Рано или поздно вы накопите данные. И с ними надо будет как-то работать.
Мой сервис обрабатывает около 400 000 писем в месяц (это очень мало), при этом я чищу данные о письмах старше 90 дней, и всё равно имею около 2 млн записей в таблицах, которые содержат данные трекинга.
Эти данные необходимо показывать в статистике, с ними нужно работать, по ним нужно делать аналитику, чтобы выявлять спамеров и т.д.
В начале, мои таблицы были супер-нормализованы, построены по всем канонам SQL. Однако реальность такова, что ORM Django зачастую сильно усложняет жизнь вам и вашему проекту за счет достаточно сложных JOIN-ов между вашими таблицами.
Плюс, не всегда очевидно сколько запросов происходит при итерировании queryset-а, особенно для начинающих разработчиков.
В итоге, чтобы посчитать достаточно простой запрос:
SELECT d.date,
SUM(CASE WHEN me.status = 'SENT' THEN 1 ELSE 0 END) AS sent_count,
SUM(CASE WHEN me.status = 'OPENED' THEN 1 ELSE 0 END) AS opened_count,
SUM(CASE WHEN me.status = 'REDIRECTED' THEN 1 ELSE 0 END) AS redirected_count,
SUM(CASE WHEN me.status = 'HARDBOUNCED' THEN 1 ELSE 0 END) AS hardbounced_count,
SUM(CASE WHEN me.status = 'SOFTBOUNCED' THEN 1 ELSE 0 END) AS softbounced_count,
SUM(CASE WHEN me.status = 'UNSUBSCRIBED' THEN 1 ELSE 0 END) AS unsubscribed_count
FROM (
SELECT TO_CHAR(date_trunc('day', ('2017-03-01'::DATE - offs)), 'YYYY-MM-DD')
AS date
FROM generate_series(0, 30, 1)
AS offs
) d
LEFT OUTER JOIN MESSAGE_MESSAGEEVENT me
ON (d.date=to_char(date_trunc('day', me.date_created), 'YYYY-MM-DD') AND status IN
('SENT', 'OPENED', 'REDIRECTED', 'HARDBOUNCED', 'SOFTBOUNCED', 'UNSUBSCRIBED') AND id IN (...))
GROUP BY date ORDER BY date
У меня уходило около 100 секунд. Ужасающее время, не правда ли? Происходило это по нескольким причинам:
1. Во-первых, никогда не делайте так: AND id IN (...), если у вас в массиве может быть около 100 000 элементов :)
2. Во-вторых, гораздо лучше написать d.date=me.date_created::date, чем d.date=to_char(date_trunc('day', me.date_created), 'YYYY-MM-DD').
3. В-третьих у меня в таблице были внешние ключи на несколько других ключей. Из-за чего JOIN-ы работали очень медленно.
К чему я пришел
1. Я добавил поля для явных значений связанных записей дополнительно к внешним ключам (частично денормализовал таблицы).
2. Переписал запрос:
SELECT d.date,
SUM(CASE WHEN me.status = 'SENT' THEN 1 ELSE 0 END) AS sent,
SUM(CASE WHEN me.status = 'OPENED' THEN 1 ELSE 0 END) AS opened,
SUM(CASE WHEN me.status = 'HARDBOUNCED' THEN 1 ELSE 0 END) AS hardbounced,
SUM(CASE WHEN me.status = 'SOFTBOUNCED' THEN 1 ELSE 0 END) AS softbounced,
SUM(CASE WHEN me.status = 'UNSUBSCRIBED' THEN 1 ELSE 0 END) AS unsubscribed,
SUM(CASE WHEN me.status = 'REDIRECTED' THEN 1 ELSE 0 END) AS redirected
FROM (
SELECT ('2017-03-01'::DATE - offs)::date
AS date
FROM generate_series(0, 30, 1)
AS offs
) d
LEFT OUTER JOIN MESSAGE_MESSAGEEVENT me ON (
d.date=me.date_created::date AND
me.date_created >= '2017-02-01 00:00:00' AND
me.date_created <= '2017-03-01 23:59:59' AND
true = true AND me.project_id = ...
)
GROUP BY date ORDER BY date;
В итоге, время обработки этого запроса сократилось до 3 секунд, что уже вполне терпимо. Explain:
GroupAggregate (cost=61552.47..82232.11 rows=200 width=11) (actual time=2445.092..2986.524 rows=31 loops=1)
-> Merge Left Join (cost=61552.47..73614.51 rows=491920 width=11) (actual time=2445.048..2857.069 rows=69314 loops=1)
Merge Cond: ((('2017-03-01'::date - offs.offs)) = ((me.date_created)::date))
-> Sort (cost=62.33..64.83 rows=1000 width=4) (actual time=0.127..0.174 rows=31 loops=1)
Sort Key: (('2017-03-01'::date - offs.offs))
Sort Method: quicksort Memory: 26kB
-> Function Scan on generate_series offs (cost=0.00..12.50 rows=1000 width=4) (actual time=0.034..0.069 rows=31 loops=1)
-> Materialize (cost=61490.14..62719.94 rows=98384 width=15) (actual time=2444.913..2695.888 rows=69312 loops=1)
-> Sort (cost=61490.14..61736.10 rows=98384 width=15) (actual time=2444.908..2539.290 rows=69312 loops=1)
Sort Key: ((me.date_created)::date)
Sort Method: external merge Disk: 2152kB
-> Bitmap Heap Scan on message_messageevent me (cost=11442.78..51647.59 rows=98384 width=15) (actual time=1922.446..2253.982 rows=69312 loops=1)
Recheck Cond: (project_id = ...)
Filter: ((date_created >= '2017-02-01 00:00:00+01'::timestamp with time zone) AND (date_created <= '2017-03-01 23:59:59+01'::timestamp with time zone))
-> Bitmap Index Scan on message_messageevent_b098ad43 (cost=0.00..11418.19 rows=236446 width=0) (actual time=1870.742..1870.742 rows=284445 loops=1)
Index Cond: (project_id = ...)
Total runtime: 2988.107 ms
Какие минусы у этого решения?
Во-первых, таблица занимает больше места на диске. Во-вторых, строковые значения в денормализованной таблице должны быть актуальными. Т.е. если вы меняете строковый идентификатор в родительской таблице, вы должны поменять его и в дочерней денормализованной таблице. К счастью, миграции Django позволяют это достаточно просто реализовать.
Миграция функциональности монолитного проекта в микросервисы
Одна из проблем взросления любого более-менее крупного проекта заключается в том, что его становится всё тяжелее и тяжелее поддерживать. Кодовая база растет, количество тестов растет. Плюс, если вы допустите ошибку в одной части проекта, из-за неё могут пострадать и другие. Например, случайный SyntaxError завалит весь проект целиком.
Также становятся все более актуальными проблемы производительности, связанные с уже описанными мной тяжелыми запросами.
Решение — выделить часть функциональности в отдельный микросервис, т.е.:
1. Вынести кодовую базу и тесты, относящейся к этой функциональности, в отдельный проект.
2. Создать для него отдельный инстанс uwsgi и отдельный upstream или server в nginx.
3. Связать его с помощью брокера типа RabbitMQ или HTTP API с основным проектом.
…
4. PROFIT!
Плюсы микросервисной архитектуры:
1. Больше нет опасности завалить проект из-за ошибки в одной из его функциональных частей.
2. Можно переиспользовать её в других проектах. Особенно актуально для всевозможных сокращателей ссылок, сжимателей картинок и тому подобных прикладных микросервисах.
3. Можно обновлять зависимости сервисов независимо друг от друга. Например, если вы используете legacy версию Django в основном проекте, вы можете использовать более новые версии в микросервисах без необходимости адаптировать значительную по объему кодовую базу.
В моем случае мне необходимо было реализовать простенькую админку для управления своим проектом. Работать она должна была отдельно, из-за того, что в ней доступны достаточно тяжелые отчеты по использованию сервиса моими пользователями, и забивать воркеров uwsgi основного проекта мне ими не хотелось.
Технологии, которые я использовал:
1. Django Rest Framework (далее — DRF) для построения HTTP API в рамках основного проекта.
2. Для написания клиента системы администрирования — ReactJS, NodeJS, Babel, ES6 и прочие непонятные мне слова из мира фронтенда.
Построение API
DRF позволяет очень быстро выстраивать удобные REST API интерфейсы в рамках уже существующих проектов. В моем случае мне было необходимо выстроить интерфейс, который бы удовлетворял следующим требованиям:
1. Доступ к ручкам API должен быть защищенным. Только определенные пользователи, наделенные правами администратора, должны иметь доступ к системе администрирования.
2. Интерфейс должен обеспечивать удобный способ взаимодействия для клиентского SPA приложения.
3. Из коробки должны работать ручки для получения списков объектов, информации по конкретному объекту, удаления объектов и т.д. В общем, API должно быть достаточно для выстраивания нормального CRUD интерфейса на стороне клиента.
Авторизация
Для авторизации запросов с клиента я использовал готовое решение из DRF — Token-аутентификацию.
Всё, что необходимо было сделать для её включения:
1. Добавить 'rest_framework.authtoken' в INSTALLED_APPS.
2. Запустить миграции.
3. Создать токены для пользователей, которые должны иметь доступ к системе администрирования.
Далее создаем общий mixin, который будем подмешивать в нужные нам ViewSet-ы:
# permissions.py
from rest_framework.permissions import IsAdminUser
class IsProjectAdminUser(IsAdminUser):
def has_permission(self, request, view):
return request.user.is_authenticated and request.user.is_admin
# views.py
from rest_framework.authentication import TokenAuthentication
from app.contrib.api.permissions import IsProjectAdminUser
class AdminAuthMixin(object):
authentication_classes = [TokenAuthentication]
permission_classes = [IsProjectAdminUser]
Далее мы достаточно просто можем использовать этот mixin для подключения авторизации в наши ViewSet:
...
from app.contrib.api.views import AdminAuthMixin, MultiSerializerViewSet
...
class CampaignsViewSet(AdminAuthMixin, MultiSerializerViewSet):
queryset = Campaign.objects.filter(date_archived__isnull=True)
filter_class = CampaignsFilter
serializers = {
'list': CampaignsListSerializer,
'retrieve': CampaignDetailSerializer,
}
Взаимодействие со стороны клиента
Для взаимодействия с API со стороны клиента отлично подошла библиотека axios.
Пример выполнения запроса к API:
axios.get('http://example.com/api/entrypoint/', {params: this.state.query_params})
.then(response => {
const items = response.data.results.map(obj => obj);
this.setState({
is_loading: false,
items: items,
count: response.data.count
});
});
Целиком код компонентов React приложения приводить не буду, т.к. он достаточно стандартен.
P.S.
В следующих статьях я расскажу о том, как без боли организовать логирование в большом проекте на основе logstash, kibana и elasticsearch, а так же коснусь клиентской документации и поддержки на основе решений HelpScout и GitBook.
Спасибо за внимание!
Автор: astrikovd