Паттерн Transactional Outbox

в 14:15, , рубрики: outbox, transactional outbox, атомарность, идемпотентность, Программирование, согласованность, транзакции

Какую потенциальную проблему видите в коде?

await _applicationService.Create(application);
await _queue.Publish(new ApplicationCreatedEvent(application));

Сначала создается заявка в БД, после событие о создании отправляется в брокер сообщений(MQ) для оповещения другого сервиса о появлении новой заявки.

Здесь может произойти ошибка в момент отправки события в очередь, когда наша сущность уже создана в БД.
Это приведет к несогласованному состоянию, так как заявка будет хранится только в нашей системе, а другая система не узнает о ней.

Как решить эту проблему, может воспользуемся транзакцией и откатим сохранение, если при отправке события в очередь произойдет ошибка?

await using var transaction = await _unitOfWork.BeginTransaction();

await _applicationService.Add(application); // отслеживание заявки для вставки
await _unitOfWork.SaveChanges(); // выполняется запрос на вставку в БД

try
{
    await _queue.Publish(new ApplicationCreatedEvent(application));
}
catch(Exception e)
{
    _logger.LogError(e, "Ошибка при отправке события в очередь");
    await transaction.Rollback();
    return;
}
  
await transaction.Commit();

Тут уже обработали проблему, но не все случаи, тк ошибка может произойти на стороне БД, когда будем коммитить изменения.
Событие в очередь отправили, а у себя не сохранили, тоже не согласованность.

Проблема заключается в том, что эти два действия - сохранение состояния и публикация события, обычно не происходят атомарно.
Поэтому на любом из этих этапов может возникнуть сбой и одно из действий не будет выполнено, что приведет к несогласованному состоянию системы.

Тут нам поможет паттерн Transactional Outbox - как избежать потери сообщений в микросервисной архитектуре.

В качестве решения предлагается атомарно в одной транзакции сохранить изменения сущности и само событие, а вторым шагом выполнять публикацию ранее сохраненного события.
Если сущность и соответствующее ему событие сохраняются в рамках одной транзакции, то это гарантирует, что данные не будут потеряны.

  1. В одной транзакции создаем сущность и событие (в другой таблице), которое должно отправиться в очередь.

await _applicationRepository.Add(application);
await _eventRepository.Add(new ApplicationCreatedEvent(application));
await _unitOfWork.SaveChanges();
  1. Отправкой событий из базы в брокер будет заниматься отдельный процесс - Outbox processor.
    Реализовать можно в том же сервисе в виде фоновой задачи на основе IHostedService (в ASP.NET Core), Cron джобы или развернуть отдельный Worker Service для этого процесса.
    Есть одна проблема - с какой частотой сканировать БД для отправки сообщений в очередь?
    Нужно учитывать такие факторы: как часто сообщения приходят, насколько быстро нужно доставлять сообщения, производительность БД и тп.

Паттерн Transactional Outbox - 1

Как работает Outbox Processor?

Он берет пачку событий из базы и обрабатывает данные события, отправляя их в очередь, а после удаляет или помечает события как отправленные (ниже пример отправки события).

async Task SendEvent(ApplicationCreatedEvent applicationCreatedEvent)
{
    try
    {
        await _queue.Publish(applicationCreatedEvent);
        await _eventRepository.Delete(applicationCreatedEvent);
    }
    catch(Exception e)
    {
        _logger.LogError(e, "Ошибка при обработке события");
    }
}

Теперь проблема потери сообщений решена, но все ли это или может есть еще какая-то проблема?

Осталась проблема с дублями событий в очереди, тк при удалении события может произойти ошибка, а оно уже было отправлено в очередь.

Для этого нужно реализовать идемпотентную обработку событий - продюсеру необходимо отправлять идентификатор события в сообщении, а подписчику проверять по идентификатору ранее обработанные события.

P.S.:

Если сообщения не настолько критичны, то профукать одну отправку не страшно и можно оставить реализацию как в первом варианте и не пользоваться данным паттерном.

Если нужна более сложная обработка транзакций, то использовать SAGA, чтобы обрабатывать откаты уже выполненных транзакций со сложной цепочкой взаимодействия, хорошо подойдет для распределенных систем.

Автор: cmekoneup

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js