Всем привет.
В этой статье рассмотрим принцип многопоточного TCP сервера приложений в котором реализуем синхронные и асинхронные вызовы, а также разграничение доступа к процедурам и сжатие данных.
С чего все начиналось.
Все началось с непростого выбора с чего начать реализацию взаимодействия между собой нескольких разных приложений работающих через сеть интернет. Казалось-бы WCF способен решить такую задачу, но, к сожалению, он не лишен минусов и некоторых проблем, а его принцип работы сильно сказывается на скорости передачи данных. Нужно было простое решение и в то же время достаточно функциональное.
Начнем наш проект с библиотеки классов, в которой создадим интерфейсы процедур и функций для приложений различных типов.
public interface ICommon
{
string[] GetAvailableUsers();
void ChangePrivileges(string Login, string password);
}
public interface IDog
{
bool TryFindObject(out object obj);
int Bark(int nTimes);
}
public interface ICat
{
void CutTheText(ref string Text);
}
Здесь мы описали сигнатуру процедур для трех разных уровней доступа. Как можно догадаться, Common будет содержать процедуры доступные для всех типов удаленных клиентов. Dog и Cat здесь, это наши два типа удаленных клиентов, процедуры каждого из них будут доступны только им самим.
Здесь же создадим класс, с помощью которого мы будем обмениваться данными между сервером и клиентами.
[Serializable]
public class Message
{
public Message(string Command, object[] Parameters)
{
this.Command = Command;
if (Parameters != null) this.prms = Parameters;
}
public bool IsSync;
public bool IsEmpty = true;
public readonly string Command;
public object ReturnValue;
public object[] prms;
public Exception Exception;
}
Клиент
Клиент будет реализовать проксирующую связь между методами интерфейса и сервером. Для этого создадим класс реализующий прокси:
private class Proxy<T> : RealProxy where T : class
{
UniservClient client;
public Proxy(UniservClient client): base(typeof(T))
{
this.client = client;
}
public override IMessage Invoke(IMessage msg)
{
IMethodCallMessage call = (IMethodCallMessage)msg;
object[] parameters = call.Args;
int OutArgsCount = call.MethodBase.GetParameters().Where(x => x.IsOut).Count();
Message result = client.Execute(call.MethodName, parameters);
parameters = parameters.Select((x, index) => result.prms[index] ?? x).ToArray();
return new ReturnMessage(result.ReturnValue, parameters, OutArgsCount, call.LogicalCallContext, call);
}
}
И создадим свойства для доступа к методам интерфейсов:
public ICommon Common { get; private set; }
public IDog Dog { get; private set; }
public ICat Cat { get; private set; }
Инициализируем прокси и свойства:
CommonProxy = new Proxy<ICommon>(this);
DogProxy = new Proxy<IDog>(this);
CatProxy = new Proxy<ICat>(this);
Common = (ICommon)CommonProxy.GetTransparentProxy();
Dog = (IDog)DogProxy.GetTransparentProxy();
Cat = (ICat)CatProxy.GetTransparentProxy();
Обработка команд сервера:
private void Listener()
{
while (true)
{
try
{
if (ListenerToken.IsCancellationRequested) return;
if (!IsConnected) _Connect();
while (true)
{
if (ListenerToken.IsCancellationRequested) return;
Message msg = ReceiveData<Message>();
if (msg.Command == "OnPing")
{
// отражаем пинг
SendData(msg);
if (Events.OnPing != null) Events.OnPing.BeginInvoke(null, null);
continue;
}
if (msg.IsSync)
{ // получен результат синхронной процедуры
SyncResult(msg);
}
else
{
// асинхронный вызов события
try
{
// ищем соответствующий Action
var pi = typeof(IEvents).GetProperty(msg.Command, BindingFlags.Instance | BindingFlags.Public);
if (pi == null) throw new Exception(string.Concat("Свойство "", msg.Command, "" не найдено"));
var delegateRef = pi.GetValue(this, null) as Delegate;
// инициализируем событие
if (delegateRef != null) ThreadPool.QueueUserWorkItem(state => delegateRef.DynamicInvoke(msg.prms));
}
catch (Exception ex)
{
throw new Exception(string.Concat("Не удалось выполнить делегат "", msg.Command, """), ex);
}
}
}
}
catch (TaskCanceledException)
{
return;
}
catch (Exception ex)
{
if (Events.OnError != null) Events.OnError.BeginInvoke(ex, null, null);
}
finally
{
_Dicsonnect();
}
Thread.Sleep(2000);
}
}
За выполнение удаленных процедуры отвечают методы:
private Message Execute(string MethodName, object[] parameters)
{
lock (syncLock)
{
_syncResult = new Message(MethodName, parameters);
_syncResult.IsSync = true;
_OnResponce.Reset();
SendData(_syncResult);
_OnResponce.Wait(); // ожидаем ответ сервера
if (_syncResult.IsEmpty)
{// произошел дисконект, результат не получен
throw new Exception(string.Concat("Ошибка при получении результата на команду "", MethodName, """));
}
if (_syncResult.Exception != null) throw _syncResult.Exception; // исключение переданное сервером
return _syncResult;
}
}
private void SyncResult(Message msg)
{ // получен результат выполнения процедуры
_syncResult = msg;
_syncResult.IsEmpty = false;
_OnResponce.Set(); // разблокируем поток
}
В общих чертах клиент работает следующим образом:
Когда приложение вызывает метод интерфейса, выполнение переходит к прокси-серверу. Прокси отправляет серверу имя вызываемого метода вместе с его параметрами и блокирует поток в ожидании результата выполнения. Разбор ответов сервера происходит в другом потоке, который разрешает продолжение заблокированного потока.
Сервер
Создадим иерархию классов, которые будут определять доступность тех или иных функций:
public class Cat_Ring0 : Ring2, ICat
{
public Cat_Ring0(User u) : base(u)
{
up.UserType = UserType.Cat;
}
public void CutTheText(ref string Text)
{
Text = Text.Remove(Text.Length - 1);
}
}
public class Dog_Ring0 : Dog_Ring1, IDog
{
public Dog_Ring0(User u) : base(u)
{
up.UserType = UserType.Dog;
}
public int Bark(int nTimes)
{
var ConnectedDogs = ConnectedUsers.ToArray().Where(x => x.UserType == UserType.Dog).Select(x => x.nStream);
ConnectedDogs.AsParallel().ForAll(nStream =>
{
SendMessage(nStream, new Message("OnBark", new object[] { nTimes}));
});
return ConnectedDogs.Count();
}
}
public class Dog_Ring1 : Ring2
{
public Dog_Ring1(User u): base(u)
{
up.UserType = UserType.Dog;
}
public bool TryFindObject(out object obj)
{
obj = "TheBall";
return true;
}
}
public class Ring2 : Ring, ICommon
{
public Ring2(User u) : base(u) { }
public string[] GetAvailableUsers()
{
return new string[] { "Dog0", "Dog1", "Tom" };
}
public void ChangePrivileges(string Animal, string password)
{
switch (Animal)
{
case "Dog0":
if (password != "groovy!") throw new Exception("Не верный пароль");
up.ClassInstance = new Dog_Ring0(up);
break;
case "Dog1":
if (password != "_password") throw new Exception("Не верный пароль");
up.ClassInstance = new Dog_Ring1(up);
break;
case "Tom":
if (password != "TheCat") throw new Exception("Не верный пароль");
up.ClassInstance = new Cat_Ring0(up);
break;
default:
throw new Exception("Такого пользователя не существует");
}
}
}
public abstract class Ring
{
public readonly User up;
public Ring(User up)
{
this.up = up;
}
}
Теперь достаточно поместить процедуру в определенное “кольцо” что бы соответствующий клиент имел к ней доступ. Ring0 это верхний уровень доступа, пользователь этого типа имеет доступ не только к находящимся в нем процедурам, но и процедурам во всех наследуемых классах. Изначально пользователь попадает в Ring2, который реализует только общие методы доступные всем. Далее с помощью ChangePrivileges() пользователь может, пройдя авторизацию, попасть в определенный тип «кольца» с определенным уровнем доступа.
Основная работа сервера сводится к следующему методу:
private void ProcessMessage(Message msg, User u)
{
string MethodName = msg.Command;
if (MethodName == "OnPing") return;
// ищем запрошенный метод в кольце текущего уровня
MethodInfo method = u.RingType.GetMethod(MethodName, BindingFlags.Instance | BindingFlags.Public | BindingFlags.FlattenHierarchy);
try
{
if (method == null)
{
throw new Exception(string.Concat("Метод "", MethodName, "" недоступен"));
}
try
{
// выполняем метод интерфейса
msg.ReturnValue = method.Invoke(u.ClassInstance, msg.prms);
}
catch (Exception ex)
{
throw ex.InnerException;
}
// возвращаем ref и out параметры
msg.prms = method.GetParameters().Select(x => x.ParameterType.IsByRef ? msg.prms[x.Position] : null).ToArray();
}
catch (Exception ex)
{
msg.Exception = ex;
}
finally
{
// возвращаем результат выполнения запроса
SendMessage(u.nStream, msg);
}
}
Свойство ClassInstance содержит экземпляр “кольца” в котором будет выполняться поиск процедуры по ее имени.
Пример лога выполнения:
В результате получилось простое и элегантное решение аналогичное WCF.
Исходник можно взять тут
Автор: vitaliy91