Привет, меня зовут Александр Васин, я бэкенд-разработчик в Едадиле. Идея этого материала началась с того, что я хотел разобрать вступительное задание (Я.Диск) в Школу бэкенд-разработки Яндекса. Я начал описывать все тонкости выбора тех или иных технологий, методику тестирования… Получался совсем не разбор, а очень подробный гайд по тому, как писать бэкенды на Python. От первоначальной идеи остались только требования к сервису, на примере которых удобно разбирать инструменты и технологии. В итоге я очнулся на сотне тысяч символов. Ровно столько потребовалось, чтобы рассмотреть всё в мельчайших подробностях. Итак, программа на следующие 100 килобайт: как строить бэкенд сервиса, начиная от выбора инструментов и заканчивая деплоем.
TL;DR: Вот репка на GitHub с приложением, а кто любит (настоящие) лонгриды — прошу под кат.
Мы разработаем и протестируем REST API-сервис на Python, упакуем его в легкий Docker-контейнер и развернем с помощью Ansible.
Реализовать REST API-сервис можно по-разному, с помощью разных инструментов. Описанное решение не единственно верное, реализацию и инструменты я выбирал исходя из своего личного опыта и предпочтений.
- Что будем делать?
- Какие инструменты выбрать?
- Разработка
- Тестирование
- Сборка
- CI
- Деплой
- Нагрузочное тестирование
- Что еще можно сделать?
- В заключение
Что будем делать?
Представим, что интернет-магазин подарков планирует запустить акцию в разных регионах. Чтобы стратегия продаж была эффективной, необходим анализ рынка. У магазина есть поставщик, регулярно присылающий (например, на почту) выгрузки данных с информацией о жителях.
Давайте разработаем REST API-сервис на Python, который будет анализировать предоставленные данные и выявлять спрос на подарки у жителей разных возрастных групп в разных городах по месяцам.
В сервисе реализуем следующие обработчики:
-
POST /imports
Добавляет новую выгрузку с данными; -
GET /imports/$import_id/citizens
Возвращает жителей указанной выгрузки; -
PATCH /imports/$import_id/citizens/$citizen_id
Изменяет информацию о жителе (и его родственниках) в указанной выгрузке; -
GET /imports/$import_id/citizens/birthdays
Вычисляет число подарков, которое приобретет каждый житель выгрузки своим родственникам (первого порядка), сгруппированное по месяцам; -
GET /imports/$import_id/towns/stat/percentile/age
Вычисляет 50-й, 75-й и 99-й перцентили возрастов (полных лет) жителей по городам в указанной выборке.
Какие инструменты выбрать?
Итак, пишем сервис на Python, используя знакомые фреймворки, библиотеки и СУБД.
В 4 лекции видеокурса рассказывается о различных СУБД и их особенностях. Для моей реализации я выбрал СУБД PostgreSQL, зарекомендовавшую себя как надежное решение c отличной документацией на русском языке, сильным русским сообществом (всегда можно найти ответ на вопрос на русском языке) и даже бесплатными курсами. Реляционная модель достаточно универсальна и хорошо понятна многим разработчикам. Хотя то же самое можно было сделать на любой NoSQL СУБД, в этой статье будем рассматривать именно PostgreSQL.
Основная задача сервиса — передача данных по сети между БД и клиентами — не предполагает большой нагрузки на процессор, но требует возможности обрабатывать несколько запросов в один момент времени. В 10 лекции рассматривается асинхронный подход. Он позволяет эффективно обслуживать нескольких клиентов в рамках одного процесса ОС (в отличие, например, от используемой во Flask/Django pre-fork-модели, которая создает несколько процессов для обработки запросов от пользователей, каждый из них потребляет память, но простаивает большую часть времени). Поэтому в качестве библиотеки для написания сервиса я выбрал асинхронный aiohttp.
В 5 лекции видеокурса рассказывается, что SQLAlchemy позволяет декомпозировать сложные запросы на части, переиспользовать их, генерировать запросы с динамическим набором полей (например, PATCH-обработчик позволяет частичное обновление жителя с произвольными полями) и сосредоточиться непосредственно на бизнес-логике. С выполнением этих запросов и передачей данных быстрее всех справится драйвер asyncpg, а подружить их поможет asyncpgsa.
Мой любимый инструмент для управления состоянием БД и работы с миграциями — Alembic. Кстати, я недавно рассказывал о нем на Moscow Python.
Логику валидации получилось лаконично описать схемами Marshmallow (включая проверки на родственные связи). С помощью модуля aiohttp-spec я связал aiohttp-обработчики и схемы для валидации данных, а бонусом получилось сгенерировать документацию в формате Swagger и отобразить ее в графическом интерфейсе.
Для написания тестов я выбрал pytest
, подробнее о нем — в 3 лекции.
Для отладки и профилирования этого проекта я использовал отладчик PyCharm (лекция 9).
В 7 лекции рассказывается, как на любом компьютере с Docker (и даже на разных ОС) можно запускать упакованное приложение без необходимости настраивать окружение для запуска и легко устанавливать/обновлять/удалять приложение на сервере.
Для деплоя я выбрал Ansible. Он позволяет декларативно описывать желаемое состояние сервера и его сервисов, работает по ssh и не требует специального софта.
Разработка
Я решил дать Python-модулю название analyzer
и использовать следующую структуру:
В файле analyzer/__init__.py
я разместил общую информацию о модуле: описание (docstring), версию, лицензию, контакты разработчиков.
$ python
>>> import analyzer
>>> help(analyzer)
Help on package analyzer:
NAME
analyzer
DESCRIPTION
Сервис с REST API, анализирующий рынок для промоакций.
PACKAGE CONTENTS
api (package)
db (package)
utils (package)
DATA
__all__ = ('__author__', '__email__', '__license__', '__maintainer__',...
__email__ = 'alvassin@yandex.ru'
__license__ = 'MIT'
__maintainer__ = 'Alexander Vasin'
VERSION
0.0.1
AUTHOR
Alexander Vasin
FILE
/Users/alvassin/Work/backendschool2019/analyzer/__init__.py
Модуль имеет две входных точки — REST API-сервис (analyzer/api/__main__.py
) и утилита управления состоянием БД (analyzer/db/__main__.py
). Файлы называются __main__.py
неспроста — во-первых, такое название привлекает внимание, по нему понятно, что файл является входной точкой.
Во-вторых, благодаря этому подходу к входным точкам можно обращаться с помощью команды python -m
:
# REST API
$ python -m analyzer.api --help
# Утилита управления состоянием БД
$ python -m analyzer.db --help
Почему нужно начать с setup.py?
Забегая вперед, подумаем, как можно распространять приложение: оно может быть упаковано в zip- (а также wheel/egg-) архив, rpm-пакет, pkg-файл для macOS и установлено на удаленный компьютер, в виртуальную машину, MacBook или Docker-контейнер.
Главная цель файла setup.py
— описать модуль для distutils/setuptools
.
В файле необходимо указать общую информацию о модуле (название, версию, автора и т. д.), но также в нем можно указать требуемые для работы модули, «экстра»-зависимости (например для тестирования), точки входа (например, исполняемые команды) и требования к интерпретатору.
Плагины setuptools позволяют собирать из описанного модуля артефакт. Есть встроенные плагины: zip, egg, rpm, macOS pkg. Остальные плагины распространяются через PyPI: wheel, xar, pex.
В сухом остатке, описав один файл, мы получаем огромные возможности. Именно поэтому разработку нового модуля нужно начинать с setup.py
.
В функции setup()
зависимые модули указываются списком:
setup(..., install_requires=["aiohttp", "SQLAlchemy"])
Но я описал зависимости в отдельных файлах requirements.txt
и requirements.dev.txt
, содержимое которых используется в setup.py
. Мне это кажется более гибким, плюс тут есть секрет: впоследствии это позволит собирать Docker-образ быстрее. Зависимости будут ставиться отдельным шагом до установки самого приложения, а при пересборке Docker-контейнера попадать в кеш.
Чтобы setup.py
смог прочитать зависимости из файлов requirements.txt
и requirements.dev.txt
, написана функция:
def load_requirements(fname: str) -> list:
requirements = []
with open(fname, 'r') as fp:
for req in parse_requirements(fp.read()):
extras = '[{}]'.format(','.join(req.extras)) if req.extras else ''
requirements.append(
'{}{}{}'.format(req.name, extras, req.specifier)
)
return requirements
Стоит отметить, что setuptools
при сборке source distribution по умолчанию включает в сборку только файлы .py
, .c
, .cpp
и .h
. Чтобы файлы с зависимостями requirements.txt
и requirements.dev.txt
попали в пакет, их необходимо явно указать в файле MANIFEST.in
.
import os
from importlib.machinery import SourceFileLoader
from pkg_resources import parse_requirements
from setuptools import find_packages, setup
module_name = 'analyzer'
# Возможно, модуль еще не установлен (или установлена другая версия), поэтому
# необходимо загружать __init__.py с помощью machinery.
module = SourceFileLoader(
module_name, os.path.join(module_name, '__init__.py')
).load_module()
def load_requirements(fname: str) -> list:
requirements = []
with open(fname, 'r') as fp:
for req in parse_requirements(fp.read()):
extras = '[{}]'.format(','.join(req.extras)) if req.extras else ''
requirements.append(
'{}{}{}'.format(req.name, extras, req.specifier)
)
return requirements
setup(
name=module_name,
version=module.__version__,
author=module.__author__,
author_email=module.__email__,
license=module.__license__,
description=module.__doc__,
long_description=open('README.rst').read(),
url='https://github.com/alvassin/backendschool2019',
platforms='all',
classifiers=[
'Intended Audience :: Developers',
'Natural Language :: Russian',
'Operating System :: MacOS',
'Operating System :: POSIX',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: Implementation :: CPython'
],
python_requires='>=3.8',
packages=find_packages(exclude=['tests']),
install_requires=load_requirements('requirements.txt'),
extras_require={'dev': load_requirements('requirements.dev.txt')},
entry_points={
'console_scripts': [
# f-strings в setup.py не используются из-за соображений
# совместимости.
# Несмотря на то, что этот пакет требует Python 3.8, технически
# source distribution для него может собираться с помощью более
# ранних версий Python. Не стоит лишать пользователей этой
# возможности.
'{0}-api = {0}.api.__main__:main'.format(module_name),
'{0}-db = {0}.db.__main__:main'.format(module_name)
]
},
include_package_data=True
)
Установить проект в режиме разработки можно следующей командой. (В editable-режиме Python не установит модуль целиком в папку site-packages
, а только создаст ссылки, поэтому любые изменения, вносимые в файлы модуля, будут видны сразу.)
# Установить модуль с обычными и extra-зависимостями "dev"
pip install -e '.[dev]'
# Установить модуль только с обычными зависимостями
pip install -e .
Как указать версии зависимостей?
Здорово, когда разработчики активно занимаются своими модулями — в таких модулях активнее исправляются ошибки, появляется новая функциональность и можно быстрее получить обратную связь. Но иногда изменения в зависимых библиотеках не имеют обратной совместимости и могут привести к ошибкам в вашем приложении, если не подумать об этом заранее.
Для каждого зависимого модуля можно указать определенную версию, например aiohttp==3.6.2
. Тогда приложение будет гарантированно собираться именно с теми версиями зависимых библиотек, с которыми оно было протестировано. Но у этого подхода есть и недостаток — если разработчики исправят критичный баг в зависимом модуле, не влияющий на обратную совместимость, в приложение это исправление не попадет.
Существует подход к версионированию Semantic Versioning, который предлагает представлять версию в формате MAJOR.MINOR.PATCH
:
MAJOR
— увеличивается при добавлении обратно несовместимых изменений;MINOR
— увеличивается при добавлении новой функциональности с поддержкой обратной совместимости;PATCH
— увеличивается при добавлении исправлений багов с поддержкой обратной совместимости.
Если зависимый модуль следует этому подходу (о чем авторы обычно сообщают в файлах README или CHANGELOG), то достаточно зафиксировать значения MAJOR
, MINOR
и ограничить минимальное значение для PATCH-версии: >= MAJOR.MINOR.PATCH, == MAJOR.MINOR.*
.
Такое требование можно реализовать с помощью оператора ~=. Например, aiohttp~=3.6.2
позволит PIP установить для aiohttp
версию 3.6.3, но не 3.7.
Если указать интервал версий зависимостей, это даст еще одно преимущество — не будет конфликтов версий между зависимыми библиотеками.
Если вы разрабатываете библиотеку, которая требует другой модуль-зависимость, то разрешите для него не одну определенную версию, а интервал. Тогда потребителям вашей библиотеки будет намного легче ее использовать (вдруг их приложение требует этот же модуль-зависимость, но уже другой версии).
Semantic Versioning — лишь соглашение между авторами и потребителями модулей. Оно не гарантирует, что авторы пишут код без багов и не могут допустить ошибку в новой версии своего модуля.
База данных
Проектируем схему
В описании обработчика POST /imports приведен пример выгрузки с информацией о жителях:
{
"citizens": [
{
"citizen_id": 1,
"town": "Москва",
"street": "Льва Толстого",
"building": "16к7стр5",
"apartment": 7,
"name": "Иванов Иван Иванович",
"birth_date": "26.12.1986",
"gender": "male",
"relatives": [2]
},
{
"citizen_id": 2,
"town": "Москва",
"street": "Льва Толстого",
"building": "16к7стр5",
"apartment": 7,
"name": "Иванов Сергей Иванович",
"birth_date": "01.04.1997",
"gender": "male",
"relatives": [1]
},
{
"citizen_id": 3,
"town": "Керчь",
"street": "Иосифа Бродского",
"building": "2",
"apartment": 11,
"name": "Романова Мария Леонидовна",
"birth_date": "23.11.1986",
"gender": "female",
"relatives": []
},
...
]
}
Первой мыслью было хранить всю информацию о жителе в одной таблице citizens
, где родственные связи были бы представлены полем relatives
в виде списка целых чисел.
- В обработчике
GET /imports/$import_id/citizens/birthdays
для получения месяцев, на которые приходятся дни рождения родственников, потребуется выполнить слияние таблицыcitizens
с самой собой. Для этого будет необходимо развернуть список с идентификаторами родственниковrelatives
с помощью фунцииUNNEST
.Такой запрос будет выполняться сравнительно медленно, и обработчик не уложится в 10-секундный таймаут:
SELECT relations.citizen_id, relations.relative_id, date_part('month', relatives.birth_date) as relative_birth_month FROM ( SELECT citizens.import_id, citizens.citizen_id, UNNEST(citizens.relatives) as relative_id FROM citizens WHERE import_id = 1 ) as relations INNER JOIN citizens as relatives ON relations.import_id = relatives.import_id AND relations.relative_id = relatives.citizen_id
- В таком подходе целостность данных в поле
relatives
не обеспечивается PostgreSQL, а контролируется приложением: технически в списокrelatives
можно добавить любое целое число, в том числе идентификатор несуществующего жителя. Ошибка в коде или человеческий фактор (редактирование записей напрямую в БД администратором) обязательно рано или поздно приведут к несогласованному состоянию данных.
Далее, я решил привести все требуемые для работы данные к третьей нормальной форме, и получилась следующая структура:
- Таблица imports состоит из автоматически инкрементируемого столбца
import_id
. Он нужен для создания проверки по внешнему ключу в таблицеcitizens
. - В таблице citizens хранятся скалярные данные о жителе (все поля за исключением информации о родственных связях).
В качестве первичного ключа используется пара (
import_id
,citizen_id
), гарантирующая уникальность жителейcitizen_id
в рамкахimport_id
.Внешний ключ
citizens.import_id -> imports.import_id
гарантирует, что полеcitizens.import_id
будет содержать только существующие выгрузки. - Таблица relations содержит информацию о родственных связях.
Одна родственная связь представлена двумя записями (от жителя к родственнику и обратно): эта избыточность позволяет использовать более простое условие при слиянии таблиц
citizens
иrelations
и получать информацию более эффективно.
Первичный ключ состоит из столбцов (import_id
,citizen_id
,relative_id
) и гарантирует, что в рамках одной выгрузкиimport_id
у жителяcitizen_id
будут родственники c уникальнымиrelative_id
.Также в таблице используются два составных внешних ключа:
(relations.import_id, relations.citizen_id) -> (citizens.import_id, citizens.citizen_id)
и(relations.import_id, relations.relative_id) -> (citizens.import_id, citizens.citizen_id)
, гарантирующие, что в таблице будут указаны существующие жительcitizen_id
и родственникrelative_id
из одной выгрузки.
Такая структура обеспечивает целостность данных средствами PostgreSQL, позволяет эффективно получать жителей с родственниками из базы данных, но подвержена состоянию гонки во время обновления информации о жителях конкурентными запросами (подробнее рассмотрим при реализации обработчика PATCH).
Описываем схему в SQLAlchemy
В лекции 5 я рассказывал, что для создания запросов с помощью SQLAlchemy необходимо описать схему базы данных с помощью специальных объектов: таблицы описываются с помощью sqlalchemy.Table
и привязываются к реестру sqlalchemy.MetaData
, который хранит всю метаинформацию о базе данных. К слову, реестр MetaData
способен не только хранить описанную в Python метаинформацию, но и представлять реальное состояние базы данных в виде объектов SQLAlchemy.
Эта возможность в том числе позволяет Alembic сравнивать состояния и генерировать код миграций автоматически.
Кстати, у каждой базы данных своя схема именования constraints по умолчанию. Чтобы вы не тратили время на именование новых constraints или на воспоминания/поиски того, как назван constraint, который вы собираетесь удалить, SQLAlchemy предлагает использовать шаблоны именования naming conventions. Их можно определить в реестре MetaData
.
# analyzer/db/schema.py
from sqlalchemy import MetaData
convention = {
'all_column_names': lambda constraint, table: '_'.join([
column.name for column in constraint.columns.values()
]),
# Именование индексов
'ix': 'ix__%(table_name)s__%(all_column_names)s',
# Именование уникальных индексов
'uq': 'uq__%(table_name)s__%(all_column_names)s',
# Именование CHECK-constraint-ов
'ck': 'ck__%(table_name)s__%(constraint_name)s',
# Именование внешних ключей
'fk': 'fk__%(table_name)s__%(all_column_names)s__%(referred_table_name)s',
# Именование первичных ключей
'pk': 'pk__%(table_name)s'
}
metadata = MetaData(naming_convention=convention)
Если указать шаблоны именования, Alembic воспользуется ими во время автоматической генерации миграций и будет называть все constraints в соответствии с ними. В дальнейшем cозданный реестр MetaData
потребуется для описания таблиц:
# analyzer/db/schema.py
from enum import Enum, unique
from sqlalchemy import (
Column, Date, Enum as PgEnum, ForeignKey, ForeignKeyConstraint, Integer,
String, Table
)
@unique
class Gender(Enum):
female = 'female'
male = 'male'
imports_table = Table(
'imports',
metadata,
Column('import_id', Integer, primary_key=True)
)
citizens_table = Table(
'citizens',
metadata,
Column('import_id', Integer, ForeignKey('imports.import_id'),
primary_key=True),
Column('citizen_id', Integer, primary_key=True),
Column('town', String, nullable=False, index=True),
Column('street', String, nullable=False),
Column('building', String, nullable=False),
Column('apartment', Integer, nullable=False),
Column('name', String, nullable=False),
Column('birth_date', Date, nullable=False),
Column('gender', PgEnum(Gender, name='gender'), nullable=False),
)
relations_table = Table(
'relations',
metadata,
Column('import_id', Integer, primary_key=True),
Column('citizen_id', Integer, primary_key=True),
Column('relative_id', Integer, primary_key=True),
ForeignKeyConstraint(
('import_id', 'citizen_id'),
('citizens.import_id', 'citizens.citizen_id')
),
ForeignKeyConstraint(
('import_id', 'relative_id'),
('citizens.import_id', 'citizens.citizen_id')
),
)
Настраиваем Alembic
Когда схема базы данных описана, необходимо сгенерировать миграции, но для этого сначала нужно настроить Alembic, об этом тоже рассказывается в лекции 5.
Чтобы воспользоваться командой alembic
, необходимо выполнить следующие шаги:
- Установить модуль:
pip install alembic
- Инициализировать Alembic:
cd analyzer && alembic init db/alembic
.Эта команда создаст файл конфигурации
analyzer/alembic.ini
и папкуanalyzer/db/alembic
со следующим содержимым:-
env.py
— вызывается каждый раз при запуске Alembic. Подключает в Alembic реестрsqlalchemy.MetaData
с описанием желаемого состояния БД и содержит инструкции по запуску миграций. script.py.mako
— шаблон, на основе которого генерируются миграции.versions
— папка, в которой Alembic будет искать (и генерировать) миграции.
-
- Указать адрес базы данных в файле alembic.ini:
; analyzer/alembic.ini [alembic] sqlalchemy.url = postgresql://user:hackme@localhost/analyzer
- Указать описание желаемого состояния базы данных (реестр
sqlalchemy.MetaData
), чтобы Alembic мог генерировать миграции автоматически:# analyzer/db/alembic/env.py from analyzer.db import schema target_metadata = schema.metadata
Alembic настроен и им уже можно пользоваться, но в нашем случае такая конфигурация имеет ряд недостатков:
- Утилита
alembic
ищетalembic.ini
в текущей рабочей директории. Путь кalembic.ini
можно указать аргументом командной строки, но это неудобно: хочется иметь возможность вызывать команду из любой папки без дополнительных параметров. - Чтобы настроить Alembic на работу с определенной базой данных, требуется менять файл
alembic.ini
. Гораздо удобнее было бы указать настройки БД переменной окружения и/или аргументом командной строки, например--pg-url
. - Название утилиты
alembic
не очень хорошо коррелирует с названием нашего сервиса (а пользователь фактически может вообще не владеть Python и ничего не знать об Alembic). Конечному пользователю было бы намного удобнее, если бы все исполняемые команды сервиса имели общий префикс, напримерanalyzer-*
.
Эти проблемы решаются с помощью небольшой обертки analyzer/db/__main__.py:
- Для обработки аргументов командной строки Alembic использует стандартный модуль
argparse
. Он позволяет добавить необязательный аргумент--pg-url
со значением по умолчанию из переменной окруженияANALYZER_PG_URL
.Кодimport os from alembic.config import CommandLine, Config from analyzer.utils.pg import DEFAULT_PG_URL def main(): alembic = CommandLine() alembic.parser.add_argument( '--pg-url', default=os.getenv('ANALYZER_PG_URL', DEFAULT_PG_URL), help='Database URL [env var: ANALYZER_PG_URL]' ) options = alembic.parser.parse_args() # Создаем объект конфигурации Alembic config = Config(file_=options.config, ini_section=options.name, cmd_opts=options) # Меняем значение sqlalchemy.url из конфига Alembic config.set_main_option('sqlalchemy.url', options.pg_url) # Запускаем команду alembic exit(alembic.run_cmd(config, options)) if __name__ == '__main__': main()
- Путь до файла
alembic.ini
можно рассчитывать относительно расположения исполняемого файла, а не текущей рабочей директории пользователя.Кодimport os from alembic.config import CommandLine, Config from pathlib import Path PROJECT_PATH = Path(__file__).parent.parent.resolve() def main(): alembic = CommandLine() options = alembic.parser.parse_args() # Если указан относительный путь (alembic.ini), добавляем в начало # абсолютный путь до приложения if not os.path.isabs(options.config): options.config = os.path.join(PROJECT_PATH, options.config) # Создаем объект конфигурации Alembic config = Config(file_=options.config, ini_section=options.name, cmd_opts=options) # Подменяем путь до папки с alembic на абсолютный (требуется, чтобы alembic # мог найти env.py, шаблон для генерации миграций и сами миграции) alembic_location = config.get_main_option('script_location') if not os.path.isabs(alembic_location): config.set_main_option('script_location', os.path.join(PROJECT_PATH, alembic_location)) # Запускаем команду alembic exit(alembic.run_cmd(config, options)) if __name__ == '__main__': main()
Когда утилита для управления состоянием БД готова, ее можно зарегистрировать в setup.py
как исполняемую команду с понятным конечному пользователю названием, например analyzer-db
:
from setuptools import setup
setup(..., entry_points={
'console_scripts': [
'analyzer-db = analyzer.db.__main__:main'
]
})
После переустановки модуля будет сгенерирован файл env/bin/analyzer-db
и команда analyzer-db
станет доступной:
$ pip install -e '.[dev]'
Генерируем миграции
Чтобы сгенерировать миграции, требуется два состояния: желаемое (которое мы описали объектами SQLAlchemy) и реальное (база данных, в нашем случае пустая).
Я решил, что проще всего поднять Postgres с помощью Docker и для удобства добавил команду make postgres
, запускающую в фоновом режиме контейнер с PostgreSQL на 5432 порту:
$ make postgres
...
$ analyzer-db revision --message="Initial" --autogenerate
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.autogenerate.compare] Detected added table 'imports'
INFO [alembic.autogenerate.compare] Detected added table 'citizens'
INFO [alembic.autogenerate.compare] Detected added index 'ix__citizens__town' on '['town']'
INFO [alembic.autogenerate.compare] Detected added table 'relations'
Generating /Users/alvassin/Work/backendschool2019/analyzer/db/alembic/versions/d5f704ed4610_initial.py ... done
Alembic в целом хорошо справляется с рутинной работой генерации миграций, но я хотел бы обратить внимание на следующее:
- Пользовательские типы данных, указанные в создаваемых таблицах, создаются автоматически (в нашем случае —
gender
), но код для их удаления вdowngrade
не генерируется. Если применить, откатить и потом еще раз применить миграцию, это вызовет ошибку, так как указанный тип данных уже существует.Удаляем тип данных gender в методе downgradefrom alembic import op from sqlalchemy import Column, Enum GenderType = Enum('female', 'male', name='gender') def upgrade(): ... # При создании таблицы тип данных GenderType будет создан автоматически op.create_table('citizens', ..., Column('gender', GenderType, nullable=False)) ... def downgrade(): op.drop_table('citizens') # После удаления таблицы тип данных необходимо удалить GenderType.drop(op.get_bind())
- В методе
downgrade
некоторые действия иногда можно убрать (если мы удаляем таблицу целиком, можно не удалять ее индексы отдельно):Напримерdef downgrade(): op.drop_table('relations') # Следующим шагом мы удаляем таблицу citizens, индекс будет удален автоматически # эту строчку можно удалить op.drop_index(op.f('ix__citizens__town'), table_name='citizens') op.drop_table('citizens') op.drop_table('imports')
Когда миграция исправлена и готова, применим ее:
$ analyzer-db upgrade head
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [alembic.runtime.migration] Running upgrade -> d5f704ed4610, Initial
Приложение
Прежде чем приступить к созданию обработчиков, необходимо сконфигурировать приложение aiohttp.
import logging
from aiohttp import web
def main():
# Настраиваем логирование
logging.basicConfig(level=logging.DEBUG)
# Создаем приложение
app = web.Application()
# Регистрируем обработчики
app.router.add_route(...)
# Запускаем приложение
web.run_app(app)
Этот код вызывает ряд вопросов и имеет ряд недостатков:
- Как конфигурировать приложение? Как минимум, необходимо указать хост и порт для подключения клиентов, а также информацию для подключения к базе данных.
Мне очень нравится решать эту задачу с помощью модуля
ConfigArgParse
: он расширяет стандартныйargparse
и позволяет использовать для конфигурации аргументы командной строки, переменные окружения (незаменимые для конфигурации Docker-контейнеров) и даже файлы конфигурации (а также совмещать эти способы). C помощьюConfigArgParse
также можно валидировать значения параметров конфигурации приложения.Пример обработки параметров с помощью ConfigArgParsefrom aiohttp import web from configargparse import ArgumentParser, ArgumentDefaultsHelpFormatter from analyzer.utils.argparse import positive_int parser = ArgumentParser( # Парсер будет искать переменные окружения с префиксом ANALYZER_, # например ANALYZER_API_ADDRESS и ANALYZER_API_PORT auto_env_var_prefix='ANALYZER_', # Покажет значения параметров по умолчанию formatter_class=ArgumentDefaultsHelpFormatter ) parser.add_argument('--api-address', default='0.0.0.0', help='IPv4/IPv6 address API server would listen on') # Разрешает только целые числа больше нуля parser.add_argument('--api-port', type=positive_int, default=8081, help='TCP port API server would listen on') def main(): # Получаем параметры конфигурации, которые можно передать как аргументами # командной строки, так и переменными окружения args = parser.parse_args() # Запускаем приложение на указанном порту и адресе app = web.Application() web.run_app(app, host=args.api_address, port=args.api_port) if __name__ == '__main__': main()
Кстати,
ConfigArgParse
, как иargparse
, умеет генерировать подсказку по запуску команды с описанием всех аргументов (необходимо позвать команду с аргументом-h
или--help
). Это невероятно облегчает жизнь пользователям вашего ПО:Например$ python __main__.py --help usage: __main__.py [-h] [--api-address API_ADDRESS] [--api-port API_PORT] If an arg is specified in more than one place, then commandline values override environment variables which override defaults. optional arguments: -h, --help show this help message and exit --api-address API_ADDRESS IPv4/IPv6 address API server would listen on [env var: ANALYZER_API_ADDRESS] (default: 0.0.0.0) --api-port API_PORT TCP port API server would listen on [env var: ANALYZER_API_PORT] (default: 8081)
- После получения переменные окружения больше не нужны и даже могут представлять опасность — например, они могут случайно «утечь» с отображением информации об ошибке. Злоумышленники в первую очередь будут пытаться получить информацию об окружении, поэтому очистка переменных окружения считается хорошим тоном.
Можно было бы воспользоваться
os.environ.clear()
, но Python позволяет управлять поведением модулей стандартной библиотеки с помощью многочисленных переменных окружения (например, вдруг потребуется включить режим отладкиasyncio
?), поэтому разумнее очищать переменные окружения по префиксу приложения, указанного вConfigArgParser
.Примерimport os from typing import Callable from configargparse import ArgumentParser from yarl import URL from analyzer.api.app import create_app from analyzer.utils.pg import DEFAULT_PG_URL ENV_VAR_PREFIX = 'ANALYZER_' parser = ArgumentParser(auto_env_var_prefix=ENV_VAR_PREFIX) parser.add_argument('--pg-url', type=URL, default=URL(DEFAULT_PG_URL), help='URL to use to connect to the database') def clear_environ(rule: Callable): """ Очищает переменные окружения, переменные для очистки определяет переданная функция rule """ # Ключи из os.environ копируются в новый tuple, чтобы не менять объект # os.environ во время итерации for name in filter(rule, tuple(os.environ)): os.environ.pop(name) def main(): # Получаем аргументы args = parser.parse_args() # Очищаем переменные окружения по префиксу ANALYZER_ clear_environ(lambda i: i.startswith(ENV_VAR_PREFIX)) # Запускаем приложение app = create_app(args) ... if __name__ == '__main__': main()
- Запись логов в stderr/файл в основном потоке блокирует цикл событий.
В лекции 9 рассказывается, что по умолчанию
logging.basicConfig()
настраиваетзапись логов в stderr
.Чтобы логирование не мешало эффективной работе асинхронного приложения, необходимо выполнять запись логов в отдельном потоке. Для этого можно воспользоваться готовым методом из модуля aiomisc.
Настраиваем логирование с помощью aiomiscimport logging from aiomisc.log import basic_config basic_config(logging.DEBUG, buffered=True)
- Как масштабировать приложение, если одного процесса станет недостаточно для обслуживания входящего трафика? Можно сначала аллоцировать сокет, затем с помощью
fork
создать несколько новых отдельных процессов, и соединения на сокете будут распределяться между ними механизмами ядра (конечно, под Windows это не работает).Примерimport os from sys import argv import forklib from aiohttp.web import Application, run_app from aiomisc import bind_socket from setproctitle import setproctitle def main(): sock = bind_socket(address='0.0.0.0', port=8081, proto_name='http') setproctitle(f'[Master] {os.path.basename(argv[0])}') def worker(): setproctitle(f'[Worker] {os.path.basename(argv[0])}') app = Application() run_app(app, sock=sock) forklib.fork(os.cpu_count(), worker, auto_restart=True) if __name__ == '__main__': main()
- Требуется ли приложению обращаться или аллоцировать какие-либо ресурсы во время работы? Если нет, по соображениям безопасности все ресурсы (в нашем случае — сокет для подключения клиентов) можно аллоцировать на старте, а затем сменить пользователя на
nobody
. Он обладает ограниченным набором привиллегий — это здорово усложнит жизнь злоумышленникам.Примерimport os import pwd from aiohttp.web import run_app from aiomisc import bind_socket from analyzer.api.app import create_app def main(): # Аллоцируем сокет sock = bind_socket(address='0.0.0.0', port=8085, proto_name='http') user = pwd.getpwnam('nobody') os.setgid(user.pw_gid) os.setuid(user.pw_uid) app = create_app(...) run_app(app, sock=sock) if __name__ == '__main__': main()
- В конце концов я решил вынести создание приложения в отдельную параметризуемую функцию
create_app
, чтобы можно было легко создавать идентичные приложения для тестирования.
Сериализация данных
Все успешные ответы обработчиков будем возвращать в формате JSON. Информацию об ошибках клиентам тоже было бы удобно получать в сериализованном виде (например, чтобы увидеть, какие поля не прошли валидацию).
Документация aiohttp
предлагает метод json_response
, который принимает объект, сериализует его в JSON и возвращает новый объект aiohttp.web.Response
с заголовком Content-Type: application/json
и сериализованными данными внутри.
from aiohttp.web import Application, View, run_app
from aiohttp.web_response import json_response
class SomeView(View):
async def get(self):
return json_response({'hello': 'world'})
app = Application()
app.router.add_route('*', '/hello', SomeView)
run_app(app)
Но существует и другой способ: aiohttp позволяет зарегистрировать произвольный сериализатор для определенного типа данных ответа в реестре aiohttp.PAYLOAD_REGISTRY
. Например, можно указать сериализатор aiohttp.JsonPayload
для объектов типа Mapping.
В этом случае обработчику будет достаточно вернуть объект Response
с данными ответа в параметре body
. aiohttp найдет сериализатор, соответствующий типу данных и сериализует ответ.
Помимо того, что сериализация объектов описана в одном месте, этот подход еще и более гибкий — он позволяет реализовывать очень интересные решения (мы рассмотрим один из вариантов использования в обработчике GET /imports/$import_id/citizens
).
from types import MappingProxyType
from typing import Mapping
from aiohttp import PAYLOAD_REGISTRY, JsonPayload
from aiohttp.web import run_app, Application, Response, View
PAYLOAD_REGISTRY.register(JsonPayload, (Mapping, MappingProxyType))
class SomeView(View):
async def get(self):
return Response(body={'hello': 'world'})
app = Application()
app.router.add_route('*', '/hello', SomeView)
run_app(app)
Важно понимать, что метод json_response
, как и aiohttp.JsonPayload
, используют стандартный json.dumps
, который не умеет сериализовать сложные типы данных, например datetime.date
или asyncpg.Record
(asyncpg
возвращает записи из БД в виде экземпляров этого класса). Более того, одни сложные объекты могут содержать другие: в одной записи из БД может быть поле типа datetime.date
.
Разработчики Python предусмотрели эту проблему: метод json.dumps
позволяет с помощью аргумента default
указать функцию, которая вызывается, когда необходимо сериализовать незнакомый объект. Ожидается, что функция приведет незнакомый объект к типу, который умеет сериализовать модуль json.
import json
from datetime import date
from functools import partial, singledispatch
from typing import Any
from aiohttp.payload import JsonPayload as BaseJsonPayload
from aiohttp.typedefs import JSONEncoder
@singledispatch
def convert(value):
raise NotImplementedError(f'Unserializable value: {value!r}')
@convert.register(Record)
def convert_asyncpg_record(value: Record):
"""
Позволяет автоматически сериализовать результаты запроса, возвращаемые
asyncpg
"""
return dict(value)
@convert.register(date)
def convert_date(value: date):
"""
В проекте объект date возвращается только в одном случае — если необходимо
отобразить дату рождения. Для отображения даты рождения должен
использоваться формат ДД.ММ.ГГГГ
"""
return value.strftime('%d.%m.%Y')
dumps = partial(json.dumps, default=convert)
class JsonPayload(BaseJsonPayload):
def __init__(self,
value: Any,
encoding: str = 'utf-8',
content_type: str = 'application/json',
dumps: JSONEncoder = dumps,
*args: Any,
**kwargs: Any) -> None:
super().__init__(value, encoding, content_type, dumps, *args, **kwargs)
Обработчики
aiohttp позволяет реализовать обработчики асинхронными функциями и классами. Классы более расширяемы: во-первых, код, относящийся к одному обработчику, можно разместить в одном месте, а во вторых, классы позволяют использовать наследование для избавления от дублирования кода (например, каждому обработчику требуется соединение с базой данных).
from aiohttp.web_urldispatcher import View
from asyncpgsa import PG
class BaseView(View):
URL_PATH: str
@property
def pg(self) -> PG:
return self.request.app['pg']
Так как один большой файл читать сложно, я решил разнести обработчики по файлам. Маленькие файлы поощряют слабую связность, а если, например, есть кольцевые импорты внутри хэндлеров — значит, возможно, что-то не так с композицией сущностей.
POST /imports
На вход обработчик получает json с данными о жителях. Максимально допустимый размер запроса в aiohttp регулируется опцией client_max_size
и по умолчанию равен 2 МБ. При превышении лимита aiohttp вернет HTTP-ответ со статусом 413: Request Entity Too Large Error.
В то же время корректный json c максимально длинными строчками и цифрами будет весить ~63 мегабайта, поэтому ограничения на размер запроса необходимо расширить.
Далее, необходимо проверить и десериализовать данные. Если они некорректные, нужно вернуть HTTP-ответ 400: Bad Request
.
Мне потребовались две схемы Marhsmallow
. Первая, CitizenSchema
, проверяет данные каждого отдельного жителя, а также десериализует строку с днем рождения в объект datetime.date
:
- Тип данных, формат и наличие всех обязательных полей;
- Отсутствие незнакомых полей;
- Дата рождения должна быть указана в формате
DD.MM.YYYY
и не может иметь значения из будущего; - Список родственников каждого жителя должен содержать уникальные существующие в этой выгрузке идентификаторы жителей.
Вторая схема, ImportSchema
, проверяет выгрузку в целом:
citizen_id
каждого жителя в рамках выгрузки должен быть уникален;- Родственные связи должны быть двусторонними (если у жителя #1 в списке родственников указан житель #2, то и у жителя #2 должен быть родственник #1).
Если данные корректные, их необходимо добавить в БД с новым уникальным import_id
.
Для добавления данных потребуется выполнить несколько запросов в разные таблицы. Чтобы в БД не осталось частично добавленных данных в случае возникновения ошибки или исключения (например, при отключении клиента, который не получил ответ полностью, aiohttp бросит исколючение CancelledError), необходимо использовать транзакцию.
Добавлять данные в таблицы необходимо частями, так как в одном запросе к PostgreSQL может быть не более 32 767 аргументов. В таблице citizens
9 полей. Соответственно, за 1 запрос в эту таблицу можно вставить только 32 767 / 9 = 3640 строк, а в одной выгрузке может быть до 10 000 жителей.
GET /imports/$import_id/citizens
Обработчик возвращает всех жителей для выгрузки с указанным import_id
. Если указанная выгрузка не существует, необходимо вернуть HTTP-ответ 404: Not Found. Это поведение выглядит общим для обработчиков, которым требуется существующая выгрузка, поэтому я вынес код проверки в отдельный класс.
from aiohttp.web_exceptions import HTTPNotFound
from sqlalchemy import select, exists
from analyzer.db.schema import imports_table
class BaseImportView(BaseView):
@property
def import_id(self):
return int(self.request.match_info.get('import_id'))
async def check_import_exists(self):
query = select([
exists().where(imports_table.c.import_id == self.import_id)
])
if not await self.pg.fetchval(query):
raise HTTPNotFound()
Чтобы получить список родственников для каждого жителя, потребуется выполнить LEFT JOIN
из таблицы citizens
в таблицу relations
, агрегируя поле relations.relative_id
с группировкой по import_id
и citizen_id
.
Если у жителя нет родственников, то LEFT JOIN
вернет для него в поле relations.relative_id
значение NULL
и в результате агрегации список родственников будет выглядеть как [NULL]
.
Чтобы исправить это некорректное значение, я воспользовался функцией array_remove.
БД хранит дату в формате YYYY-MM-DD
, а нам нужен формат DD.MM.YYYY
.
Технически форматировать дату можно либо SQL-запросом, либо на стороне Python в момент сериализации ответа с json.dumps
(asyncpg возвращает значение поля birth_date
как экземпляр класса datetime.date
).
Я выбрал сериализацию на стороне Python, учитывая, что birth_date
— единственный объект datetime.date
в проекте с единым форматом (см. раздел «Сериализация данных»).
Несмотря на то, что в обработчике выполняется два запроса (проверка на существование выгрузки и запрос на получение списка жителей), использовать транзакцию необязательно. По умолчанию PostgreSQL использует уровень изоляции READ COMMITTED
и даже в рамках одной транзакции будут видны все изменения других, успешно завершенных транзакций (добавление новых строк, изменение существующих).
Самая большая выгрузка в текстовом представлении может занимать ~63 мегабайта — это достаточно много, особенно учитывая, что одновременно может прийти несколько запросов на получение данных. Есть достаточно интересный способ получать данные из БД с помощью курсора и отправлять их клиенту по частям.
Для этого нам потребуется реализовать два объекта:
- Объект
SelectQuery
типаAsyncIterable
, возвращающий записи из базы данных. При первом обращении подключается к базе, открывает транзакцию и создает курсор, при дальнейшей итерации возвращает записи из БД. Возвращается обработчиком.Код SelectQueryfrom collections import AsyncIterable from asyncpgsa.transactionmanager import ConnectionTransactionContextManager from sqlalchemy.sql import Select class SelectQuery(AsyncIterable): """ Используется, чтобы отправлять данные из PostgreSQL клиенту сразу после получения, по частям, без буфферизации всех данных """ PREFETCH = 500 __slots__ = ( 'query', 'transaction_ctx', 'prefetch', 'timeout' ) def __init__(self, query: Select, transaction_ctx: ConnectionTransactionContextManager, prefetch: int = None, timeout: float = None): self.query = query self.transaction_ctx = transaction_ctx self.prefetch = prefetch or self.PREFETCH self.timeout = timeout async def __aiter__(self): async with self.transaction_ctx as conn: cursor = conn.cursor(self.query, prefetch=self.prefetch, timeout=self.timeout) async for row in cursor: yield row
- Сериализатор
AsyncGenJSONListPayload
, который умеет итерироваться по асинхронным генераторам, сериализовать данные из асинхронного генератора в JSON и отправлять данные клиентам по частям. Регистрируется вaiohttp.PAYLOAD_REGISTRY
как сериализатор объектовAsyncIterable
.Код AsyncGenJSONListPayloadimport json from functools import partial from aiohttp import Payload # Функция, умеющая сериализовать в JSON объекты asyncpg.Record и datetime.date dumps = partial(json.dumps, default=convert, ensure_ascii=False) class AsyncGenJSONListPayload(Payload): """ Итерируется по объектам AsyncIterable, частями сериализует данные из них в JSON и отправляет клиенту """ def __init__(self, value, encoding: str = 'utf-8', content_type: str = 'application/json', root_object: str = 'data', *args, **kwargs): self.root_object = root_object super().__init__(value, content_type=content_type, encoding=encoding, *args, **kwargs) async def write(self, writer): # Начало объекта await writer.write( ('{"%s":[' % self.root_object).encode(self._encoding) ) first = True async for row in self._value: # Перед первой строчкой запятая не нужнаа if not first: await writer.write(b',') else: first = False await writer.write(dumps(row).encode(self._encoding)) # Конец объекта await writer.write(b']}')
Далее, в обработчике можно будет создать объект SelectQuery
, передать ему SQL запрос и функцию для открытия транзакции и вернуть его в Response body
:
# analyzer/api/handlers/citizens.py
from aiohttp.web_response import Response
from aiohttp_apispec import docs, response_schema
from analyzer.api.schema import CitizensResponseSchema
from analyzer.db.schema import citizens_table as citizens_t
from analyzer.utils.pg import SelectQuery
from .query import CITIZENS_QUERY
from .base import BaseImportView
class CitizensView(BaseImportView):
URL_PATH = r'/imports/{import_id:d+}/citizens'
@docs(summary='Отобразить жителей для указанной выгрузки')
@response_schema(CitizensResponseSchema())
async def get(self):
await self.check_import_exists()
query = CITIZENS_QUERY.where(
citizens_t.c.import_id == self.import_id
)
body = SelectQuery(query, self.pg.transaction())
return Response(body=body)
aiohttp
обнаружит в реестре aiohttp.PAYLOAD_REGISTRY
зарегистрированный сериализатор AsyncGenJSONListPayload
для объектов типа AsyncIterable
. Затем сериализатор будет итерироваться по объекту SelectQuery
и отправлять данные клиенту. При первом обращении объект SelectQuery
получает соединение к БД, открывает транзакцию и создает курсор, при дальнейшей итерации будет получать данные из БД курсором и возвращать их построчно.
Этот подход позволяет не выделять память на весь объем данных при каждом запросе, но у него есть особенность: приложение не сможет вернуть клиенту соответствующий HTTP-статус, если возникнет ошибка (ведь клиенту уже был отправлен HTTP-статус, заголовки, и пишутся данные).
При возникновении исключения не остается ничего, кроме как разорвать соединение. Исключение, конечно, можно залогировать, но клиент не сможет понять, какая именно ошибка произошла.
С другой стороны, похожая ситуация может возникнуть, даже если обработчик получит все данные из БД, но при передаче данных клиенту моргнет сеть — от этого никто не застрахован.
PATCH /imports/$import_id/citizens/$citizen_id
Обработчик получает на вход идентификатор выгрузки import_id
, жителя citizen_id
, а также json с новыми данными о жителе. В случае обращения к несуществующей выгрузке или жителю необходимо вернуть HTTP-ответ 404: Not Found
.
Переданные клиентом данные требуется проверить и десериализовать. Если они некорректные — необходимо вернуть HTTP-ответ 400: Bad Request
. Я реализовал Marshmallow-схему PatchCitizenSchema
, которая проверяет:
- Тип и формат данных для указанных полей.
- Дату рождения. Она должна быть указана в формате
DD.MM.YYYY
и не может иметь значения из будущего. - Список родственников каждого жителя. Он должен иметь уникальные идентификаторы жителей
Существование родственников, указанных в поле relatives
, можно отдельно не проверять: при добавлении в таблицу relations
несуществующего жителя PostgreSQL вернет ошибку ForeignKeyViolationError
, которую можно обработать и вернуть HTTP-статус 400: Bad Request
.
Какой статус возвращать, если клиент прислал некорректные данные для несуществующего жителя или выгрузки? Семантически правильнее проверять сначала существование выгрузки и жителя (если такого нет — возвращать 404: Not Found
) и только потом —корректные ли данные прислал клиент (если нет — возвращать 400: Bad Request
). На практике часто бывает дешевле сначала проверить данные, и только если они корректные, обращаться к базе.
Оба варианта приемлемы, но я решил выбрать более дешевый второй вариант, так как в любом случае результат операции — ошибка, которая ни на что не влияет (клиент исправит данные и потом так же узнает, что житель не существует).
Если данные корректные, необходимо обновить информацию о жителе в БД. В обработчике потребуется сделать несколько запросов к разным таблицам. Если возникнет ошибка или исключение, изменения в базе данных должны быть отменены, поэтому запросы необходимо выполнять в транзакции.
Метод PATCH
позволяет передавать лишь некоторые поля для изменяемого жителя.
Обработчик необходимо написать таким образом, чтобы он не падал при обращении к данным, которые не указал клиент, а также не выполнял запросы к таблицам, данные в которых не изменились.
Если клиент указал поле relatives
, необходимо получить список существующих родственников. Если он изменился — определить, какие записи из таблицы relatives
необходимо удалить, а какие добавить, чтобы привести базу данных в соответствие с запросом клиента. По умолчанию в PostgreSQL для изоляции транзакций используется уровень READ COMMITTED
. Это означает, что в рамках текущей транзакции будут видны изменения существующих (а также добавления новых) записей других завершенных транзакций. Это может привести к состоянию гонки между конкурентными запросами.
Предположим, существует выгрузка с жителями #1
, #2
, #3
, без родственных связей. Сервис получает два одновременных запроса на изменение жителя #1: {"relatives": [2]}
и {"relatives": [3]}
. aiohttp создаст два обработчика, которые одновременно получат текущее состояние жителя из PostgreSQL.
Каждый обработчик не обнаружит ни одной родственной связи и примет решение добавить новую связь с указанным родственником. В результате у жителя #1 поле relatives равно [2,3]
.
Такое поведение нельзя назвать очевидным. Есть два варианта ожидаемо решить исход гонки: выполнить только первый запрос, а для второго вернуть HTTP-ответ
409: Conflict
(чтобы клиент повторил запрос), либо выполнить запросы по очереди (второй запрос будет обработан только после завершения первого).
Первый вариант можно реализовать, включив режим изоляции SERIALIZABLE
. Если во время обработки запроса кто-то уже успел изменить и закоммитить данные, будет брошено исключение, которое можно обработать и вернуть соответствующий HTTP-статус.
Минус такого решения — большое число блокировок в PostgreSQL, SERIALIZABLE
будет вызывать исключение, даже если конкурентные запросы меняют записи жителей из разных выгрузок.
Также можно воспользоваться механизмом рекомендательных блокировок. Если получить такую блокировку по import_id
, конкурентные запросы для разных выгрузок смогут выполняться параллельно.
Для обработки конкурентных запросов в одной выгрузке можно реализовать поведение любого из вариантов: функция pg_try_advisory_xact_lock
пытается получить блокировку и
возвращает результат boolean немедленно (если блокировку получить не удалось — можно бросить исключение), а pg_advisory_xact_lock
ожидает, пока
ресурс не станет доступен для блокировки (в этом случае запросы выполнятся последовательно, я остановился на этом варианте).
В итоге обработчик должен вернуть актуальную информацию об обновленном жителе. Можно было ограничиться возвращением клиенту данных из его же запроса (раз мы возвращаем ответ клиенту, значит, исключений не было и все запросы успешно выполнены). Или — воспользоваться ключевым словом RETURNING в запросах, изменяющих БД, и сформировать ответ из полученных результатов. Но оба этих подхода не позволили бы увидеть и протестировать случай с гонкой состояний.
К сервису не предъявлялись требования по высокой нагрузке, поэтому я решил запрашивать все данные о жителе заново и возвращать клиенту честный результат из БД.
GET /imports/$import_id/citizens/birthdays
Обработчик вычисляет число подарков, которое приобретет каждый житель выгрузки своим родственникам (первого порядка). Число сгруппировано по месяцам для выгрузки с указанным import_id
. В случае обращения к несуществующей выгрузке необходимо вернуть HTTP-ответ 404: Not Found
.
Есть два варианта реализации:
- Получить данные для жителей с родственниками из базы, а на стороне Python агрегировать данные по месяцам и сгенерировать списки для тех месяцев, для которых нет данных в БД.
- Cоставить json-запрос в базу и дописать для отсутствующих месяцев заглушки.
Я остановился на первом варианте — визуально он выглядит более понятным и поддерживаемым. Число дней рождений в определенном месяце можно получить, сделав JOIN
из таблицы с родственными связями (relations.citizen_id
— житель, для которого мы считаем дни рождения родственников) в таблицу citizens
(содержит дату рождения, из которой требуется получить месяц).
Значения месяцев не должны содержать ведущих нулей. Месяц, получаемый из поля birth_date
c помощью функции date_part
, может содержать ведущий ноль. Чтобы убрать его, я выполнил cast
к integer
в SQL-запросе.
Несмотря на то, что в обработчике требуется выполнить два запроса (проверить существование выгрузки и получить информации о днях рождения и подарках), транзакция не требуется.
По умолчанию PostgreSQL использует режим READ COMMITTED, при котором в текущей транзакции видны все новые (добавляемые другими транзакциями) и существующие (изменяемые другими транзакциями) записи после их успешного завершения.
Например, если в момент получения данных будет добавлена новая выгрузка — она никак не повлияет на существующие. Если в момент получения данных будет выполнен запрос на изменение жителя — то либо данные еще не будут видны (если транзакция, меняющая данные, не завершилась), либо транзакция полностью завершится и станут видны сразу все изменения. Целостность получаемых из базы данных не нарушится.
GET /imports/$import_id/towns/stat/percentile/age
Обработчик вычисляет 50-й, 75-й и 99-й перцентили возрастов (полных лет) жителей по городам в выборке с указанным import_id. В случае обращения к несуществующей выгрузке необходимо вернуть HTTP-ответ 404: Not Found
.
Несмотря на то, что в обработчике выполняется два запроса (проверка на существование выгрузки и получение списка жителей), использовать транзакцию необязательно.
Есть два варианта реализации:
- Получить из БД возраста жителей, сгруппированные по городам, а затем на стороне Python вычислить перцентили с помощью numpy (который в задании указан как эталонный) и округлить до двух знаков после запятой.
- Сделать всю работу на стороне PostgreSQL: функция percentile_cont вычисляет перцентиль с линейной интерполяцией, затем округляем полученные значения до двух знаков после запятой в рамках одного SQL-запроса, а numpy используем для тестирования.
Второй вариант требует передавать меньше данных между приложением и PostgreSQL, но у него есть не очень очевидный подводный камень: в PostgreSQL округление математическое, (SELECT ROUND(2.5)
вернет 3), а в Python — бухгалтерское, к ближайшему целому (round(2.5)
вернет 2).
Чтобы тестировать обработчик, реализация должна быть одинаковой и в PostgreSQL, и в Python (реализовать функцию с математическим округлением в Python выглядит проще). Стоит отметить, что при вычислении перцентилей numpy и PostgreSQL могут возвращать немного отличающиеся числа, но с учетом округления эта разница будет незаметна.
Тестирование
Что нужно проверить в этом приложении? Во-первых, что обработчики отвечают требованиям и выполняют требуемую работу в окружении, максимально близком к боевому. Во-вторых, что миграции, которые изменяют состояние базы данных, работают без ошибок. В-третьих, есть ряд вспомогательных функций, которые тоже было бы правильно покрыть тестами.
Я решил воспользоваться фреймворком pytest из-за его гибкости и простоты в использовании. Он предлагает мощный механизм подготовки окружения для тестов — фикстуры, то есть функции с декоратором pytest.mark.fixture
, названия которых можно указать параметром в тесте. Если pytest обнаружит в аннотации теста параметр с названием фикстуры, он выполнит эту фикстуру и передаст результат в значении этого параметра. А если фикстура является генератором, то параметр теста примет значение, возвращаемое yield
, и после окончания теста выполнится вторая часть фикстуры, которая может очистить ресурсы или закрыть соединения.
Для большинства тестов нам потребуется база данных PostgreSQL. Чтобы изолировать тесты друг от друга, можно перед выполнением каждого теста создавать отдельную базу данных, а после выполнения — удалять ее.
import os
import uuid
import pytest
from sqlalchemy import create_engine
from sqlalchemy_utils import create_database, drop_database
from yarl import URL
from analyzer.utils.pg import DEFAULT_PG_URL
PG_URL = os.getenv('CI_ANALYZER_PG_URL', DEFAULT_PG_URL)
@pytest.fixture
def postgres():
tmp_name = '.'.join([uuid.uuid4().hex, 'pytest'])
tmp_url = str(URL(PG_URL).with_path(tmp_name))
create_database(tmp_url)
try:
# Это значение будет иметь параметр postgres в функции-тесте
yield tmp_url
finally:
drop_database(tmp_url)
def test_db(postgres):
"""
Пример теста, использующего PostgreSQL
"""
engine = create_engine(postgres)
assert engine.execute('SELECT 1').scalar() == 1
engine.dispose()
C этой задачей здорово справился модуль sqlalchemy_utils, учитывающий особенности разных баз данных и драйверов. Например, PostgreSQL не разрешает выполнение CREATE DATABASE
в блоке транзакции. При создании БД sqlalchemy_utils
переводит psycopg2
(который обычно выполняет все запросы в транзакции) в режим autocommit.
Другая важная особенность: если к PostgreSQL подключен хотя бы один клиент — базу данных нельзя удалить, а sqlalchemy_utils
отключает всех клиентов перед удалением базы. БД будет успешно удалена, даже если зависнет какой-нибудь тест, имеющий активные подключения к ней.
PostgreSQL потребуется нам в разных состояниях: для тестирования миграций необходима чистая база данных, в то время как обработчики требуют, чтобы все миграции были применены. Изменять состояние базы данных можно программно с помощью команд Alembic, для их вызова требуется объект конфигурации Alembic.
from types import SimpleNamespace
import pytest
from analyzer.utils.pg import make_alembic_config
@pytest.fixture()
def alembic_config(postgres):
cmd_options = SimpleNamespace(config='alembic.ini', name='alembic',
pg_url=postgres, raiseerr=False, x=None)
return make_alembic_config(cmd_options)
Обратите внимание, что у фикстуры alembic_config
есть параметр postgres
— pytest
позволяет не только указывать зависимость теста от фикстур, но и зависимости между фикстурами.
Этот механизм позволяет гибко разделять логику и писать очень краткий и переиспользуемый код.
Обработчики
Для тестирования обработчиков требуется база данных с созданными таблицами и типами данных. Чтобы применить миграции, необходимо программно вызвать команду upgrade Alembic. Для ее вызова потребуется объект с конфигурацией Alembic, который мы уже определили фикстурой alembic_config
. База данных с миграциями выглядит как вполне самостоятельная сущность, и ее можно представить в виде фикстуры:
from alembic.command import upgrade
@pytest.fixture
async def migrated_postgres(alembic_config, postgres):
upgrade(alembic_config, 'head')
# Возвращаем DSN базы данных, которая была смигрирована
return postgres
Когда миграций в проекте становится много, их применение для каждого теста может занимать слишком много времени. Чтобы ускорить процесс, можно один раз создать базу данных с миграциями и затем использовать ее в качестве шаблона.
Помимо базы данных для тестирования обработчиков, потребуется запущенное приложение, а также клиент, настроенный на работу с этим приложением. Чтобы приложение было легко тестировать, я вынес его создание в функцию create_app
, которая принимает параметры для запуска: базу данных, порт для REST API и другие.
Аргументы для запуска приложения можно также представить в виде отдельной фикстуры. Для их создания потребуется определить свободный порт для запуска тестируемого приложения и адрес до смигрированной временной базы данных.
Для определения свободного порта я воспользовался фикстурой aiomisc_unused_port
из пакета aiomisc.
Стандартная фикстура aiohttp_unused_port
тоже вполне бы подошла, но она возвращает функцию для определения свободых портов, в то время как aiomisc_unused_port
возвращает сразу номер порта. Для нашего приложения требуется определить только один свободный порт, поэтому я решил не писать лишнюю строчку кода с вызовом aiohttp_unused_port
.
@pytest.fixture
def arguments(aiomisc_unused_port, migrated_postgres):
return parser.parse_args(
[
'--log-level=debug',
'--api-address=127.0.0.1',
f'--api-port={aiomisc_unused_port}',
f'--pg-url={migrated_postgres}'
]
)
Все тесты с обработчиками подразумевают запросы к REST API, работа напрямую с приложением aiohttp
не требуется. Поэтому я сделал одну фикстуру, которая запускает приложение и с помощью фабрики aiohttp_client
создает и возвращает подключенный к приложению стандартный тестовый клиент aiohttp.test_utils.TestClient
.
from analyzer.api.app import create_app
@pytest.fixture
async def api_client(aiohttp_client, arguments):
app = create_app(arguments)
client = await aiohttp_client(app, server_kwargs={
'port': arguments.api_port
})
try:
yield client
finally:
await client.close()
Теперь, если в параметрах теста указать фикстуру api_client
, произойдет следующее:
- Фикстура
postgres
создаст базу данных (зависимость дляmigrated_postgres
). - Фикстура
alembic_config
создаст объект конфигурации Alembic, подключенный к временной базе данных (зависимость дляmigrated_postgres
). - Фикстура
migrated_postgres
применит миграции (зависимость дляarguments
). - Фикстура
aiomisc_unused_port
обнаружит свободный порт (зависимость дляarguments
). - Фикстура
arguments
создаст аргументы для запуска (зависимость дляapi_client
). - Фикстура
api_client
создаст и запустит приложение и вернет клиента для выполнения запросов. - Выполнится тест.
- Фикстура
api_client
отключит клиента и остановит приложение. - Фикстура
postgres
удалит базу данных.
Фикстуры позволяют избежать дублирования кода, но помимо подготовки окружения в тестах есть еще одно потенциальное место, в котором будет очень много одинакового кода — запросы к приложению.
Во-первых, сделав запрос, мы ожидаем получить определенный HTTP-статус. Во-вторых, если статус совпадает с ожидаемым, то перед работой с данными необходимо убедиться, что они имеют правильный формат. Здесь легко ошибиться и написать обработчик, который делает правильные вычисления и возвращает правильный результат, но не проходит автоматическую валидацию из-за неправильного формата ответа (например, забыть обернуть ответ в словарь с ключом data
). Все эти проверки можно было бы сделать в одном месте.
В модуле analyzer.testing
я подготовил для каждого обработчика функцию-помощник, которая проверяет статус HTTP, а также формат ответа с помощью Marshmallow.
GET /imports/$import_id/citizens
Я решил начать с обработчика, возвращающего жителей, потому что он очень полезен для проверки результатов работы других обработчиков, изменяющих состояние базы данных.
Я намеренно не использовал код, добавляющий данные в базу из обработчика POST /imports
, хотя вынести его в отдельную функцию несложно. Код обработчиков имеет свойство меняться, а если в коде, добавляющем в базу, будет какая-либо ошибка, есть вероятность, что тест перестанет работать как задумано и неявно для разработчиков перестанет показывать ошибки.
Для этого теста я определил следующие наборы данных для тестирования:
- Выгрузка с несколькими родственниками. Проверяет, что для каждого жителя будет правильно сформирован список с идентификаторами родственников.
- Выгрузка с одним жителем без родственников. Проверяет, что поле
relatives
— пустой список (из-заLEFT JOIN
в SQL-запросе список родственников может быть равен[None]
). - Выгрузка с жителем, который сам себе родственник.
- Пустая выгрузка. Проверяет, что обработчик разрешает добавить пустую выгрузку и не падает с ошибкой.
Чтобы запустить один и тот же тест отдельно на каждой выгрузке, я воспользовался еще одним очень мощным механизмом pytest — параметризацией. Этот механизм позволяет обернуть функцию-тест в декоратор pytest.mark.parametrize
и описать в нем, какие параметры должна принимать функция-тест для каждого отдельного тестируемого случая.
import pytest
from analyzer.utils.testing import generate_citizen
datasets = [
# Житель с несколькими родственниками
[
generate_citizen(citizen_id=1, relatives=[2, 3]),
generate_citizen(citizen_id=2, relatives=[1]),
generate_citizen(citizen_id=3, relatives=[1])
],
# Житель без родственников
[
generate_citizen(relatives=[])
],
# Выгрузка с жителем, который сам себе родственник
[
generate_citizen(citizen_id=1, name='Джейн', gender='male',
birth_date='17.02.2020', relatives=[1])
],
# Пустая выгрузка
[],
]
@pytest.mark.parametrize('dataset', datasets)
async def test_get_citizens(api_client, dataset):
"""
Этот тест будет вызван 4 раза, отдельно для каждого датасета
"""
Итак, тест добавит выгрузку в базу данных, затем с помощью запроса к обработчику получит информацию о жителях и сравнит эталонную выгрузку с полученной. Но как сравнить жителей?
Каждый житель состоит из скалярных полей и поля relatives
— списка идентификаторов родственников. Список в Python — упорядоченный тип, и при сравнении порядок элементов каждого списка имеет значение, но при сравнении списков с родственниками порядок не должен иметь значение.
Если привести relatives
к множеству перед сравнением, то при сравнении не получится обнаружить ситуацию, когда у одного из жителей в поле relatives
есть дубли. Если отсортировать список с идентификаторами родственников, это позволит обойти проблему разного порядка идентификаторов родственников, но при этом обнаружить дубли.
При сравнении двух списков с жителями можно столкнуться с похожей проблемой: технически, порядок жителей в выгрузке не важен, но важно обнаружить, если в одной выгрузке будет два жителя с одинаковыми идентификаторами, а в другой нет. Так что помимо упорядочивания списка с родственниками relatives для каждого жителя необходимо упорядочить жителей в каждой выгрузке.
Так как задача сравнения жителей возникнет еще не раз, я реализовал две функции: одну для сравнения двух жителей, а вторую для сравнения двух списков с жителями:
from typing import Iterable, Mapping
def normalize_citizen(citizen):
"""
Возвращает жителя с упорядоченным списком родственников
"""
return {**citizen, 'relatives': sorted(citizen['relatives'])}
def compare_citizens(left: Mapping, right: Mapping) -> bool:
"""
Сравнивает двух жителей
"""
return normalize_citizen(left) == normalize_citizen(right)
def compare_citizen_groups(left: Iterable, right: Iterable) -> bool:
"""
Упорядочивает списки с родственниками для каждого жителя, списки с жителями
и сравнивает их
"""
left = [normalize_citizen(citizen) for citizen in left]
left.sort(key=lambda citizen: citizen['citizen_id'])
right = [normalize_citizen(citizen) for citizen in right]
right.sort(key=lambda citizen: citizen['citizen_id'])
return left == right
Чтобы убедиться, что этот обработчик не возвращает жителей других выгрузок, я решил перед каждым тестом добавлять дополнительную выгрузку с одним жителем.
POST /imports
Я определил следующие наборы данных для тестирования обработчика:
- Корректные данные, ожидается успешное добавление в БД.
- Житель без родственников (самый простой).
Обработчику необходимо добавить данные в две таблицы. Если не обрабатывается ситуация, когда у жителя нет родственников, будет выполнен пустой insert в таблицу родственных связей, что приведет к ошибке.
- Житель с родственниками (более сложный, обычный).
Проверяет, что обработчик корректно сохраняет данные и о жителе и его родственных связях.
- Житель сам себе родственник.
Про этот случай было много вопросов, поэтому в шутку решил добавить и его. :)
- Выгрузка с максимального размера
Проверяет, что aiohttp позволяет загружать такие объемы данных и что при большом количестве данных в PostgreSQL не отправляется больше 32 767 аргументов (обработчик должен выполнить несколько запросов).
- Пустая выгрузка
Обработчик должен учитывать такой случай и не падать, пытаясь выполнить пустой insert в таблицу с жителями.
- Житель без родственников (самый простой).
- Данные с ошибками, ожидаем HTTP-ответ 400: Bad Request.
- Дата рождения некорректная (будущее время).
- citizen_id в рамках выгрузки не уникален.
- Родственная связь указана неверно (есть только от одного жителя к другому, но нет обратной).
- У жителя указан несуществующий в выгрузке родственник.
- Родственные связи не уникальны.
Если обработчик отработал успешно и данные были добавлены, необходимо получить добавленных в БД жителей и сравнить их с эталонной выгрузки. Для получения жителей я воспользовался уже протестированным обработчиком GET /imports/$import_id/citizens
, а для сравнения — функцией compare_citizen_groups
.
PATCH /imports/$import_id/citizens/$citizen_id
Валидация данных во многом похожа на описанную в обработчике POST /imports
с небольшими исключениями: есть только один житель и клиент может передать только те поля, которые пожелает.
Я решил использовать следующие наборы с некорректными данными, чтобы проверить, что обработчик вернет HTTP-ответ 400: Bad request
:
- Поле указано, но имеет некорректный тип и/или формат данных
- Указана некорректная дата рождения (будущее время).
- Поле
relatives
содержит несуществующего в выгрузке родственника.
Также необходимо проверить, что обработчик корректно обновляет информацию о жителе и его родственниках.
Для этого создадим выгрузку с тремя жителями, два из которых — родственники, и отправим запрос с новыми значениями всех скалярных полей и новым идентификатором родственника в поле relatives
.
Чтобы убедиться, что обработчик различает жителей разных выгрузок перед тестом (и, например, не изменит жителей с одинаковыми идентификаторами из другой выгрузки), я создал дополнительную выгрузку с тремя жителями, которые имеют такие же идентификаторы.
Обработчик должен сохранить новые значения скалярных полей, добавить нового указанного родственника и удалить связь со старым, не указанным родственником. Все изменения родственных связей должны быть двусторонними. Изменений в других выгрузках быть не должно.
Поскольку такой обработчик может быть подвержен состоянию гонки (это рассматривалось в разделе «Разработка»), я добавил два дополнительных теста. Один воспроизводит проблему с состоянием гонки (расширяет класс обработчика и убирает блокировку), второй доказывает, что проблема с состоянием гонки не воспроизводится.
GET /imports/$import_id/citizens/birthdays
Для тестирования этого обработчика я выбрал следующие наборы данных:
- Выгрузка, в которой у жителя есть один родственник в одном месяце и два родственника в другом.
- Выгрузка с одним жителем без родственников. Проверяет, что обработчик не учитывает его при расчетах.
- Пустая выгрузка. Проверяет, что обработчик не упадет с ошибкой и вернет в ответе корректный словарь с 12 месяцами.
- Выгрузка с жителем, который сам себе родственник. Проверяет, что житель купит себе подарок в месяц своего рождения.
Обработчик должен возвращать в ответе все месяцы, даже если в эти месяцы нет дней рождений. Чтобы избежать дублирования, я сделал функцию, которой можно передать словарь, чтобы она дополнила его значениями для отсутствующих месяцев.
Чтобы убедиться, что обработчик различает жителей разных выгрузок, я добавил дополнительную выгрузку с двумя родственниками. Если обработчик по ошибке использует их при расчетах, то результаты будут некорректными и обработчик упадет с ошибкой.
GET /imports/$import_id/towns/stat/percentile/age
Особенность этого теста в том, что результаты его работы зависят от текущего времени: возраст жителей вычисляется исходя из текущей даты. Чтобы результаты тестирования не менялись с течением времени, текущую дату, даты рождения жителей и ожидаемые результаты необходимо зафиксировать. Это позволит легко воспроизвести любые, даже краевые случаи.
Как лучше зафиксировать дату? В обработчике для вычисления возраста жителей используется PostgreSQL-функция AGE
, принимающая первым параметром дату, для которой необходимо рассчитать возраст, а вторым — базовую дату (определена константой TownAgeStatView.CURRENT_DATE
).
from unittest.mock import patch
import pytz
CURRENT_DATE = datetime(2020, 2, 17, tzinfo=pytz.utc)
@patch('analyzer.api.handlers.TownAgeStatView.CURRENT_DATE', new=CURRENT_DATE)
async def test_get_ages(...):
...
Для тестирования обработчика я выбрал следующие наборы данных (для всех жителей указывал один город, потому что обработчик агрегирует результаты по городам):
- Выгрузка с несколькими жителями, у которых завтра день рождения (возраст — несколько лет и 364 дня). Проверяет, что обработчик использует в расчетах только количество полных лет.
- Выгрузка с жителем, у которого сегодня день рождения (возраст — ровно несколько лет). Проверяет краевой случай — возраст жителя, у которого сегодня день рождения, не должен рассчитаться как уменьшенный на 1 год.
- Пустая выгрузка. Обработчик не должен на ней падать.
Эталон для расчета перцентилей — numpy
с линейной интерполяцией, и эталонные результаты для тестирования я рассчитал именно им.
Также нужно округлять дробные значения перцентилей до двух знаков после запятой. Если вы использовали в обработчике для округления PostgreSQL, а для расчета эталонных данных — Python, то могли заметить, что округление в Python 3 и PostgreSQL может давать разные результаты.
# Python 3
round(2.5)
> 2
-- PostgreSQL
SELECT ROUND(2.5)
> 3
Дело в том, что Python использует банковское округление до ближайшего четного, а PostgreSQL — математическое (half-up). В случае, если расчеты и округление производятся в PostgreSQL, было бы правильным в тестах также использовать математическое округление.
Сначала я описал наборы данных с датами рождения в текстовом формате, но читать тест в таком формате было неудобно: приходилось каждый раз вычислять в уме возраст каждого жителя, чтобы вспомнить, что проверяет тот или иной набор данных. Конечно, можно было обойтись комментариями в коде, но я решил пойти чуть дальше и написал функцию age2date
, которая позволяет описать дату рождения в виде возраста: количества лет и дней.
import pytz
from analyzer.utils.testing import generate_citizen
CURRENT_DATE = datetime(2020, 2, 17, tzinfo=pytz.utc)
def age2date(years: int, days: int = 0, base_date=CURRENT_DATE) -> str:
birth_date = copy(base_date).replace(year=base_date.year - years)
birth_date -= timedelta(days=days)
return birth_date.strftime(BIRTH_DATE_FORMAT)
# Сколько лет этому жителю? Посчитать несложно, но если их будет много?
generate_citizen(birth_date='17.02.2009')
# Жителю ровно 11 лет и у него сегодня день рождения
generate_citizen(birth_date=age2date(years=11))
Чтобы убедиться, что обработчик различает жителей разных выгрузок, я добавил дополнительную выгрузку с одним жителем из другого города: если обработчик по ошибке использует его, в результатах появится лишний город и тест сломается.
Интересный факт: когда я писал этот тест 29 февраля 2020 года, у меня внезапно перестали генерироваться выгрузки с жителями из-за бага в Faker (2020-й — високосный год, а другие годы, которые выбирал Faker, не всегда были високосными и в них не было 29 февраля). Не забывайте фиксировать даты и тестировать краевые случаи!
Миграции
Код миграций на первый взгляд кажется очевидным и наименее подверженным ошибкам, зачем его тестировать? Это очень опасное заблуждение: самые коварные ошибки миграций могут проявить себя в самый неподходящий момент. Даже если они не испортят данные, то могут стать причиной лишнего даунтайма.
Существующая в проекте initial миграция изменяет структуру базы данных, но не изменяет данные. От каких типовых ошибок можно защититься в подобных миграциях?
- Метод
downgrade
не реализован или не удалены все созданные в миграции сущности (особенно это касается пользовательских типов данных, которые создаются автоматически при создании таблицы, я про них уже упоминал).Это приведет к тому, что миграцию нельзя будет применить два раза (применить-откатить-применить): при откате не будут удалены все созданные миграцией сущности, при повторном создании миграция пройдет с ошибкой — тип данных уже существует.
- Cинтаксические ошибки и опечатки.
- Ошибки в связях миграций (цепочка нарушена).
Большинство этих ошибок обнаружит stairway-тест. Его идея — применять миграции по одной, последовательно выполняя методы upgrade
, downgrade
, upgrade
для каждой миграции. Такой тест достаточно один раз добавить в проект, он не требует поддержки и будет служить верой и правдой.
А вот если миграция, помимо структуры, изменяла бы данные, то потребовалось бы написать хотя бы один отдельный тест, проверяющий, что данные корректно изменяются в методе upgrade
и возвращаются к изначальному состоянию в downgrade
. На всякий случай: проект с примерами тестирования разных миграций, который я подготовил для доклада про Alembic на Moscow Python.
Сборка
Конечный артефакт, который мы собираемся разворачивать и который хотим получить в результате сборки, — Docker-образ. Для сборки необходимо выбрать базовый образ c Python. Официальный образ python:latest
весит ~1 ГБ и, если его использовать в качестве базового, образ с приложением будет огромным. Существуют образы на основе ОС Alpine, размер которых намного меньше. Но с растущим количеством устанавливаемых пакетов размер конечного образа вырастет, и в итоге даже образ, собранный на основе Alpine, будет не таким уж и маленьким. Я выбрал в качестве базового образа snakepacker/python — он весит немного больше Alpine-образов, но основан на Ubuntu, которая предлагает огромный выбор пакетов и библиотек.
Еще один способ уменьшить размер образа с приложением — не включать в итоговый образ компилятор, библиотеки и файлы с заголовками для сборки, которые не потребуются для работы приложения.
Для этого можно воспользоваться многоступенчатой сборкой Docker:
- С помощью «тяжелого» образа
snakepacker/python:all
(~1 ГБ, в сжатом виде ~500 МБ) создаем виртуальное окружение, устанавливаем в него все зависимости и модуль с приложением. Этот образ нужен исключительно для сборки, он может содержать компилятор, все необходимые библиотеки и файлы с заголовками.FROM snakepacker/python:all as builder # Создаем виртуальное окружение RUN python3.8 -m venv /usr/share/python3/app # Копируем source distribution в контейнер и устанавливаем его COPY dist/ /mnt/dist/ RUN /usr/share/python3/app/bin/pip install /mnt/dist/*
- Готовое виртуальное окружение копируем в «легкий» образ
snakepacker/python:3.8
(~100 МБ, в сжатом виде ~50 МБ), который содержит только интерпретатор требуемой версии Python.Важно: в виртуальном окружении используются абсолютные пути, поэтому его необходимо скопировать по тому же адресу, по которому оно было собрано в контейнере-сборщике.
FROM snakepacker/python:3.8 as api # Копируем готовое виртуальное окружение из контейнера builder COPY --from=builder /usr/share/python3/app /usr/share/python3/app # Устанавливаем ссылки, чтобы можно было воспользоваться командами # приложения RUN ln -snf /usr/share/python3/app/bin/analyzer-* /usr/local/bin/ # Устанавливаем выполняемую при запуске контейнера команду по умолчанию CMD ["analyzer-api"]
Чтобы сократить время на сборку образа, зависимые модули приложения можно установить до его установки в виртуальное окружение. Тогда Docker закеширует их и не будет устанавливать заново, если они не менялись.
############### Образ для сборки виртуального окружения ################
# Основа — «тяжелый» (~1 ГБ, в сжатом виде ~500 ГБ) образ со всеми необходимыми
# библиотеками для сборки модулей
FROM snakepacker/python:all as builder
# Создаем виртуальное окружение и обновляем pip
RUN python3.8 -m venv /usr/share/python3/app
RUN /usr/share/python3/app/bin/pip install -U pip
# Устанавливаем зависимости отдельно, чтобы закешировать. При последующей сборке
# Docker пропустит этот шаг, если requirements.txt не изменится
COPY requirements.txt /mnt/
RUN /usr/share/python3/app/bin/pip install -Ur /mnt/requirements.txt
# Копируем source distribution в контейнер и устанавливаем его
COPY dist/ /mnt/dist/
RUN /usr/share/python3/app/bin/pip install /mnt/dist/*
&& /usr/share/python3/app/bin/pip check
########################### Финальный образ ############################
# За основу берем «легкий» (~100 МБ, в сжатом виде ~50 МБ) образ с Python
FROM snakepacker/python:3.8 as api
# Копируем в него готовое виртуальное окружение из контейнера builder
COPY --from=builder /usr/share/python3/app /usr/share/python3/app
# Устанавливаем ссылки, чтобы можно было воспользоваться командами
# приложения
RUN ln -snf /usr/share/python3/app/bin/analyzer-* /usr/local/bin/
# Устанавливаем выполняемую при запуске контейнера команду по умолчанию
CMD ["analyzer-api"]
Для удобства сборки я добавил команду make upload
, которая собирает Docker-образ и загружает его на hub.docker.com.
CI
Теперь, когда код покрыт тестами и мы умеем собирать Docker-образ, самое время автоматизировать эти процессы. Первое, что приходит в голову: запускать тесты на создание пул-реквестов, а при добавлении изменений в master-ветку собирать новый Docker-образ и загружать его на Docker Hub (или GitHub Packages, если вы не собираетесь распространять образ публично).
Я решил эту задачу с помощью GitHub Actions. Для этого потребовалось создать YAML-файл в папке .github/workflows
и описать в нем workflow (c двумя задачами: test
и publish
), которое я назвал CI
.
Задача test
выполняется при каждом запуске workflow CI
, с помощью services поднимает контейнер с PostgreSQL, ожидает, когда он станет доступен, и запускает pytest
в контейнере snakepacker/python:all
.
Задача publish
выполняется, только если изменения были добавлены в ветку master
и если задача test
была выполнена успешно. Она собирает source distribution контейнером snakepacker/python:all
, затем собирает и загружает Docker-образ с помощью docker/build-push-action@v1
.
name: CI
# Workflow должен выполняться при добавлении изменений
# или новом пул-реквесте в master
on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
jobs:
# Тесты должны выполняться при каждом запуске workflow
test:
runs-on: ubuntu-latest
services:
postgres:
image: docker://postgres
ports:
- 5432:5432
env:
POSTGRES_USER: user
POSTGRES_PASSWORD: hackme
POSTGRES_DB: analyzer
steps:
- uses: actions/checkout@v2
- name: test
uses: docker://snakepacker/python:all
env:
CI_ANALYZER_PG_URL: postgresql://user:hackme@postgres/analyzer
with:
args: /bin/bash -c "pip install -U '.[dev]' && pylama && wait-for-port postgres:5432 && pytest -vv --cov=analyzer --cov-report=term-missing tests"
# Сборка и загрузка Docker-образа с приложением
publish:
# Выполняется только если изменения попали в ветку master
if: github.event_name == 'push' && github.ref == 'refs/heads/master'
# Требует, чтобы задача test была выполнена успешно
needs: test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: sdist
uses: docker://snakepacker/python:all
with:
args: make sdist
- name: build-push
uses: docker/build-push-action@v1
with:
username: ${{ secrets.REGISTRY_LOGIN }}
password: ${{ secrets.REGISTRY_TOKEN }}
repository: alvassin/backendschool2019
target: api
tags: 0.0.1, latest
Теперь при добавлении изменений в master во вкладке Actions на GitHub можно увидеть запуск тестов, сборку и загрузку Docker-образа:
А при создании пул-реквеста в master-ветку в нем также будут отображаться результаты выполнения задачи test
:
Деплой
Чтобы развернуть приложение на предоставленном сервере, нужно установить Docker, Docker Compose, запустить контейнеры с приложением и PostgreSQL и применить миграции.
Эти шаги можно автоматизировать с помощью системы управления конфигурациями Ansible. Она написана на Python, не требует специальных агентов (подключается прямо по ssh), использует jinja-шаблоны и позволяет декларативно описывать желаемое состояние в YAML-файлах. Декларативный подход позволяет не задумываться о текущем состоянии системы и действиях, необходимых, чтобы привести систему к желаемому состоянию. Вся эта работа ложится на плечи модулей Ansible.
Ansible позволяет сгруппировать логически связанные задачи в роли и затем переиспользовать. Нам потребуются две роли: docker
(устанавливает и настраивает Docker) и analyzer
(устанавливает и настраивает приложение).
Роль docker
добавляет в систему репозиторий с Docker, устанавливает и настраивает пакеты docker-ce
и docker-compose
.
Опционально можно наладить автоматическое возобновление работы REST API после перезагрузки сервера. Ubuntu позволяет решить эту задачу силами системы инициализации systemd
. Она управляет юнитами, представляющими собой различные ресурсы (демоны, сокеты, точки монтирования и другие). Чтобы добавить новый юнит в systemd, необходимо описать его конфигурацию в отдельном файле .service и разместить этот файл в одной из специальных папок, например в /etc/systemd/system
. Затем юнит можно запустить, а также включить для него автозагрузку.
Пакет docker-ce
при установке автоматически создаст файл с конфигурацией юнита — необходимо только убедиться, что он запущен и включается при запуске системы. Для Docker Compose файл конфигурации docker-compose@.service
будет создан силами Ansible. Символ @
в названии указывает systemd, что юнит является шаблоном. Это позволяет запускать сервис docker-compose
с параметром — например, с названием нашего сервиса, который будет подставлен вместо %i
в файле конфигурации юнита:
[Unit]
Description=%i service with docker compose
Requires=docker.service
After=docker.service
[Service]
Type=oneshot
RemainAfterExit=true
WorkingDirectory=/etc/docker/compose/%i
ExecStart=/usr/local/bin/docker-compose up -d --remove-orphans
ExecStop=/usr/local/bin/docker-compose down
[Install]
WantedBy=multi-user.target
Роль analyzer
сгенерирует из шаблона файл docker-compose.yml
по адресу /etc/docker/compose/analyzer
, зарегистрирует приложение как автоматически запускаемый сервис в systemd
и применит миграции. Когда роли готовы, необходимо описать playbook.
---
- name: Gathering facts
hosts: all
become: yes
gather_facts: yes
- name: Install docker
hosts: docker
become: yes
gather_facts: no
roles:
- docker
- name: Install analyzer
hosts: api
become: yes
gather_facts: no
roles:
- analyzer
Список хостов, а также переменные, использованные в ролях, можно указать в inventory-файле hosts.ini
.
[api]
130.193.51.154
[docker:children]
api
[api:vars]
analyzer_image = alvassin/backendschool2019
analyzer_pg_user = user
analyzer_pg_password = hackme
analyzer_pg_dbname = analyzer
После того, как все файлы Ansible будут готовы, запустим его:
$ ansible-playbook -i hosts.ini deploy.yml
Допустим, мы сгенерировали выгрузку с жителями, вызвали друг за другом все обработчики, каждый из них отработал менее чем за 10 секунд. Достаточно ли этого? Можно предположить, что скорость обработки данных будет деградировать при увеличении количества данных, загружаемых в сервис. Важно понимать, сколько выгрузок сможет обработать сервис, прежде чем обработчики перестанут укладываться в ограничения.
Хоть для тестирования данного сервиса и не требуется генерировать высокий RPS, его нагрузочное тестирование имеет свою особенность: использовать статический набор запросов не получится. Например, чтобы получить список жителей, необходимо иметь идентификатор выгрузки import_id
, который возвращается обработчиком POST /imports
и может оказаться любым целым числом. Этот подход называется тестированием по сценарию.
Учитывая, что генерация данных уже реализована на Python 3, я решил воспользоваться фреймворком Locust.
Чтобы выполнить нагрузочное тестирование, необходимо описать сценарий в файле locustfile.py
и запустить модуль командой locust
. Затем результаты тестирования можно наблюдать на графиках в веб-интерфейсе или таблице результатов в консоли.
Графики Locust показывают общую информацию. Мне было интересно узнать, на каком раунде сервис не уложится в таймаут. Я добавил переменную с номером текущей
итерации self.round
и логивание каждого запроса с указанием итерации тестирования и времени выполнения.
# locustfile.py
import logging
from http import HTTPStatus
from locust import HttpLocust, constant, task, TaskSet
from locust.exception import RescheduleTask
from analyzer.api.handlers import (
CitizenBirthdaysView, CitizensView, CitizenView, TownAgeStatView
)
from analyzer.utils.testing import generate_citizen, generate_citizens, url_for
class AnalyzerTaskSet(TaskSet):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.round = 0
def make_dataset(self):
citizens = [
# Первого жителя создаем с родственником. В запросе к
# PATCH-обработчику список relatives будет содержать только другого
# жителя, что потребует выполнения максимального кол-ва запросов
# (как на добавление новой родственной связи, так и на удаление
# существующей).
generate_citizen(citizen_id=1, relatives=[2]),
generate_citizen(citizen_id=2, relatives=[1]),
*generate_citizens(citizens_num=9998, relations_num=1000,
start_citizen_id=3)
]
return {citizen['citizen_id']: citizen for citizen in citizens}
def request(self, method, path, expected_status, **kwargs):
with self.client.request(
method, path, catch_response=True, **kwargs
) as resp:
if resp.status_code != expected_status:
resp.failure(f'expected status {expected_status}, '
f'got {resp.status_code}')
logging.info(
'round %r: %s %s, http status %d (expected %d), took %rs',
self.round, method, path, resp.status_code, expected_status,
resp.elapsed.total_seconds()
)
return resp
def create_import(self, dataset):
resp = self.request('POST', '/imports', HTTPStatus.CREATED,
json={'citizens': list(dataset.values())})
if resp.status_code != HTTPStatus.CREATED:
raise RescheduleTask
return resp.json()['data']['import_id']
def get_citizens(self, import_id):
url = url_for(CitizensView.URL_PATH, import_id=import_id)
self.request('GET', url, HTTPStatus.OK,
name='/imports/{import_id}/citizens')
def update_citizen(self, import_id):
url = url_for(CitizenView.URL_PATH, import_id=import_id, citizen_id=1)
self.request('PATCH', url, HTTPStatus.OK,
name='/imports/{import_id}/citizens/{citizen_id}',
json={'relatives': [i for i in range(3, 10)]})
def get_birthdays(self, import_id):
url = url_for(CitizenBirthdaysView.URL_PATH, import_id=import_id)
self.request('GET', url, HTTPStatus.OK,
name='/imports/{import_id}/citizens/birthdays')
def get_town_stats(self, import_id):
url = url_for(TownAgeStatView.URL_PATH, import_id=import_id)
self.request('GET', url, HTTPStatus.OK,
name='/imports/{import_id}/towns/stat/percentile/age')
@task
def workflow(self):
self.round += 1
dataset = self.make_dataset()
import_id = self.create_import(dataset)
self.get_citizens(import_id)
self.update_citizen(import_id)
self.get_birthdays(import_id)
self.get_town_stats(import_id)
class WebsiteUser(HttpLocust):
task_set = AnalyzerTaskSet
wait_time = constant(1)
Выполнив 100 итераций c максимальными выгрузками, я убедился, что время работы всех обработчиков укладывается в ограничения:
Как видно на графике распределения времени ответов обработчиков, скорость обработки запросов почти не деградирует с ростом количества данных (желтый — 95 перцентиль, зеленый — медиана). Даже со ста выгрузками сервис будет работать эффективно.
На графиках потребления ресурсов виден всплеск — установка приложения с помощью Ansible и далее ровное потребление ресурсов с ~20.15 до ~20.30 под нагрузкой от Locust.
Что еще можно сделать?
Профилирование приложения показало, что около четверти всего времени выполнения запросов уходит на сериализацию и десериализацию JSON: данных, отправляемых и получаемых из сервиса, достаточно много. Эти процессы можно существенно ускорить с помощью библиотеки orjson, но сервис придется немного подготовить — orjson
не является drop-in-заменой для стандартного модуля json
Обычно для продакшена требуется несколько копий сервиса, чтобы обеспечить отказоустойчивость и справиться с нагрузкой. Для управления группой сервисов нужен инструмент, показывающий, «жива» ли копия сервиса. Решить эту задачу можно обработчиком /health
, который опрашивает все требуемые для работы ресурсы, в нашем случае — базу данных. Если SELECT 1
выполняется меньше чем за секунду, то сервис жив. Если нет — нужно обратить на него внимание.
Когда приложение очень интенсивно работает с сетью, uvloop может здорово увеличить производительность.
Немаловажным фактором является и читабельность кода. Один мой коллега, Юрий Шиканов, написал объединяющий несколько инструментов модуль gray для автоматической проверки и оформления кода, который легко добавить в pre-commit
Git-хук, настроить одним файлом конфигурации или переменными окружения. Gray позволяет сортировать импорты (isort), оптимизирует выражения python в соответствии с новыми версиями языка (pyupgrade), добавляет запятые в конце вызовов функций, импортов, списков и т. д. (add-trailing-comma), а также приводит кавычки к единому виду (unify).
* * *
На этом у меня все: мы разработали, покрыли тестами, собрали и развернули сервис, а также провели нагрузочное тестирование.
Благодарности
Я хотел бы выразить огромную благодарность ребятам, которые нашли время принять участие в написании этой статьи, поревьювить код, внести свои идеи и замечания: Марии Зеленовой zelma, Владимиру Соломатину leenr, Анастасии Семёновой morkov, Юрию Шиканову dizballanze, Михаилу Шушпанову mishush, Павлу Мосеину pavkazzz и особенно Дмитрию Орлову orlovdl.
Автор: Александр Васин