Доброго времени суток Хабр. Вдохновленный моделью синхронизации потоков в go и сигналов в QT появилась идея реализовать нечто подобное на c#.
Если интересно, прошу под кат.
В данный момент синхронизация потоков в c# вызывает некоторые затруднения, в частности передача примитивов синхронизации между объектами Вашего приложения и поддержка этого всего в дальнейшем.
Текущая модель с Task и IAsyncResult а так же TPL в целом решают все проблемы при должном проектировании но хотелось создать простой класс через который можно будет отправлять и принимать сигналы с блокировкой потока.
В общем в голове созрел некий интерфейс:
public interface ISignal<T> : IDisposable
{
void Send(T signal);
T Receive();
T Receive(int timeOut);
}
, где T — сущность которую необходимо передать получателю.
Пример вызова:
[TestMethod]
public void ExampleTest()
{
var signal = SignalFactory.GetInstanse<string>();
var task1 = Task.Factory.StartNew(() => // старт потока
{
Thread.Sleep(1000);
signal.Send("Some message");
});
// блокировка текущего потока
string message = signal.Receive();
Debug.WriteLine(message);
}
Для получения объекта сигнала создадим фабрику.
public static class SignalFactory
{
public static ISignal<T> GetInstanse<T>()
{
return new Signal<T>();
}
public static ISignal<T> GetInstanse<T>(string name)
{
return new CrossProcessSignal<T>(name);
}
}
Signal — internal класс для синхронизации внутри одного процесса. Для синхронизации необходима ссылка на объект.
CrossProcessSignal — internal класс который может синхронизировать потоки в отдельных процессах(но об этом чуть позже).
Теперь о реализации Signal
Первое, что приходит на ум, в Receive блокировать выполнение потока с помощью Semaphore а в методе Send вызывать Release() этого семафора с количеством блокированных потоков.
После разблокировки потоков возвращать результат из поля класса T buffer. Но мы не знаем какое количество потоков будет висеть в Receive и нет гарантии что к вызову Release не подбежит еще пара потоков.
В качестве примитива синхронизации был выбран AutoResetEvent. Для каждого нового потока будет создаваться свой AutoResetEvent, хранить все это добро мы будем в словаре Dictionary<int,AutoResetEvent> где ключ это id потока.
Собственно поля класса выглядят так:
private T buffer;
Dictionary<int,AutoResetEvent> events = new Dictionary<int, AutoResetEvent>();
private volatile object sync = new object();
private bool isDisposabled = false;
Объект sync будет нам необходим при вызове Send, дабы несколько потоков не начали перетирать буфер.
isDisposabled флаг указывающий был ли вызван Dispose(), если не вызван то вызываем его в деструкторе.
public void Dispose()
{
foreach(var resetEvent in events.Values)
{
resetEvent.Dispose();
}
isDisposabled = true;
}
~Signal()
{
if (!isDisposabled)
{
Dispose();
}
}
Теперь о методе Receive.
public T Receive()
{
var waiter = GetEvents();
waiter.WaitOne();
waiter.Reset();
return buffer;
}
GetEvents() достает из словаря AutoResetEvent если есть, если нет то создает новый и кладет его в словарь.
waiter.WaitOne() блокировка потока до ожидания сигнала.
waiter.Reset() сброс текущего состояния AutoResetEvent. Следующий вызов WaitOne приведет к блокировке потока.
Осталось только вызвать метод Set для каждого AutoResetEvent.
public void Send(T signal)
{
lock (sync)
{
buffer = signal;
foreach(var autoResetEvent in events.Values)
{
autoResetEvent.Set();
}
}
}
Проверить данную модель можно тестом:
private void SendTest(string name = "")
{
ISignal<string> signal;
if (string.IsNullOrEmpty(name))
{
signal = SignalFactory.GetInstanse<string>(); // создаем локальный сигнал
}
else
{
signal = SignalFactory.GetInstanse<string>(name);
}
var task1 = Task.Factory.StartNew(() => // старт потока
{
for (int i = 0; i < 10; i++)
{
// блокировка потока, ожидание сигнала
var message = signal.Receive();
Debug.WriteLine($"Thread 1 {message}");
}
});
var task2 = Task.Factory.StartNew(() => // старт потока
{
for (int i = 0; i < 10; i++)
{
// блокировка потока, ожидание сигнала
var message = signal.Receive();
Debug.WriteLine($"Thread 2 {message}");
}
});
for (int i = 0; i < 10; i++)
{
// отправка сигнала ожидающим потокам.
signal.Send($"Ping {i}");
Thread.Sleep(50);
}
}
using System.Collections.Generic;
using System.Threading;
namespace Signal
{
internal class Signal<T> : ISignal<T>
{
private T buffer;
Dictionary<int,AutoResetEvent> events = new Dictionary<int, AutoResetEvent>();
private volatile object sync = new object();
private bool isDisposabled = false;
~Signal()
{
if (!isDisposabled)
{
Dispose();
}
}
public T Receive()
{
var waiter = GetEvents();
waiter.WaitOne();
waiter.Reset();
return buffer;
}
public T Receive(int timeOut)
{
var waiter = GetEvents();
waiter.WaitOne(timeOut);
waiter.Reset();
return buffer;
}
public void Send(T signal)
{
lock (sync)
{
buffer = signal;
foreach(var autoResetEvent in events.Values)
{
autoResetEvent.Set();
}
}
}
private AutoResetEvent GetEvents()
{
var threadId = Thread.CurrentThread.ManagedThreadId;
AutoResetEvent autoResetEvent;
if (!events.ContainsKey(threadId))
{
autoResetEvent = new AutoResetEvent(false);
events.Add(threadId, autoResetEvent);
}
else
{
autoResetEvent = events[threadId];
}
return autoResetEvent;
}
public void Dispose()
{
foreach(var resetEvent in events.Values)
{
resetEvent.Dispose();
}
isDisposabled = true;
}
}
}
Данной реализации есть куда расти в плане надежности. В исходниках есть межпроцессорная реализация этой идеи с передачей сигнала через shared memory, если будет интересно могу написать об этом отдельную статью.
Автор: Kalatyn11