В этой статье хочу рассказать о том, как написать полезный сервис, для получения ИНН по персональным данным (паспортные данные). ИНН физического лица получаем с использование сайта https://service.nalog.ru/. Похожая функциональность, скорее всего, уже где-то и кем-то была реализована. Основная идея статьи - поделиться опытом работы с Python в части создания законченного проекта с использованием контейнера зависимостей, создания слушателей для RabbitMQ и работой с базой данных MongoDB. Работа с клиентами сервиса реализована через RabbitMQ в режиме непрерывного чтения очереди, отправкой результата в выходную очередь. Сервис будет жить в Kubernetes, что требует наличие liveness и readiness проб. Для этого используется веб-сервер.
Общие сведения
Сервис будем реализовывать на Python 3.10 с использованием библиотек aio-pika, fastapi, pydantic, motor и других библиотек, которые будут указаны в pyproject.toml проекта. В качестве базы данных используем MongoDB 4+. Обращение к сервису налоговой выполняется при помощи библиотеки aiohttp. Проект размещён в публичном доступе на GitHub.
Приложение функционирует как слушатель входной очереди и веб-сервер для отдачи liveness и readiness-проб. При получении сообщения в очередь, из заголовка reply-to вычитывается имя выходной очереди, в которую будет направлен ответ. Обработка запроса передаётся в сервис, который проверяет наличие похожего запроса в базе данных. В случае отсутствия данных по клиенту, выполняется запрос к внешнему сервису. Внешний сервис может обработать какое-то количество сообщений без запроса капчи. После превышения лимитов, которые доподлинно не известны (но изменяются при общей повышенной нагрузке), сообщение помещается в мёртвую очередь и через указанное в настройках время возвращается в обработку.
Подготовительные работы для базы данных не требуется. При первом подключении к MongoDB будут созданы необходимые коллекции и индексы.
Контракт общения с сервисом
Определим контракт входного сообщения в формате JSON:
Hidden text
{
"requestId": str,
"firstName": str,
"lastName": str,
"middleName": str,
"birthDate": date,
"documentSerial": str,
"documentNumber": str,
"documentDate": date
}
Все поля интуитивно понятны. Атрибут requestId
должен быть уникален в пределах всех сообщений, имеет смысл передавать его как строковое представление GUID
.
Имя выходной очереди может передаваться через поле reply-to заголовка сообщения.
Контракт выходного сообщения будет следующим:
Hidden text
{
"requestId": str,
"inn": str,
"cached": bool,
"details": str,
"elapsedTime": float
}
В ответе будем отдавать код запроса, собственно ИНН и время, за которое отработал сервис запрос и признак кэшированного ответа.
Структура проекта
Общая структура директорий проекта следующая.
src
|--inn_service
|--clients
|--connection_managers
|--core
|--infrastructure
|--controllers
|--handlers
|--http_server
|--queue_manager
|--models
|--repositories
|--serializers
|--services
main.py
.env.example
.gitignore
docker-compose.yaml
pyproject.yaml
В корневой директории будут размещаться инструменты запуска проекта: docker-compose, make-файл запуска линтинга и тестов. Собственно проект размещён в src/inn_service и содержит:
-
clients - клиенты для подключения к действительным поставщикам данных (nalog.ru и прочие);
-
connection_managers - инфраструктурные подключения к базе данных, очередям;
-
core - общий код приложения (собственно приложение, контейнер);
-
infrastructure - менеджер обработчиков очередей, сами обработчики, инфраструктурные контроллеры;
-
models - моделей приложения, DTO-объекты;
-
repositories - репозиторий для работы с базой данных;
-
serializers - сериализаторы входных запросов, данных для отправки в провайдер ИНН;
-
services - сервисы приложения.
Работу по созданию виртуального подключения переложим на PyCharm и poetry. Краткая команда установки: poetry install
.
Настройки приложения
Начнём разработку с создания настроек приложения, используя BaseSettings из пакета pydantic.
В файле settings.py будут находиться настройки.
Hidden text
class Settings(BaseSettings):
app_name: str = 'INN service'
app_request_retry_times: int # Количество попыток обработки внешнего запроса
app_request_retry_sec: int # Время задержки в секундах перед повторной обработкой запроса
http_host: str
http_port: int
http_handler: str = 'asyncio'
mongo_host: str
mongo_port: str
mongo_user: str
mongo_pass: str
mongo_name: str
mongo_rs: Optional[str] = None
mongo_auth: str
mongo_timeout_server_select: int = 5000
rabbitmq_host: str
rabbitmq_port: int
rabbitmq_user: str
rabbitmq_pass: str
rabbitmq_vhost: str
rabbitmq_exchange_type: str
rabbitmq_prefetch_count: int
rabbitmq_source_queue_name: str
client_nalog_url: str # Адрес внешнего сервиса для получения ИНН
client_nalog_timeout_sec: int # Таймаут ожидания ответа от сервиса
client_nalog_retries: int # Количество попыток запросов к внешнему сервису
client_nalog_wait_sec: int # Время ожидания между попытками client_nalog_retries
@property
def mongo_dsn(self) -> str:
mongo_dsn = 'mongodb://{}:{}@{}:{}/{}'.format(
self.mongo_user,
self.mongo_pass,
self.mongo_host,
self.mongo_port,
self.mongo_auth
)
if self.mongo_rs:
mongo_dsn += f'?replicaSet={self.mongo_rs}'
return mongo_dsn
@property
def rabbitmq_dsn(self) -> str:
return 'amqp://{}:{}@{}:{}/{}'.format(
self.rabbitmq_user,
self.rabbitmq_pass,
self.rabbitmq_host,
self.rabbitmq_port,
self.rabbitmq_vhost
)
Предлагаю не указывать значения по умолчанию для настроек. Если что-то пойдёт не так, то сразу увидим проблему. В этот момент можно подготовить сразу и файл .env.example, содержащий настройки по-умолчанию для сервиса.
Подключения к инфраструктуре
Создадим слой подключения к инфраструктуре rabbitmq, mongodb через компоненты aio-pika и motor:
poetry add motor aio-pika fast fastapi uvicorn injector
Слой подключения будет размещаться в connection_managers и предназначен для организация подключения к базе данных и менеджеру очередей. Добавим две миксины для создания механизма регистрации автозапуска и завершения приложения. Механизм автозапуска функций применяется при старте приложения для инициализации подключения к RabbitMQ и MongoDB, а также для создания индексов в коллекции базы данных. В случае возникновения ошибок при подключении, приложение не стартует и выдаётся ошибка в логи.
Hidden text
class StartupEventMixin(ABC):
@abstractmethod
def startup(self) -> Coroutine:
raise NotImplementedError
class ShutdownEventMixin(ABC):
@abstractmethod
def shutdown(self) -> Coroutine:
raise NotImplementedError
На примере RabbitConnectionManager продемонстрируем реализацию.
Hidden text
class RabbitConnectionManager(StartupEventMixin, ShutdownEventMixin, EventLiveProbeMixin):
def startup(self) -> Coroutine:
return self.create_connection()
def shutdown(self) -> Coroutine:
return self.close_connection()
async def create_connection(self) -> None:
self.logger.info('Create connection RabbitMQ')
try:
self._connection = await connect_robust(self._dsn)
self._connection.reconnect_callbacks.add(self.on_connection_restore)
self._connection.close_callbacks.add(self.on_close_connection)
self.connected = True
except ConnectionError as exc:
err_message = f'Rabbit connection problem: {exc}'
self.logger.error(err_message)
raise ConnectionError(err_message)
async def close_connection(self) -> None:
if self._connection:
await self._connection.close()
# ... некоторый код пропущен, полная версия на гитхабе
def on_close_connection(self, *args):
self.logger.error('Lost connection to RabbitMQ...')
self.connected = False
def on_connection_restore(self, *args):
self.logger.info('Connection to RabbitMQ has been restored...')
self._channel = None
self._exchange = None
self.connected = True
При подключении к RabbitMQ устанавливаются функции коллбэков для реагирования на потерю соединения и его восстановление.
Менеджер обработчиков
Менеджер обработчиков предназначен для управления слушателями (consumers) очередей. В проекте используется концепция "мёртвых очередей", которая позволяет отложить сообщение на некоторое время и вернуться к его обработке позже. Причиной для этого может являться долгий ответ от провайдера, временные ошибки провайдера, требование ввода капчи из-за нагрузки. Достаточно подробно механизм мёртвых очередей технически разобран в статье Отложенные ретраи силами RabbitMQ. Каждый обработчик очереди должен хранить и возвращать признак использования ретраев, время между возвратами в основную очередь на обработку, а также имя очереди, которую планирует слушать. Основной код обработчика находится в run_handler
. От функции ожидается True
при успешном обработке либо непоправимой ошибке запроса (некорректное тело сообщения) и False
, если запрос не удалось обработать, но следует повторить позднее.
Код базового обработчика:
Hidden text
class BaseHandler(ABC):
def __init__(
self,
settings: Settings,
logger: AppLogger,
rabbitmq_connection: RabbitConnectionManager
) -> None:
self.settings = settings
self.logger = logger
self.rabbitmq_connection = rabbitmq_connection
@abstractmethod
def get_use_retry(self) -> bool:
raise NotImplementedError
def get_retry_ttl(self) -> int:
return 0
@abstractmethod
def get_source_queue(self) -> str:
raise NotImplementedError
def convert_seconds_to_mseconds(self, value: int) -> int:
return value * 1000
@abstractmethod
async def run_handler(
self,
message: dict,
request_id: Optional[str],
result_queue: Optional[str],
count_retry: Optional[int] = 0
) -> bool:
raise NotImplementedError
Собственно единственный наследник класса RequestHandler
, реализующий приём и обработку сообщения:
Hidden text
class RequestHandler(BaseHandler):
def __init__(
self,
settings: Settings,
logger: AppLogger,
rabbitmq_connection: RabbitConnectionManager,
service: InnService
) -> None:
super().__init__(settings, logger, rabbitmq_connection)
self.source_queue_name = self.settings.rabbitmq_source_queue_name
self.retry_times = self.settings.app_request_retry_times
self.retry_sec = self.settings.app_request_retry_sec
self.service = service
def get_source_queue(self) -> str:
return self.source_queue_name
def get_use_retry(self) -> bool:
return True
def get_retry_ttl(self) -> int:
return self.retry_sec
async def run_handler(
self,
message: dict,
request_id: Optional[str],
result_queue: Optional[str],
count_retry: Optional[int] = 0
) -> bool:
if count_retry > self.retry_times:
self.logger.warning(f'Request {request_id} was rejected by excess attempts {self.retry_times} times')
return True
self.logger.info(f'Get request {request_id} for response {result_queue}')
client_data = RequestSerializer.parse_obj(message)
response = await self.service.get_client_inn(client_data)
if result_queue:
json_message = response.dict()
await self.rabbitmq_connection.send_data_by_queue(json_message, result_queue)
return True
При получении сообщения проверяем количество повторного попадания в очередь через параметр count_retry
. В случае превышения - отправляем статус обработки сообщения (ошибку) в выходную очередь и приостанавливаем обработку данного сообщения. RequestSerializer.parse_obj(message)
не обёрнут в try...except блок потому как менеджер очередей контролирует ошибки преобразования сообщений ValidationError.
Работа с базой данных
Выбор на MongoDB пал из-за простоты использования, отсутствия миграций, гибкой схемы обработки данных. В задаче нет необходимости в хранении зависимых данных, оформление связей между таблицами. Для работы с данными будем использовать паттерн Репозиторий.
В базовом репозитории расположены функции работы с данными, индексами в нотации Mongo, а в конкретных классах реализуем необходимые сервису функции. Создание индексов выполняется при старте приложения в фоновом режиме (флаг background), для чего используется имплементация миксины StartupEventMixin. Запросы набора данных поддерживают пагинацию и сортировку.
Конкретный класс создаётся на каждую отдельную коллекцию. В проекте один репозиторий для клиентских запросов. Модель для хранения данных находится в директории models и называется ClientDataModel
. Клиентская модель создана с типизацией, поддерживаемой MongoDB (datetime вместо date), для атрибута created_at указана функция генерации значения по умолчанию через default_factory. Также в модель добавлена функция подсчёта времени обработки запроса elapsed_time
и метод класса для создания объекта из клиентского запроса.
Hidden text
class ClientDataModel(BaseModel):
created_at: datetime = Field(default_factory=datetime.utcnow)
request_id: str
first_name: str
last_name: str
middle_name: str
birth_date: datetime
birth_place: str = Field(default='')
passport_num: str
document_date: datetime
executed_at: Optional[datetime]
inn: Optional[str]
error: Optional[str]
@classmethod
def create_from_request(cls, request: RequestMqSerializer) -> 'ClientDataModel':
return ClientDataModel(
request_id=request.request_id,
first_name=request.first_name,
last_name=request.last_name,
middle_name=request.middle_name,
birth_date=datetime.combine(request.birth_date, datetime.min.time()),
passport_num='{} {}'.format(request.document_serial, request.document_number),
document_date=datetime.combine(request.document_date, datetime.min.time()),
)
@property
def elapsed_time(self) -> float:
end = self.executed_at or datetime.utcnow()
return (end - self.created_at).total_seconds()
Код базового репозитория:
Hidden text
class BaseRepository(StartupEventMixin):
def __init__(self, mongodb_connection_manager: MongoConnectionManager, setting: Settings) -> None:
self.mongodb_connection_manager = mongodb_connection_manager
self.db_name = setting.mongo_name
@property
def collection_name(self) -> str:
raise NotImplementedError
@property
def collection_indexes(self) -> Iterable[IndexDef]:
raise NotImplementedError
def startup(self) -> Coroutine:
return self.create_indexes()
async def create_index(self, field_name: str, sort_id: int) -> None:
connection = await self.mongodb_connection_manager.get_connection()
collection = connection[self.db_name][self.collection_name]
await collection.create_index([(field_name, sort_id), ], background=True)
async def create_indexes(self) -> None:
tasks = []
for index_item in self.collection_indexes:
tasks.append(self.create_index(index_item.name, index_item.sort))
asyncio.ensure_future(asyncio.gather(*tasks))
async def get_one_document(self, criteria: dict) -> Optional[dict]:
connection = await self.mongodb_connection_manager.get_connection()
collection = connection[self.db_name][self.collection_name]
return await collection.find_one(criteria)
async def get_list_document(
self,
criteria: dict,
sort_criteria: Optional[list] = None,
limit: Optional[int] = 0,
skip: Optional[int] = 0,
) -> List[dict]:
if not sort_criteria:
sort_criteria = []
connection = await self.mongodb_connection_manager.get_connection()
cursor = connection[self.db_name][self.collection_name].find(
criteria,
limit=limit,
skip=skip,
sort=sort_criteria
)
result = list()
async for data in cursor:
result.append(data)
return result
async def save_document(self, data: dict) -> str:
connection = await self.mongodb_connection_manager.get_connection()
result = await connection[self.db_name][self.collection_name].insert_one(data)
return result.inserted_id
async def update_document(self, criteria: dict, data: dict) -> None:
connection = await self.mongodb_connection_manager.get_connection()
await connection[self.db_name][self.collection_name].update_one(criteria, {'$set': data})
Сервисный слой
Сервисный слой выполняет всю необходимую обработку с данными.
-
обращение в базу данных для поиска аналогичного запроса (request_id и паспортные данные);
-
отдать результат, если данные были найдены;
-
выполнить запрос к API;
-
сохранить результат запроса в базу данных;
-
вернуть ответ.
В сервисном слое попытался абстрагироваться от работы с инфраструктурой. Возврат ответа производится в вызывающую функцию, которая должна знать куда вернуть ответ. В данном случае, менеджер очередей "знает" куда ему ответить благодаря наличию поля reply-to в заголовке запроса. Возвращаемое значение оформлено в виде DTO-объекта (RequestDTO
).
Код класса InnService
:
Hidden text
class InnService:
def __init__(
self,
settings: Settings,
logger: AppLogger,
client: NalogApiClient,
storage: RequestRepository
) -> None:
self.settings = settings
self.logger = logger
self.client = client
self.storage_repository = storage
async def get_client_inn_from_storage(self, client_data: RequestSerializer) -> Optional[RequestModel]:
client_passport = f'{client_data.document_serial} {client_data.document_number}'
client_request = await self.storage_repository.find_request(client_passport, client_data.request_id)
return client_request
def update_status(self, model: RequestModel, inn: str, error: str) -> None:
model.inn = inn
model.error = error
async def get_client_inn(self, client_data: RequestSerializer) -> RequestDTO:
"""Получение клиентского ИНН"""
start_process = datetime.utcnow()
model = RequestModel.create_from_request(client_data)
# Получить данные из БД
existing_data = await self.get_client_inn_from_storage(client_data)
if existing_data:
elapsed_time = (datetime.utcnow() - start_process).total_seconds()
return RequestDTO(
request_id=client_data.request_id,
inn=existing_data.inn,
elapsed_time=elapsed_time,
cashed=True
)
# Сделать фактический запрос в Nalog API
request = NalogApiRequestSerializer.create_from_request(client_data)
error, result = None, ''
try:
result = await self.client.send_request_for_inn(request)
except NalogApiClientException as exception:
self.logger.error('Error request to Nalog api service', details=str(exception))
error = str(exception)
self.update_status(model, result, error)
await self.storage_repository.save_request(model)
return RequestDTO(
request_id=model.request_id,
inn=model.inn,
details=model.error,
elapsed_time=model.elapsed_time
)
Второй сервис в приложении - это сервис опроса инфраструктуры для health-check. Инфраструктурные менеджеры, которые необходимо мониторить, должны наследоваться от миксины EventLiveProbeMixin
и реализовать функцию is_connected
.
Клиент
Клиент NalogApiClient предназначен для выполнения POST запроса к https://service.nalog.ru/inn.do и разбора статуса ответа. Функция непосредственного оформления запроса обёрнута в retry декоратор повторителя запроса при возникновении ошибок. Настройки повторителя в общих настройках приложения.
Hidden text
class NalogApiClient:
CLIENT_EXCEPTIONS = (
NalogApiClientException,
aiohttp.ClientProxyConnectionError,
aiohttp.ServerTimeoutError,
)
def __init__(self, settings: Settings, logger: AppLogger):
self.nalog_api_service_url = settings.client_nalog_url
self.request_timeout = settings.client_nalog_timeout_sec
self.retries_times = settings.client_nalog_retries
self.retries_wait = settings.client_nalog_wait_sec
self.logger = logger
self.timeout = aiohttp.ClientTimeout(total=self.request_timeout)
@property
def _headers(self):
return {
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Language": "ru-RU,ru",
"Connection": "keep-alive",
"Origin": "https://service.nalog.ru",
"Referer": self.nalog_api_service_url,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"Sec-GPC": "1",
"X-Requested-With": "XMLHttpRequest",
}
async def send_request_for_inn(self, nalog_api_request: NalogApiRequestSerializer) -> Optional[str]:
self.logger.debug(f'Request to nalog api service for {nalog_api_request.client_fullname}')
form_data = nalog_api_request.dict(by_alias=True)
@retry(self.CLIENT_EXCEPTIONS, logger=self.logger, attempts=self.retries_times, wait_sec=self.retries_wait)
async def make_request(client_session: aiohttp.ClientSession):
async with client_session.post(url=self.nalog_api_service_url, data=form_data) as response:
if response.status not in [http.HTTPStatus.OK, http.HTTPStatus.NOT_FOUND]:
response_text = await response.text()
raise NalogApiClientException(response_text)
data = await response.json()
code = data.get('code')
captcha_required = data.get('captchaRequired')
if captcha_required:
raise NalogApiClientException(f'Captcha required for request {nalog_api_request.client_fullname}')
if code == 0:
return 'no inn'
elif code == 1:
return data.get('inn')
else:
raise NalogApiClientException(f'Unable to parse response! Details: {response}')
async with aiohttp.ClientSession(timeout=self.timeout, headers=self._headers) as session:
return await make_request(session)
Контейнер
Контейнер предназначен для сборки необходимых зависимостей и передачи их в приложение. Наш контейнер собран в классе ApplicationContainer. Все зависимости пробрасываются в виде синглтонов @singleton и регистрируются как провайдеры зависимостей типов @provider предоставляемых библиотекой injector. При написании тестов необходимо подготовить другой контейнер с актуальными fake или stub-объектами.
Основной интерес по работе с контейнером сосредоточен в классе ContainerManager
, который используется для проверки реализации миксин EventSubscriberMixin
и EventLiveProbeMixin
. Функция get_event_collection
формирует списки функций обратного вызова для старта и выхода из приложения. Собственно проход по спискам и вызов функций обратного вызова реализован в функциях: run_startup
и run_shutdown
.
Hidden text
class ContainerManager:
def __init__(self, cls_container: Type[Container]) -> None:
self._container = Injector(cls_container())
self._bindings = self._container.binder._bindings
def get_container(self) -> Injector:
return self._container
def get_live_probe_handlers(self) -> List[Type[Callable]]:
result = []
binding_collection = [binding for binding in self._bindings]
for binding in binding_collection:
if issubclass(binding, EventLiveProbeMixin):
binding_obj = self._container.get(binding)
result.append(binding_obj.is_connected)
return result
def get_startup_handlers(self):
handlers = []
binding_collection = [binding for binding in self._bindings]
for binding in binding_collection:
if issubclass(binding, StartupEventMixin):
binding_obj = self._container.get(binding)
handlers.append(binding_obj.startup())
return handlers
def get_shutdown_handlers(self):
handlers = []
binding_collection = [binding for binding in self._bindings]
for binding in binding_collection:
if issubclass(binding, ShutdownEventMixin):
binding_obj = self._container.get(binding)
handlers.append(binding_obj.shutdown())
return handlers
async def run_startup(self) -> None:
exception = None
for handler in self.get_startup_handlers():
if exception:
handler.close()
else:
try:
await handler
except Exception as exc:
exception = exc
if exception is not None:
raise exception
async def run_shutdown(self) -> None:
handlers = []
for handler in self.get_shutdown_handlers():
handlers.append(handler)
await asyncio.gather(*handlers)
Собственно сам контейнер, в котором производится инициализация нужных экземпляров классов. При написании тестов будет создан аналогичный контейнер.
Hidden text
class ApplicationContainer(Container):
@singleton
@provider
def provide_settings(self) -> Settings:
return Settings()
# ... немного кода пропущено
@singleton
@provider
def provide_mongodb_connection(self, settings: Settings, logger: AppLogger) -> MongoConnectionManager:
return MongoConnectionManager(settings, logger)
@singleton
@provider
def provide_rabbitmq_connection(self, settings: Settings, logger: AppLogger) -> RabbitConnectionManager:
return RabbitConnectionManager(settings, logger)
@singleton
@provider
def provide_nalog_api_client(self, settings: Settings, logger: AppLogger) -> NalogApiClient:
return NalogApiClient(settings, logger)
@singleton
@provider
def provide_request_repository(self, settings: Settings, mongo_connection: MongoConnectionManager) -> RequestRepository:
return RequestRepository(mongo_connection, settings)
Приложение
Основная задача приложения - собрать всё воедино и запустить общий поток выполнения. Код сборки приложения предельно простой, инициализацию классов выполняет менеджер контейнера. Сборка приложения выполняется следующими шагами:
-
получение контейнера, передача его в менеджер контейнеров;
-
инициализация event_loop;
-
добавление обработчиков для очередей;
-
запуск инициализаторов для инфраструктурного слоя (реализующих startup миксины);
-
запуск веб-сервера FastAPI для отдачи health-check;
-
включение глобального обработчика ошибок.
Hidden text
class Application:
def __init__(self, cls_container: Type[Container]) -> None:
self.loop = asyncio.get_event_loop()
self.container_manager = ContainerManager(cls_container)
self.container = self.container_manager.get_container()
self.settings = self.container.get(Settings)
self.logger = self.container.get(AppLogger)
self.live_probe_service = self.container.get(LiveProbeService)
self.queue_manager = self.container.get(QueueManager)
self.app_name = self.settings.app_name
self.http_server = None
def init_application(self):
self.http_server = ServerAPIManager(self.container)
request_handler = self.container.get(RequestHandler)
self.queue_manager.add_handler(request_handler)
live_probe_handlers = self.container_manager.get_live_probe_handlers()
for handler in live_probe_handlers:
self.live_probe_service.add_component(handler)
def run(self) -> None:
self.logger.info(f'Starting application {self.app_name}')
self.init_application()
try:
self.loop.run_until_complete(self.container_manager.run_startup())
tasks = asyncio.gather(
self.http_server.serve(),
self.queue_manager.run_handlers_async(),
)
self.loop.run_until_complete(tasks)
self.loop.run_forever()
except BaseException as exception:
exit(1)
finally:
self.loop.run_until_complete(self.container_manager.run_shutdown())
self.loop.close()
self.logger.info('Application disabled')
Приложение стартует из main-скрипта с использованием небольшой библиотеки typer. Маленькая библиотека имеет возможность удобно обрабатывать параметры командной строки.
Hidden text
import typer
from core.application import Application
from app_container import ApplicationContainer
def main():
try:
application = Application(ApplicationContainer)
application.run()
except BaseException as exc:
typer.echo(f'Error starting application. Details: {str(exc)}')
if __name__ == "__main__":
typer.run(main)
Как это всё запустить?
Проект содержит файл docker-compose для сборки. Необходимо скопировать файл .env.example
в файл .env
.
docker compose build
docker compose up
После выполнения этих команд, будет запущен экземпляр mongodb на 27017 порту и rabbitmq на 5672 порту с админкой на 15672. В административную панель RabbitMQ можно зайти по адресу http://localhost:15672. В разделе очередей необходимо создать новую очередь, в которую будут направляться результаты работы сервиса и прибиндить её к exchange по умолчанию (direct).
Продолжение следует
В статье рассмотрена тема разработки приложения на Python с использованием очередей, контейнером зависимостей и поддержкой health-check. Предлагаю обсудить архитектуру в комментариях, а затем продолжить развивать сервис. Следующими итерациями планирую добавить гипотетического не бесплатного клиента, которого будем использовать после определённого количества запросов в бесплатный сервис. И в завершении написать тесты.
Материалы, которые могут быть полезны для понимания материала:
-
RabbitMQ Tutorials (англ) - https://www.rabbitmq.com/getstarted.html
-
Презентация Dependency injection от создателя Dependency Injector (рус). В проекте используется библиотека injector, но общий смысл romanmogylatov6331 доносит понятно - https://www.youtube.com/watch?v=VdtxdDeG7RA
Автор: Дмитрий Филюшин