Осознаем проблему
Предположим, что у нас есть Postgres, Kafka и Elastic. В Postgres живет сущность, например, item.
Иногда возникает необходимость создать поисковый индекс отдельно от основной базы.
Например, если мы не хотим обрушивать поисковый трафик на Postgres мы можем вынести данные из него в Elastic.
И все бы хорошо, если актуальность не требуется — просто запускать крон пересборки индекса в Elastic каждую ночь.
Но что если данные нам нужны актуальные, и ситуация, что через секунду после записи сущности в Postgres запрос на её чтение отдает Not Found — неприемлема?
Ищем варианты решения
Самое простое, что можно предложить — это в API до и после записи в Postgres записать и в Elastic.
Но консистентным это назвать нельзя, что если запись в Elastic не пройдет после успеха в Postgres? Сущность будет потеряна. Навсегда. Поменять местами запись в Postgres и в Elastic? Еще хуже, появятся фантомные сущности, которых нет в основном хранилище, но есть в поисковом.
Ивантяй
Уверенный в себе Middle
Да ладно, и Elastic и Postgres — штуки надежные, такого не будет. Инфа сотка.
Из‑за теории больших чисел, помимо увеличения задержки ответа, такие ошибки будут, в зависимости от нагрузки, уже через час после деплоя или через неделю.
Outbox
Приблизим к реальности и предположим, что события обновления нашей сущности нужны не только для поискового индекса, но и для других задач в соседних командах. Тогда берем и реализуем шаблон outbox.
Теперь в одной ACID транзакции у нас происходит запись и в таблицу item
, и в таблицу events
. А потом, отдельный поток берет события пачками и записывает их в топик Kafka. И здесь уже удаляем из events
после успешной записи в Kafka. Такая схема исключает потерю событий, но добавляет риск дублирования.
Indexator service или добро пожаловать в мир распределенных приложений
Переносимся в Indexator service, который будет отвечать за интеграцию Kafka и Elastic.
Предположим, у нас есть три топика item_created
, item_updated
, item_deleted
в которых приходит ID
сущности. Первая идея просто создать по потоку на каждый топик, и выполнять соответствующие команды в Elastic.
Первая версия такой и была, после чего начались жалобы от бизнеса: «я обновил item, а читаю старую версию», «я удалил сущность, а она осталась».
Дело в том, что порядок событий при чтении в Kafka гарантируется только на уровне партиции, а здесь мы читаем три (а если в топике больше 1 партиции то и больше трех) разные партиции параллельно. Поскольку у нас происходит батчевание в outbox, и запись в Kafka также происходит батчем, то порядок может быть утерян еще проще.
Например, сущность создали и сразу удалили. Из‑за батчевания два события item_created
и item_deleted
поедут в Kafka в одном запросе. И тут мы имеем race condition между потоками, и реальный порядок обработки событий занимает все возможные перестановки: [created, deleted]
, [deleted, created]
. Во втором случае мы сначала получим ошибку, что удаляем несуществующий документ, а потом создадим фантом.
Те же проблемы и с другими событиями и их сочетаниями.
Восстановить порядок
Но как?
Мы можем добавить дату возникновения события, например fired_at
. Что это нам даст? Можно будет сохранять и «настаивать» события перед их обработкой. Например, получив deleted
ждать сколько‑то секунд в надежде что придет created
. Такой вариант, подойдет, только он создаст еще большую задержку обработки.
Текущий подход состоит в определении применимости события к текущему состоянию. Например, мы получили событие deleted
, но в Elastic нет сущности для удаления, и запрос DELETE index/item_id
возвращает Not Found
. Это признак того, что событие выбилось из порядка. Такое событие можно направить на retry‑очередь. Там оно будет пробовать себя до тех пор, пока не придет created
, после чего произойдет удаление и будет достигнут eventual consistency.
Можно построить таблицу:
Событие |
Состояние |
created |
не должно существовать |
updated |
должно существовать |
deleted |
должно существовать |
Остается проблема с упорядочиванием updated
и deleted
, поскольку у них одно требование к состоянию.
Неправильные последовательности, где updated
пришел после deleted
не слишком вредны, поскольку сущность и так удаляется.
Остается упорядочить события updated
между собой, чтобы старая версия не перезаписывала новую. Здесь можно воспользоваться известным подходом оптимистичной блокировки: добавляем в события и в документ Elastic поле version
и организуем наши обновления так, чтобы документ с меньшей версией не перезаписал документ с большей.
Итог
Вот так можно построить несложную согласованность в конечном итоге для CRUD в реальном продакшене.
Вещи, которые опустили, чтобы не усложнять
Дубли событий
Помимо потери порядка, в распределенных системах часто встречаются еще и дубли, в итоге пример неправильной последовательности выше [deleted, created]
может превратиться в [created, deleted, created].
В таком случае логика выше перестает работать, сущность сначала будет создана, удалена, и снова создана, выходит опять фантом.
Чтобы побороть это можно делать soft delete, не удалять сущность по настоящему, а просто помечать удаленной, и игнорировать последний created
. Главное вовремя подчищать такие документы.
Цикл чтение-запись в Elastic
Я упомянул ранее, что нужно сверять версии перед обновлением документа, но Elastic — и сам является распределенной системой, где гарантии, что на момент чтения версии документа, она не устареет? Для этого придется реализовать оптимистичную блокировку при записи в Elastic.
Пересоздание индекса
В реальной жизни, помимо создания индекса в Elastic впервые, придется его пересоздавать при возникновении проблем и аварий. Например, внесли баг в indexator и он попортил данные.
Здесь возникает проблема как консистентно подружить потоковую запись в реальном времени с пересозданием индекса, которое может занять час, а то и более.
Этот челлендж заслуживает, возможно, отдельной статьи.
Как понять, что оно консистентно
Возможно стоит написать интеграционные тесты, генерирующие различные последовательности команд, и проверяющие, что в конечном итоге состояние стало правильным.
Автор: yonesko