В прошлой статье я начал с основ CQRS + Event Sourcing. В этот раз я предлагаю продолжить и более подробно посмотреть на ES.
В примере который я выкладывал с моей прошлой статьей магия Event Sourcing’а была скрыта за абстракцией IRepository и двумя методами IRepository.Save() и IRepository.GetById<>().
Для того чтобы поподробнее разобраться что происходит я решил рассказать о процессе сохранения и загрузки агрегата из Event Store на примере проекта Lokad IDDD Sample от Рината Абдулина. У него в аппликейшен сервисах идет прямое обращение к Event Store, без дополнительных абстракций, поэтому все выглядит очень наглядно. Application Service — это аналог CommandHandler, но который обрабатывает все комманды одного агрегата. Такоей подход очень удобный и мы тоже на него в одном проекте перешли.
ApplicationService
Интерфейс IApplicationService крайне прост.
public interface IApplicationService
{
void Execute(ICommand cmd);
}
В методом Execute мы передаем любые команды и надеемся, что что сервис их перенаправит на нужный обработчик.
Так как у Рината в примере только один аггрегат Customer, то и сервис тоже только один CustomerApplicationService. Собственно поэтому нет необходимости выносить какую-либо логику в базовый класс. Отличное решение для примера на мой взгляд.
Метод Execute передает обработку комманды одной из перегрузок метода When подходящей по сигнатуре. Реализация метода Execute очень простая с использованием динамиков, и не надо бежать рефлексией по всем методам.
public void Execute(ICommand cmd)
{
// pass command to a specific method named when
// that can handle the command
((dynamic)this).When((dynamic)cmd);
}
Начнем с комманды создания — CreateCustomer.
[Serializable]
public sealed class CreateCustomer : ICommand
{
public CustomerId Id { get; set; }
public string Name { get; set; }
public Currency Currency { get; set; }
public override string ToString()
{
return string.Format("Create {0} named '{1}' with {2}", Id, Name, Currency);
}
}
В реальном проекте у вас между UI и ApplicationService скорее всего будет message queue, ну а для примера Ринат ограничился прямой передачей комманды объекту апликейшен сервиса (см. class ApplicationServer).
После того как команда CreateCustomer попадает в метод Excecute, она перенаправляется в метод When.
public void When(CreateCustomer c)
{
Update(c.Id, a => a.Create(c.Id,c.Name, c.Currency, _pricingService, DateTime.UtcNow));
}
В метод Update мы передаем айдишку аггрегата и экшен который вызывает метод изменения состояния аггрегата. Вообще на мой згляд лучше не делать метод Create у аггрегата, а создать еще один конструктор, так как вызов метода Create в данном контексте выглядит немного странным. Мы вроде и создаем агрегат, но почему-то метод Create передаем как метод изменения состояния. С конструктором так бы уже не получилось.
Вернемся к методу Update, задача у него следующая — 1) загрузить все события для текущего аггрегата, 2) создать аггрегат и восстановить его состояние использую загруженные события, 3) выполнить переданное действие Action execute над аггрегатом и 4) сохранить новые события если они есть.
void Update(CustomerId id, Action<Customer> execute)
{
// Load event stream from the store
EventStream stream = _eventStore.LoadEventStream(id);
// create new Customer aggregate from the history
Customer customer = new Customer(stream.Events);
// execute delegated action
execute(customer);
// append resulting changes to the stream
_eventStore.AppendToStream(id, stream.Version, customer.Changes);
}
Восстановление состояния
В примере, который я показывал в прошлой статье состояние аггрегата хранилось в виде private полей в классе аггрегата. Это не совсем удобно если вы захотите добавить snapshot’ы, т.к. придется как-то высасывать состояние какждый раз или использовать рефлексию. У Рината гораздо более удобный подход — для состояние отдельный класс CustomerState, что позволяет вынести методы проекции из аггрегата и гораздо проще сохранять и загружать snapshot’ы, хоть их и нет в примере.
Как видно, в конструктор агрегату передается список событий этого же аггрегата, как не сложно догадаться, для того чтобы он восстановил своё состояние.
Агрегат в свою очередь делегирует восстановление состояние классу CustomerState.
/// <summary>
/// Aggregate state, which is separate from this class in order to ensure,
/// that we modify it ONLY by passing events.
/// </summary>
readonly CustomerState _state;
public Customer(IEnumerable<IEvent> events)
{
_state = new CustomerState(events);
}
Приведу код всего класса CustomerState, лишь уберу несколько методов проекции When.
/// <summary>
/// This is the state of the customer aggregate.
/// It can be mutated only by passing events to it.
/// </summary>
public class CustomerState
{
public string Name { get; private set; }
public bool Created { get; private set; }
public CustomerId Id { get; private set; }
public bool ConsumptionLocked { get; private set; }
public bool ManualBilling { get; private set; }
public Currency Currency { get; private set; }
public CurrencyAmount Balance { get; private set; }
public int MaxTransactionId { get; private set; }
public CustomerState(IEnumerable<IEvent> events)
{
foreach (var e in events)
{
Mutate(e);
}
}
...
public void When(CustomerCreated e)
{
Created = true;
Name = e.Name;
Id = e.Id;
Currency = e.Currency;
Balance = new CurrencyAmount(0, e.Currency);
}
public void When(CustomerRenamed e)
{
Name = e.Name;
}
public void Mutate(IEvent e)
{
// .NET magic to call one of the 'When' handlers with
// matching signature
((dynamic) this).When((dynamic)e);
}
}
Как видно в конструкторе мы форычем бежим по переданным ивентам, и передаем их в метод Mutate, который в свою очередь напрявляет их дальше в подходящий метод проекции.
Можно заметить что все проперти имеют private setter метод, что гарантирует что состояние извне мы можем изменить только передав соответствующее событие.
Состояние восстановили, теперь можно и попробовать его изменить. Вернемся немного назад к методу изменения состояние. Так как я начал разбираться с коммандой CreateCustomer, то и у агрегата посмотрим метод Create.
public void Create(CustomerId id, string name, Currency currency, IPricingService service, DateTime utc)
{
if (_state.Created)
throw new InvalidOperationException("Customer was already created");
Apply(new CustomerCreated
{
Created = utc,
Name = name,
Id = id,
Currency = currency
});
var bonus = service.GetWelcomeBonus(currency);
if (bonus.Amount > 0)
AddPayment("Welcome bonus", bonus, utc);
}
Здесь самое место сделать проверку наших бизнесс правил, так как у нас есть доступ к актуальному состоянию агрегата. У нас есть бизнесс правило что Customer может быть создан лишь один раз ( врочем еще есть и техническое ограничение), поэтому при попытки вызвать Create на уже созданном агрегате мы бросаем эксепшен.
Если же все бизнесс правила удовлетворены то в метод Apply передаем событие CustomerCreated. Метод Apply очень прост, лишь передает событие объекту состояния и добавляет его в список текущих изменений.
public readonly IList<IEvent> Changes = new List<IEvent>();
readonly CustomerState _state;
void Apply(IEvent e)
{
// pass each event to modify current in-memory state
_state.Mutate(e);
// append event to change list for further persistence
Changes.Add(e);
}
В большенству случаев на одну операцию с аггрегатом приходится одно событие. Но вот как раз в случае с методом Create событий может быть два.
После того как мы передали в метод Apply событие CustomerCreate, мы может добавить текущему кастомеру приветственный бонус, если удовлетворяетя бизнесс правило что сумма бонуса должена быть больше нуля. В таком случае вызывается метод агрегата AddPayment, который не сореджит никаких проверок а просто инициирует событие CustomerPaymentAdded.
public void AddPayment(string name, CurrencyAmount amount, DateTime utc)
{
Apply(new CustomerPaymentAdded()
{
Id = _state.Id,
Payment = amount,
NewBalance = _state.Balance + amount,
PaymentName = name,
Transaction = _state.MaxTransactionId + 1,
TimeUtc = utc
});
}
Теперь предстоит сохранить новые события и опубликовать их в Read model. Подозреваю что это делает следующая строчка.
// append resulting changes to the stream
_eventStore.AppendToStream(id, stream.Version, customer.Changes);
Убедимся…
public void AppendToStream(IIdentity id, long originalVersion, ICollection<IEvent> events)
{
if (events.Count == 0)
return;
var name = IdentityToString(id);
var data = SerializeEvent(events.ToArray());
try
{
_appendOnlyStore.Append(name, data, originalVersion);
}
catch(AppendOnlyStoreConcurrencyException e)
{
// load server events
var server = LoadEventStream(id, 0, int.MaxValue);
// throw a real problem
throw OptimisticConcurrencyException.Create(server.Version, e.ExpectedStreamVersion, id, server.Events);
}
// technically there should be a parallel process that queries new changes
// from the event store and sends them via messages (avoiding 2PC problem).
// however, for demo purposes, we'll just send them to the console from here
Console.ForegroundColor = ConsoleColor.DarkGreen;
foreach (var @event in events)
{
Console.WriteLine(" {0} r{1} Event: {2}", id,originalVersion, @event);
}
Console.ForegroundColor = ConsoleColor.DarkGray;
}
Ну почти угодал. События сериализуются и сохраняются в append only store (удалять и изменять их мы то не собираемся). Но вот вместо того чтобы отправить их в read-model (на презентационный уровень), Ринат просто выводит их на консоль.
Впрочем для примера этого достаточно.
Если вы хотите посмотреть как это все будет работать с очередью сообщений можете посмотреть пример на гитхабе из моей предыдущей статьи. Я его немного изменил и тоже внес часть инфраструктуры Event Sourcing’a в солюшен. На этом примере я хочу показать как можно добавить снэпшоты.
Snapshots
Снэпшоты нужны чтобы делать промежуточные снимки состояния аггрегата. Это позволяем не закгружать весь поток событий агрегата, а лишь только те которые произошли после того как мы сделали последний снэпшот.
Итак посмотрим на реализацию.
В классе UserCommandHandler есть метод Update очень похожий на тот что у Рината, но у меня он еще сохраняет и загружает снэпшоты. Снэпшоты делаем через каждые 100 версий.
private const int SnapshotInterval = 100;
Сначала поднимаем из репозитория снэпшот, потом загружаем поток событий которые произошли после того как мы сделали этот снэпшот. Затем передаем все это конструктору агрегата.
private void Update(string userId, Action<UserAR> updateAction)
{
var snapshot = _snapshotRepository.Load(userId);
var startVersion = snapshot == null ? 0 : snapshot.StreamVersion + 1;
var stream = _eventStore.OpenStream(userId, startVersion, int.MaxValue);
var user = new UserAR(snapshot, stream);
updateAction(user);
var originalVersion = stream.GetVersion();
_eventStore.AppendToStream(userId, originalVersion, user.Changes);
var newVersion = originalVersion + 1;
if (newVersion % SnapshotInterval == 0)
{
_snapshotRepository.Save(new Snapshot(userId, newVersion,user.State));
}
}
В конструкторе пытаемся достать состояние из снэпшота или создаем новое состояние если нету снэпшота. На полученном состоянии проигрываем все недостающие события, и в итоге получаем актуальную версию агрегата.
public UserAR(Snapshot snapshot, TransitionStream stream)
{
_state = snapshot != null ? (UserState) snapshot.Payload : new UserState();
foreach (var transition in stream.Read())
{
foreach (var @event in transition.Events)
{
_state.Mutate((IEvent) @event.Data);
}
}
}
После манипуляций с агрегатом, проверяем кратна ли версия агрегата интервалу через который мы делаем снэпшоты, и если это так, то сохраняем новый снэпшот в репозиторий. Чтобы получить из UserCommandHandler’a состояние агрегата пришлось сделать для него internal getter свойство State.
Вот и все, теперь у нас есть снэпшоты, что позволило намного быстрее востанавливать состояние агрегата если у него очень много событий.
Feedback
Если вам интересна тема CQRS+ES пожалуйста не стесняйтесь задавать вопросы в комментариях. Так же можете писать мне в скайп если нету ака на хабре. Недавно мне в скайп постучался один товарищ из Челябинска и благодаря его вопросам у меня появилось много идей для следующей статьи. Так как в моем распоряжении сейчас побольше свободного времени то я планирую писать их почаще.
Автор: AlexShkor