Недавно возникла необходимость решить следующую несложную задачку: есть несколько десятков устройств (учебных комплексов), у которых нужно регулярно запрашивать их текущее состояние. Комплексы общаются по протоколу UDP, и хотелось сделать так, чтобы не задумываться о цикле опроса и определении, от какого же устройства пришел ответ, а просто посылать запрос — и когда пришел результат — записывать его. Задачу эту я решал и раньше, но захотелось посмотреть, насколько концепция async/await упростит и сократит код. Оказалось, что финальный результат занимает меньше странички.
Вся логика опроса состоит всего лишь из двух методов — цикла чтения сокета UDP и метода посылки команды на устройство.
Когда посылаем команду, есть две вещи, которые надо принять во внимание — это 1) после посылки команды нам надо ждать ответа от устройства и 2) ответ может не прийти — тогда необходимо вернуть исключение, которое скажет нам о таймауте.
Асинхронный метод посылки команды выглядит следующим образом (*см. Update 1):
public async Task<byte[]> SendReceiveUdpAsync(byte[] msg, string ip, int port, int timeOut)
{
var tokenSource = new CancellationTokenSource(timeOut);
var token = tokenSource.Token;
var tcs = new TaskCompletionSource<byte[]>();
var key = new Tuple<string, int>(ip, port);
_tcsDictionary.Add(key, tcs);
await _client.SendAsync(msg, msg.Length, ip, port);
var result = await tcs.Task.WithCancellation(token);
if(_tcsDictionary.ContainsKey(key)) _tcsDictionary.Remove(key);
return result;
}
Здесь _client — это стандартный UdpClient.
Мы посылаем команду и по await ждем результата, который нам должен вернуть Task, сохраненный в словарике с ключом нашего соединения (именно от него мы и ждем ответ). Когда чтение начинается — мы заносим TaskCompletionSource в словарик, когда мы получаем ответ и соединение больше не нужно — удаляем из словарика.
Сам словарик (ConcurrentDictionary используем вместо Dictionary для того, чтобы избежать проблем с кросспоточными вызовами):
private ConcurrentDictionary<Tuple<string,int>, TaskCompletionSource<byte[]>> _tcsDictionary;
Тут есть момент, который заслуживает внимания — это метод-расширение WithCancellation(token). Он нужен для того, чтобы поддержать отмену операции при помощи CancellationToken, и отменяет задачу, возвращая исключение при превышении заданного таймаута.
static class TaskExtension
{
public static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<bool>();
using (cancellationToken.Register(
s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs))
if (task != await Task.WhenAny(task, tcs.Task))
throw new OperationCanceledException(cancellationToken);
return await task;
}
}
А вот и сам цикл чтения: читаем, пока хватит сил, и если пришедшая датаграмма имеет адресом соединение, ключ с параметрами которого мы уже занесли в словарик, то результат помещается в TaskCompletionSource по этому ключу, и мы переходим обратно в метод посылки сообщения на await tcs.Task, только уже имея на руках нужный результат от устройства, этот результат и вернем в место вызова.
Task.Run(() =>
{
IPEndPoint ipEndPoint = null;
while (true)
{
var receivedBytes = _client.Receive(ref ipEndPoint);
var ip = ipEndPoint.Address.ToString();
var port = ipEndPoint.Port;
var key = new Tuple<string, int>(ip, port);
TaskCompletionSource<byte[]> tcs;
if(_tcsDictionary.TryGetValue(key,out tcs)) tcs.SetResult(receivedBytes);
}
});
Итог радует. Вот так async-await упростил задачу опроса множества устройств по протоколу UDP.
Update 1
Как было справедливо отмечено в комментариях, метод SendReceiveUdpAsync необходимо переписать таким образом, чтобы в случае отмены задачи и выброса исключения удалялось значение из словарика:
public async Task<byte[]> SendReceiveUdpAsync(byte[] msg, string ip, int port, int timeOut)
{
try{
...
return result;
}
finally{
if(_tcsDictionary.ContainsKey(key)) _tcs.Dictionary.Remove(key);
}
}
Автор: NeoNN