Какую потенциальную проблему видите в коде?
await _applicationService.Create(application);
await _queue.Publish(new ApplicationCreatedEvent(application));
Сначала создается заявка в БД, после событие о создании отправляется в брокер сообщений(MQ) для оповещения другого сервиса о появлении новой заявки.
Здесь может произойти ошибка в момент отправки события в очередь, когда наша сущность уже создана в БД.
Это приведет к несогласованному состоянию, так как заявка будет хранится только в нашей системе, а другая система не узнает о ней.
Как решить эту проблему, может воспользуемся транзакцией и откатим сохранение, если при отправке события в очередь произойдет ошибка?
await using var transaction = await _unitOfWork.BeginTransaction();
await _applicationService.Add(application); // отслеживание заявки для вставки
await _unitOfWork.SaveChanges(); // выполняется запрос на вставку в БД
try
{
await _queue.Publish(new ApplicationCreatedEvent(application));
}
catch(Exception e)
{
_logger.LogError(e, "Ошибка при отправке события в очередь");
await transaction.Rollback();
return;
}
await transaction.Commit();
Тут уже обработали проблему, но не все случаи, тк ошибка может произойти на стороне БД, когда будем коммитить изменения.
Событие в очередь отправили, а у себя не сохранили, тоже не согласованность.Проблема заключается в том, что эти два действия - сохранение состояния и публикация события, обычно не происходят атомарно.
Поэтому на любом из этих этапов может возникнуть сбой и одно из действий не будет выполнено, что приведет к несогласованному состоянию системы.
Тут нам поможет паттерн Transactional Outbox - как избежать потери сообщений в микросервисной архитектуре.
В качестве решения предлагается атомарно в одной транзакции сохранить изменения сущности и само событие, а вторым шагом выполнять публикацию ранее сохраненного события.
Если сущность и соответствующее ему событие сохраняются в рамках одной транзакции, то это гарантирует, что данные не будут потеряны.
-
В одной транзакции создаем сущность и событие (в другой таблице), которое должно отправиться в очередь.
await _applicationRepository.Add(application);
await _eventRepository.Add(new ApplicationCreatedEvent(application));
await _unitOfWork.SaveChanges();
-
Отправкой событий из базы в брокер будет заниматься отдельный процесс - Outbox processor.
Реализовать можно в том же сервисе в виде фоновой задачи на основе IHostedService (в ASP.NET Core), Cron джобы или развернуть отдельный Worker Service для этого процесса.
Есть одна проблема - с какой частотой сканировать БД для отправки сообщений в очередь?
Нужно учитывать такие факторы: как часто сообщения приходят, насколько быстро нужно доставлять сообщения, производительность БД и тп.
Как работает Outbox Processor?
Он берет пачку событий из базы и обрабатывает данные события, отправляя их в очередь, а после удаляет или помечает события как отправленные (ниже пример отправки события).
async Task SendEvent(ApplicationCreatedEvent applicationCreatedEvent)
{
try
{
await _queue.Publish(applicationCreatedEvent);
await _eventRepository.Delete(applicationCreatedEvent);
}
catch(Exception e)
{
_logger.LogError(e, "Ошибка при обработке события");
}
}
Теперь проблема потери сообщений решена, но все ли это или может есть еще какая-то проблема?
Осталась проблема с дублями событий в очереди, тк при удалении события может произойти ошибка, а оно уже было отправлено в очередь.
Для этого нужно реализовать идемпотентную обработку событий - продюсеру необходимо отправлять идентификатор события в сообщении, а подписчику проверять по идентификатору ранее обработанные события.
P.S.:
Если сообщения не настолько критичны, то профукать одну отправку не страшно и можно оставить реализацию как в первом варианте и не пользоваться данным паттерном.
Если нужна более сложная обработка транзакций, то использовать SAGA, чтобы обрабатывать откаты уже выполненных транзакций со сложной цепочкой взаимодействия, хорошо подойдет для распределенных систем.
Автор: cmekoneup