Звезда — на сегодняшний день самая распространенная топология компьютерных сетей. Такая структура обладает рядом преимуществ: легкостью масштабирования, надежностью (выход из строя одной машины не сказывается на других) и простота администрирования. Конечно это решение из физического уровня давно реализовано и на программном уровне. Тем не менее, представляю на суд читателей свою версию инструментария .Net для построения распределенных систем с топологией звезда.
Системы, построенные на основе такой топологии могут быть структурно организованы, например как на изображении ниже.
В этой статье я опишу создание минимального инструментария, без многих полезных фич, которые некогда использовал в своих разработках на базе представленной архитектуры. Однако этого вполне достаточно для построения реально полезных систем.
Methanum
Проект получил кодовое название Methanum исключительно из-за структурной схожести топологии с молекулой метана :). Центральный узел выполняющий роль комуникатора назван «Core». К ядру подключаются остальные приложения сети и подписываются на события. Так же каждое приложение сети может испускать события. Таким образом, через события осуществляется обмен данными в сети. События — это сериализуемый класс Event, который может содержать произвольные данные. Event минимально содержит 2 поля — строковое поле Destination, классифицирующее событие, и поле Data, содержащее словарь key value. Key — это строка, имя аргумента, Value — имеет тип object и может содержать примитивы (int, double, bool…). Для структур приходится несколько помочь системе сериализовать их.
Для начала создадим проект “methanum” библиотеки классов на C# и по ходу текста будем добавлять в него файлы.
Event
Как уже было сказано данные передаются посредством событий. Событие — это класс, включающий поле с данными Data и поле для идентификации события Destination. Также я оставил еще два поля: Id — уникальный идентификатор и DataTime содержащее время создания события. Эти дополнительные поля нужны исключительно для удобства, например, для разбора логов. Класс события так же содержит некоторое количество методов, призванных упростить жизнь программиста, их назначение думаю будет понятно из кода и в дополнительных пояснениях не нуждается.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Web.Script.Serialization;
namespace methanum
{
[DataContract]
[KnownType(typeof(List<DateTime>))]
public class Event {
/// <summary>
/// A unique id of the event
/// </summary>
[DataMember]
public Guid Id { set; get; }
/// <summary>
/// DateTime of event creation
/// </summary>
[DataMember]
public DateTime DataTime { get; set; }
/// <summary>
/// Target
/// </summary>
[DataMember]
public string Destination { get; set; }
/// <summary>
/// Data container
/// </summary>
[DataMember]
public Dictionary<string, object> Data { get; set; }
public Event() {
Init();
}
public Event(string destination) {
Init();
Destination = destination;
}
private void Init() {
Data = new Dictionary<string, object>();
Id = Guid.NewGuid();
DataTime = DateTime.Now;
}
public override string ToString() {
var properties = GetType().GetProperties();
var sb = new StringBuilder();
sb.AppendFormat("[{0}]", GetType().Name);
foreach (var property in properties) {
if (property.Name == "Data") {
sb.Append("nData = ");
string s = string.Format(" {0}", '{');
s = Data.Keys.Aggregate(s,
(current, key) => current + String.Format("n {0}t:{1}", key, Data[key]));
sb.AppendFormat("{0}n{1}", s, '}');
}
else sb.AppendFormat("n{0} = {1};", property.Name, property.GetValue(this, null));
}
return sb.ToString();
}
public void SetData(string key, object obj) {
Data[key] = obj;
}
public object GetObj(string key) {
return !Data.ContainsKey(key) ? null : Data[key];
}
public double GetDbl(string key) {
return !Data.ContainsKey(key) ? Double.NaN : Convert.ToDouble(Data[key]);
}
public int GetInt(string key) {
return !Data.ContainsKey(key) ? Int32.MinValue : Convert.ToInt32(Data[key]);
}
public bool GetBool(string key) {
return Data.ContainsKey(key) && Convert.ToBoolean(Data[key]);
}
public string GetStr(string key) {
return !Data.ContainsKey(key) ? null : Convert.ToString(Data[key]);
}
public void SetCustomData(string key, object value) {
var serializer = new JavaScriptSerializer();
var str = serializer.Serialize(value);
SetData(key, str);
}
public object GetCustom(string key, Type valueType) {
if (!Data.ContainsKey(key))
return null;
if (Data[key].GetType() != typeof(string))
return null;
var serializer = new JavaScriptSerializer();
var str = (string) Data[key];
var obj = serializer.Deserialize(str, valueType);
return obj;
}
}
}
Gate
Суть ядра заключается в реализации интерфейса, назовем его «интерфейс ворот». Основная цель ворот — предоставление функционала для регистрации клиентов и асинхронной посылки событий в обоих направлениях (от приложения к ядру и обратно).
using System.ServiceModel;
namespace methanum {
[ServiceContract(CallbackContract = typeof(IListener))]
public interface IGate {
[OperationContract]
void Subscribe();
[OperationContract]
void KillConnection();
[OperationContract]
void Fire(Event evt);
}
}
Контракт данных у нас дуплексный, в прямом направлении — от приложения к ядру — стреляем события через IGate вызовом метода void Fire(Event evt). Обратный вызов — от ядра к приложению — происходит через IListener интерфейс, о котором будет позже.
Ворота работают по следующему принципу. Когда стартует ядро, создается объект класса Gate, унаследованного от интерфейса IGate. В Gate имеется статическое поле _subscribers, в котором хранятся все активные подключения к ядру. При вызове метода Subscribe(), добавляем текущее подключение, если оно еще не было добавлено. Метод KillConnection() служит для удаления текущего подключения. Самым интересным является метод Fire(Event evt), но и в нем нет ни чего сложного. Половину метода докапываемся до Ip адреса и порта, только чтобы вывести информацию в консоль. Я оставил эту часть кода исключительно для того, чтобы продемонстрировать, как получить доступ к адресу соединения, например, чтобы фильтровать или логировать события по разрешенным адресам. Основная работа этого метода заключается в обходе всех существующих подключений и асинхронного вызова метода Receive у их слушателей IListener. Если обнаруживаем закрытое подключение, то его незамедлительно удаляем из списка активных подключений.
using System;
using System.Collections.Generic;
using System.ServiceModel;
using System.ServiceModel.Channels;
namespace methanum {
public class Gate : IGate {
private static List<OperationContext> _subscribers;
public Gate() {
if (_subscribers == null)
_subscribers = new List<OperationContext>();
}
public void Subscribe() {
var oc = OperationContext.Current;
if (!_subscribers.Exists(c => c.SessionId == oc.SessionId)) {
_subscribers.Add(oc);
Console.WriteLine("(subscribe "{0}")", oc.SessionId);
}
}
public void KillConnection() {
var oc = OperationContext.Current;
_subscribers.RemoveAll(c => c.SessionId == oc.SessionId);
Console.WriteLine("(kill "{0}")", oc.SessionId);
}
public void Fire(Event evt) {
var currentOperationContext = OperationContext.Current;
var remoteEndpointMessageProperty =
currentOperationContext.IncomingMessageProperties[RemoteEndpointMessageProperty.Name] as
RemoteEndpointMessageProperty;
var ip = "";
var port = 0;
if (remoteEndpointMessageProperty != null) {
ip = remoteEndpointMessageProperty.Address;
port = remoteEndpointMessageProperty.Port;
}
Console.WriteLine("(Fire (event . "{0}") (from . "{1}:{2}") (subscribers . {3}))", evt.Id, ip, port, _subscribers.Count);
for (var i = _subscribers.Count - 1; i >= 0; i--) {
var oc = _subscribers[i];
if (oc.Channel.State == CommunicationState.Opened) {
var channel = oc.GetCallbackChannel<IListener>();
try {
((DelegateReceive) (channel.Receive)).BeginInvoke(evt, null, null);
}
catch (Exception e) {
Console.WriteLine(e.Message);
}
}
else {
_subscribers.RemoveAt(i);
Console.WriteLine("(dead . "{0}")", oc.SessionId);
}
}
}
}
}
Listener
Чтобы передать сообщение от ядра к клиенту достаточно одного метода Receive, который определен в интерфейсе IListener.
using System.ServiceModel;
namespace methanum {
public delegate void DelegateReceive(Event evt);
interface IListener {
[OperationContract(IsOneWay = true)]
void Receive(Event evt);
}
}
От интерфейса IListener наследуется класс Connector, который реализует всю логику взаимодействия клиентского приложения и ядра. При создании экземпляра класса создается подключение к ядру, через которое передаются и принимаются сообщения. Отправка и принятие сообщений происходит в отдельных потоках, чтобы исключить блокировку приложений и ядра. Чтобы различать события, в них есть поле Destination. Фильтровать события при помощи if-then-else или switch-case конструкций неудобно, поэтому был реализован механизм, позволяющий сопоставить каждому интересующему событию в соответствие обработчик. Такое сопоставление хранится в словаре Dictionary<string, CbHandler> _handlers;. Когда событие принято, происходит поиск в словаре и, если ключ найден, вызывается соответствующий обработчик.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.ServiceModel;
using System.Threading;
namespace methanum {
public delegate void CbHandler(Event evt);
public class Connector : IListener {
private Dictionary<string, CbHandler> _handlers;
private NetTcpBinding _binding;
private EndpointAddress _endpointToAddress;
private InstanceContext _instance;
private DuplexChannelFactory<IGate> _channelFactory;
private IGate _channel;
private Thread _fireThread;
private List<Event> _eventQueue;
public event CbHandler ReceiveEvent;
private bool _isSubscribed;
private object _channelSync = new object();
protected virtual void OnReceive(Event evt) {
CbHandler handler = ReceiveEvent;
if (handler != null) handler.BeginInvoke(evt, null, null);
}
//localhost:2255
public Connector(string ipAddress) {
init(ipAddress);
}
private void init(string ipAddress) {
_handlers = new Dictionary<string, CbHandler>();
_binding = new NetTcpBinding();
_endpointToAddress = new EndpointAddress(string.Format("net.tcp://{0}", ipAddress));
_instance = new InstanceContext(this);
Conect();
_eventQueue = new List<Event>();
_fireThread = new Thread(FireProc);
_fireThread.IsBackground = true;
_fireThread.Start();
}
private void Conect() {
_isSubscribed = false;
while (!_isSubscribed) {
try {
_channelFactory = new DuplexChannelFactory<IGate>(_instance, _binding, _endpointToAddress);
_channel = _channelFactory.CreateChannel();
_channel.Subscribe();
_isSubscribed = true;
}
catch (Exception e) {
if (!(e is EndpointNotFoundException)) throw e;
Thread.Sleep(1000);
}
}
}
private void ReConect() {
lock (_channelSync) {
try {
_channel.KillConnection();
}
catch (Exception e) {
Console.WriteLine("(ReConect-exception "{0}"", e.Message);
}
Conect();
}
}
public void Fire(Event evt) {
lock (_eventQueue) {
_eventQueue.Add(evt);
}
}
private void FireProc() {
while (true) {
var isHasEventsToFire = false;
lock (_eventQueue) {
isHasEventsToFire = _eventQueue.Any();
}
if (_isSubscribed && isHasEventsToFire) {
Event evt;
lock (_eventQueue) {
evt = _eventQueue.First();
}
try {
lock (_eventQueue) {
_eventQueue.Remove(evt);
}
_channel.Fire(evt);
}
catch (Exception) {
if (_isSubscribed)
_isSubscribed = false;
ReConect();
}
} else Thread.Sleep(10);
}
}
public void SetHandler(string destination, CbHandler handler) {
_handlers[destination] = handler;
}
public void DeleteHandler(string destination) {
if(_handlers.ContainsKey(destination)) _handlers.Remove(destination);
}
public void Receive(Event evt) {
if (_handlers.ContainsKey(evt.Destination)) {
_handlers[evt.Destination].BeginInvoke(evt, null, null);
}
OnReceive(evt);
}
static public void HoldProcess() {
var processName = Process.GetCurrentProcess().ProcessName;
var defColor = Console.ForegroundColor;
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("The {0} is ready", processName);
Console.WriteLine("Press <Enter> to terminate {0}", processName);
Console.ForegroundColor = defColor;
Console.ReadLine();
}
}
}
Для удобства создадим еще один небольшой класс, стартующий сервис.
using System;
using System.ServiceModel;
namespace methanum {
public class SrvRunner {
private ServiceHost _sHost;
public void Start(int port) {
var uris = new[] { new Uri(string.Format("net.tcp://0.0.0.0:{0}", port)) };
_sHost = new ServiceHost(typeof (Gate), uris);
_sHost.Open();
foreach (var uri2 in _sHost.BaseAddresses) {
Console.WriteLine("Start on: {0}", uri2.ToString());
}
}
public void Stop() {
_sHost.Close();
}
}
}
Core
Мы реализовали все классы, необходимые для комуникации наших приложений. Осталось создать ядро, к которому будут подключаться наши приложения. Для этого в решении создаем проект “Core” консольного приложения, к нему подключаем сборку methanum. Вообще, мы уже все написали, осталось только запустить.
using System;
using System.Linq;
using methanum;
namespace Core {
internal class CoreMain {
private static void Main(string[] args) {
int port = 0;
if ((!args.Any()) || (!int.TryParse(args[0], out port))) {
Console.WriteLine("Usage:");
Console.WriteLine("Core.exe port");
Environment.Exit(1);
}
try {
var coreSrv = new SrvRunner();
coreSrv.Start(port);
Console.WriteLine("The Core is ready.");
Console.WriteLine("Press <ENTER> to terminate Core.");
Console.ReadLine();
coreSrv.Stop();
}
catch (Exception e) {
Console.WriteLine(e.Message);
}
}
}
}
Пример использования
Для демострации создадим примитивный месенджер: создаем еще одно консольное приложение, добавляем ссылку на сборку methanum и вставляем содержимое файла Program.cs.
using System;
using System.Linq;
using methanum;
namespace ClentExamle {
class Program {
static void Main(string[] args) {
if ((!args.Any())) {
Console.WriteLine("Usage:");
Console.WriteLine("ClentExample.exe coreAddress:port");
Environment.Exit(1);
}
var userName = "";
while (String.IsNullOrWhiteSpace(userName)) {
Console.WriteLine("Please write user name:");
userName = Console.ReadLine();
}
try {
var maingate = new Connector(args[0]);
maingate.SetHandler("message", MsgHandler);
Console.WriteLine("Hello {0}, now you can send messages", userName);
while (true) {
var msg = Console.ReadLine();
var evt = new Event("message");
evt.SetData("name", userName);
evt.SetData("text", msg);
maingate.Fire(evt);
}
}
catch (Exception e) {
Console.WriteLine(e.Message);
}
}
static private void MsgHandler(Event evt) {
Console.WriteLine("[{0}] >> {1}", evt.GetStr("name"), evt.GetStr("text"));
}
}
}
Теперь запускаем приложение Core.exe указав в командной строке порт, например “Core 2255”. Затем стартуем несколько экземпляров ClentExample.exe командой “ClentExample localhost:2255”. Приложения предлагают ввести имя пользователя, после чего подключаются к ядру. В результате, получается широковещательный примитивный чат: каждое новое сообщение посылается вызовом maingate.Fire(evt), принимается в обработчике MsgHandler(Event evt).
Полный исходник доступен на гихабе methanum.
Автор: Тинькофф Банк