Лет 8 назад я начал работать в команде, которая разрабатывала один сервис. Интерфейс сервиса был достаточно прост, всего 4 метода, и выполнял он одну единственную задачу. В течение всего этого времени код постоянно изменялся: реализовались новые бизнес-правила и ограничения, добавлялась версионность. В один прекрасный момент, front-end‘у понадобился очень небольшой функционал, который был «зарыт» глубоко в сервисе. Реализация необходимой функции была разработана в виде компоненты и не представляло никаких проблем дать к ней доступ из сервиса через дополнительный метод… Кроме одной: нарушалась логическая связанность методов сервиса, то есть его «внутренности» начали становиться «внешностями».
Проблему можно было бы решить, если преобразовать все эти небольшие внутренние компоненты, к которым потребовался доступ извне, в отдельные сервисы. В таком случае, front-end мог бы получить доступ к их функционалу; основной же сервис стал бы более компактным и его роль сводилась к оркестровке вызовов.
Мы использовали WCF для построения сервисов. Разворачивать сервис в 50 строчек кода на WCF, как минимум на 3-4 серверах, с load-balancer‘ом, новыми URL‘ами и прочими наворотами, казалось не очень хорошей идеей. А хотелось какой-то легкости, перспективы…
Несколько лет спустя я принимал участие в другом проекте на Workflow Foundation. Глядя на то, что получалось в XAML-редакторе, я подумал: «А почему-бы не представить весь workflow, как последовательность сообщений»?
Kinoпробы
Поиск по имеющимся решениям, честно говоря, я не делал. На тот момент (4-5 лет назад) об „Orleans“ было мало что известно, а об Akka я узнал уже после начала велосипедостроения. С одной стороны, это плохо, недостойно профессионального разработчика и все такое. С другой стороны, могло получиться что-то новенькое… Насколько хорошо или плохо все получилось, может судить уважаемый читатель.
Итак, я занялся созданием kino: actor-like communication framework на NetMQ. Суффикс «-like» потому, что классические актеры имеют иерархическую организацию, супервизоров, они stateful и, вообще, целая математическая модель там у них… Тут все проще, но, тем не менее, актеры будут и у нас.
Вкратце, что здесь к чему
Основным средством общения в kino является сообщение. Каждое сообщение имеет версию и тип, которые используются для поиска соответствующего обработчика. Есть небольшое отклонение от правила, но пока не будем об этом.
Актеры (Actors) — основные потребители и производители сообщений. Actor объявляет свой интерфейс, указывая тип и версию сообщения, которые он может получать. Есть еще один участник массовки, MessageHub, который также может получать и отправлять сообщения. Однако, между ними есть определенные различия. Actor нужно рассматривать, как сервис: он может ответить только при получении входящего сообщения. MessageHub – это клиент, который может отправить сообщение и (попытаться) получить ответное сообщения, если необходимо. Итак, чаще всего, начальное сообщение отправляется через MessageHub и обрабатывается одним или несколькими Actors.
Для поиска адресатов сообщений необходим MessageRouter. Он хранит таблицу маршрутизации — соответствие версии (Version) и типа сообщения (Identity) со списком Actors, которые его могут обработать. Для одного процесса достаточно одного MessageRouter‘а.
Для выхода за рамки одного процесса/хоста нам необходимо получить знание о внешнем мире, то есть о других MessageRouter‘ах и их таблицах маршрутизации. Источником для получения такого знания является Rendezvous сервис. Это – единственный well-known адрес, который должен быть сконфигурирован для приложения на базе kino. Rendezvous принимает от всех и раздает всем подключенным MessageRouter‘ам информацию о добавлении новых и удалении несуществующих маршрутов, ping‘ует установленные подключения. Rendezvous сервис формирует единую сеть компонентов kino.
Тоже, но более детально
1. Message
Так выглядит типичное сообщение, которое можно отправить гулять по сети kino:
public class MyMessage : Payload
{
private static readonly byte[] MessageIdentity = "NAMESPACE.MYMESSAGE".GetBytes();
private static readonly byte[] MessageVersion = "1.0".GetBytes();
// Здесь идут свойства сообщения, т.е. то, что мы в итоге хотим передать
public override byte[] Version => MessageVersion;
public override byte[] Identity => MessageIdentity;
}
Поддерживается 3 способа распределения (Distribution Pattern) сообщений: unicast, broadcast и direct. В первом случае сообщение отправляется только одному обработчику, зарегистрированному в сети. Во втором – всем.
IPayload payload = new MyMessage();
IMessage message = Message.Create(payload, DistributionPattern.Broadcast);
В случае direct distribution, который может быть особо полезен при тестировании, сообщение отправляется конкретному MessageRouter‘у:
IMessage message = Message.CreateFlowStartMessage(new MyMessage());
message.SetReceiverNode(receiverIdentity);
// Теперь можно отправлять сообщение
Добраться до данных в полученном сообщении можно следующим образом:
MyMessage payload = message.GetPayload<MyMessage>();
2. Actors
Для создания своего актера необходимо унаследовать класс от Actor и реализовать в нем хотя бы один метод-обработчик сообщения:
public class MyMessageProcessor : Actor
{
[MessageHandlerDefinition(typeof (MyMessage))]
public async Task<IActorResult> MyMessageHandler(IMessage message)
{
// тело метода
}
[MessageHandlerDefinition(typeof (OtherMessage))]
public Task<IActorResult> OtherMessageHandler(IMessage message)
{
// тело метода
}
}
```cs
Все актеры по умолчанию регистрируются глобально, то есть доступны во всей сети *kino*. Если вы хотите обрабатывать сообщения только в локальном процессе, можно объявить регистрацию обработчиков локальной:
```cs
public class LocalMessageProcessor : Actor
{
// Данная регистрация доступна только локально
[MessageHandlerDefinition(typeof (LocalMessage), true)]
public async Task<IActorResult> MyMessageHandler(IMessage message)
{
// тело метода
}
}
Фреймворк гарантирует, что метод-обработчик актера получит сообщения только того типа, который задекларирован. Возвращаемый же результат может представлять из себя одно или несколько сообщений любых типов, причем, с разными distribution pattern. То есть, мы можем послать ответ начальному отправителю и одновременно сообщить всем остальным еще о чем-нибудь:
public class MyMessageProcessor : Actor
{
[MessageHandlerDefinition(typeof (MyMessage))]
public async Task<IActorResult> MyMessageHandler(IMessage message)
{
MyMessage payload = message.GetPayload<MyMessage>();
var result = await UpdateDb(payload);
IMessage response = Message.Create(new ResponseMessage(result));
IMessage notifyRequest = Message.Create(new NotifyMessage(result), DistributionPattern.Broadcast);
return new ActionResult(response, notifyRequest);
}
}
3. ActorHost
Об ActorHost мы еще не говорили. Это компонент, который выполняет несколько функций:
- хранит ссылки на методы-обработчики всех зарегистрированных у него Actors
- регистрирует обработчики в MessageRouter
- принимает сообщения на обработку от MessageRouter к Actors и отправляет ответы обратно в MessageRouter.
Вызов методов-обработчиков в ActorHost происходит в одном потоке (исключение составляют асинхронные методы). Поэтому, ActorHost не поддерживает множественные регистрации обработчиков одного и того же сообщения. Если же необходимо масштабирование одного и того же типа Actor в рамках одного процесса, требуется создание нового экземпляра ActorHost для каждого из них. Все эти сложности по выбору и созданию ActorHosts берет на себя ActorHostManager:
// Создаем первый экземпляр актера MyActor
IActor actor = new MyActor();
// По умолчанию, выбирается первый доступный ActorHost
actorHostManager.AssignActor(actor);
// Нам показалось, что одного мало
actor = new MyActor();
// Указываем, что нам обязательно нужен второй экземпляр MyActor
actorHostManager.AssignActor(actor, ActorHostInstancePolicy.AlwaysCreateNew);
4. MessageHub
Вернемся немного назад, с чего все началось. А началось с того, что появилась необходимость разнести код из одного WCF-сервиса в несколько доступных по сети компонент. В результате, вместо вызовов сотни методов в одном процессе у нас получился некий поток сообщений (message flow), которые, вдобавок, путешествуют по разным серверам. Тем не менее, и функционал и поведение для конечного пользователя сервиса должны, в идеале, оставаться прежними. То есть, если раньше клиент вызывал метод сервиса синхронно и ожидал получение ответа, то со всем этим kino паттерн работы клиента не должен поменяться кардинальным образом. Необходимо из всего этого потока сообщений определить, что является ответом клиенту и доставить его обратно.
MessageHub как раз призван решить эту задачу. С его помощью можно отправить сообщение в сеть kino, не дожидаясь ответа:
IPayload payload = message.GetPayload<MyMessage>();
IMessage message = Message.CreateFlowStartMessage(payload);
messageHub.SendOneWay(message);
А можно так же указать, что отправитель ожидает определенный ответ:
// Создаем сообщение, представляющее запрос
IMessage request = Message.CreateFlowStartMessage(new StartMessage());
// Говорим, что мы заинтересованы в получении ответа определенного типа
ICallbackPoint callbackPoint = CallbackPoint.Create<ResultMessage>();
// Теперь отправляем сообщение и ожидаем результат
using(IPromise promise = messageHub.EnqueueRequest(request, callbackPoint))
{
if(promise.GetResponse().Wait(timeout))
{
// Обрабатываем полученный результат
ResultMessage result = promise.GetResponse().Result.GetPayload<ResultMessage>();
}
else
{
// Попробуем снова…
}
}
5. MessageRouter
MessageRouter представляет собой узел в сети kino. К нему подключаются другие компоненты, ActorHosts и MessageHubs, для обмена сообщениями. В свою очередь, MessageRouters находят подобных себе и подключаются друг к другу с помощью Rendezvous сервиса, формируя таким образом сеть kino.
В качестве транспорта в kino используется библиотека NetMQ. Она, практически, вбита во фреймворк гвоздями и использовать другой транспорт не планировалось.
Итак, маршрутизация сообщений. Она осуществляется по следующим алгоритмам:
• Unicast-сообщение:
НАЙТИ локально зарегистрированный ActorHost или MessageHub, которые могут обработать сообщение
ЕСЛИ найден
Отправить сообщение на обработку
ИНАЧЕ
НАЙТИ MessageRouter в сети, который может обработать сообщение
ЕСЛИ найден
Отправить сообщение на обработку
ИНАЧЕ
Сообщение не обработано!
• Broadcast-сообщение:
НАЙТИ все локально зарегистрированные ActorHosts и MessageHubs, которые могут обработать сообщение
ЕСЛИ найдены
Отправить сообщение на обработку
ЕСЛИ broadcast-сообщение отправлено из локально зарегистрированного Actor
НАЙТИ все MessageRouter в сети, которые могут обработать сообщение
ЕСЛИ найдены
Отправить сообщение на обработку
ЕСЛИ не найдено ни одного обработчика локально или удаленно
Сообщение не обработано!
• Direct-сообщение:
ЕСЛИ unicast-сообщение
НАЙТИ MessageRouter в сети, указанный в сообщении, как получатель (ReceiverNode), который может обработать сообщение
ЕСЛИ найден
Отправить сообщение на обработку
ИНАЧЕ
Сообщение не обработано!
ИНАЧЕ
<маршрутизация broadcast-сообщения>
6. Rendezvous
Rendezvous сервис – единственный well-known сервис, адрес которого должен быть сконфигурирован для всех узлов одной сети kino. Он выполняет следующие функции:
- broadcast-рассылка сообщений об изменениях в маршрутизации: добавление новых и удаление недействительных маршрутов,
- broadcast-рассылка PING сообщений для мониторинга подключенных узлов,
- broadcast-рассылка ответных PONG сообщений от подключенных узлов.
При необходимости, Rendezvous сервис можно установить на кластер серверов. Выбранный на основании консенсуса лидер отвечает за все вышеперечисленные функции. В случае «падения» кластера, сеть kino продолжит работу. Однако, информация об изменениях в маршрутизации будет недоступна. Когда работа Rendezvous сервиса будет восстановлена, узлы получат обновление конфигурации сети.
Открытые вопросы
- Ну, собственно говоря, увидеть что-то в продакшине. Пока до этого не дошло…
- Как работать с сообщениями разного wire-формата в одной сети
- Возможные проблемы при большом количестве подключений к Rendezvous сервису, пакетная обработка PONG-сообщений
- Объединение нескольких сетей kino, то есть маршрутизация между узлами, подключенными к разным Rendezvous серверам/кластерам
Проект kino на Github'е: https://github.com/iiwaasnet/kino
Wiki: https://github.com/iiwaasnet/kino/wiki
Автор: aosja