Всем привет!
Последние годы я занимаюсь разработкой под Андроид на Котлине. Не так давно, за неимением RxJava на Kotlin multiplatform, мы начали использовать корутины и flow – холодные стримы для Котлина из коробки. До Андроида я много лет провёл с C#, и там свои корутины есть уже очень давно, только их там так называть не принято. Но вот про аналог flow на async/await я не слышал. Основной инструмент для реактивного программирования – Rx.Net (собственно, здесь rx и родился). Вот я и решил поностальгировать и попробовать напилить велосипед.
Далее подразумевается, что читатель имеет представление о штуках, про которые говорилось в предыдущем абзаце. Для нетерпеливых — сразу ссылка на репозиторий:
https://github.com/ILAgent/flowsharp
Дисклеймер: данный код не претендует на использование в продакшене. Это — концепт, не более того. Что-то может работать не совсем так, как задумывалось.
IFlow и IFlowCollector
Что ж, начнём с того, что перепишем в лоб интерфейсы Flow и FlowCollector на C#.
Было:
interface Flow<out T> {
suspend fun collect(collector: FlowCollector<T>)
}
interface FlowCollector<in T> {
suspend fun emit(value: T)
}
Стало:
public interface IFlow<out T>
{
Task Collect(IFlowCollector<T> collector);
}
public interface IFlowCollector<in T>
{
Task Emit(T item);
}
Полагаю, отличия понятны и объясняются разной реализацией асинхронности.
Чтобы воспользоваться этими интерфейсами, их надо реализовать. Вот что получилось:
internal class Flow<T> : IFlow<T>
{
private readonly Func<IFlowCollector<T>, Task> _emitter;
public Flow(Func<IFlowCollector<T>, Task> emitter)
{
_emitter = emitter;
}
public Task Collect(IFlowCollector<T> collector)
{
return _emitter(collector);
}
}
internal class FlowCollector<T> : IFlowCollector<T>
{
private readonly Func<T, Task> _handler;
public FlowCollector(Func<T, Task> handler)
{
_handler = handler;
}
public Task Emit(T item)
{
return _handler(item);
}
}
В конструктор flow передаём функцию, которая будет эмитить значения. А в конструктор коллектора – функцию, которая будет обрабатывать каждое эмитированное значение. Использовать это можно так
var flow = new Flow<int>(async collector =>
{
await collector.Emit(1);
await Task.Delay(1000);
await collector.Emit(2);
await Task.Delay(1000);
await collector.Emit(3);
});
var collector = new FlowCollector<int>(async item => Console.WriteLine(item));
await flow.Collect(collector);
Думаю, в коде выше всё понятно. Сначала мы создаём Flow, затем создаём коллектор (обработчик каждого элемента). Затем запускаем Flow, «подписав» на него коллектор. Если добавить немного сахара (см. гитхаб), то получим что-то вроде этого:
await Flow<int>(async collector =>
{
await collector.Emit(1);
await Task.Delay(1000);
await collector.Emit(2);
await Task.Delay(1000);
await collector.Emit(3);
})
.Collect(Console.WriteLine);
На Котлине это выглядит вот так:
scope.launch{
flow{
emit(1)
delay(1000)
…
}.collect{ printl(it) }
}
Лично мне в варианте на Шарпе больше всего не нравится необходимость явно указывать тип элемента при создании флоу. Но дело тут не в том, что вывод типов в Котлине сильно круче. Функция flow выглядит так:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
Как мы видим, параметр block помечен аннотацией BuilderInference, которая и подсказывает компилятору, что тип надо взять из этого параметра. Кто-нибудь знает, можно ли напилить подобное для C# на Roslyn?
CancellationToken
В rx есть подписка, от которой можно отписаться. В Kotlin Flow за отмену отвечает Job, которую возвращает билдер, либо Coroutine Scope. Нам тоже определённо необходим инструмент, позволяющий Flow завершиться досрочно. В C# для отмены асинхронных операций используется, не побоюсь этого слова, паттерн Cancellation Token. CancellationToken – это класс, объект которого предоставляет асинхронной операции информацию о том, что она отменена. Он прокидывается в асинхронную операцию при старте, и эта операция сама смотрит за его состоянием. А меняется состояние извне.
Короче, нам надо прокинуть CancellationToken в наши Flow и FlowCollector.
public interface IFlow<out T>
{
Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default);
}
public interface IFlowCollector<in T>
{
Task Emit(T item, CancellationToken cancellationToken = default);
}
Реализацию пастить сюда не буду – см. гитхаб.
Тест теперь будет выглядеть вот так:
var cts = new CancellationTokenSource();
var flowTask = Flow<int>(async (collector, cancellationToken) =>
{
await collector.Emit(1);
await Task.Delay(2000, cancellationToken);
await collector.Emit(2);
await Task.Delay(2000, cancellationToken);
await collector.Emit(3);
})
.Collect(item => Log(item), cts.Token);
var cancelationTask = Task.Run(async () =>
{
await Task.Delay(3000);
cts.Cancel();
});
await flowTask;
Суть такова. Параллельно Flow запускаем операцию, которая через 3 секунды его отменит. В результате Flow не успевает эмитировать третий элемент и завершается с TaskCanceledException, что и является требуемым поведением.
Немного практики
Давайте попробуем использовать то, что получилось, на практике. Например, обернём какой-нибудь event в наш Flow. В Rx.Net для этого даже существует библиотечный метод FromEventPattern.
Чтобы не связываться с реальным UI, я написал класс ClicksEmulator, который генерирует условные нажатия на кнопку мыши через случайные интервалы времени.
public class ClicksEmulator
{
public enum Button { Left, Right }
public class ClickEventArgs : EventArgs
{
//…
public int X { get; }
public int Y { get; }
public Button Button { get; }
}
public event EventHandler<ClickEventArgs> ButtonClick;
public async Task Start(CancellationToken cancellationToken = default) {… }
}
Я опустил реализацию, т.к. она здесь не очень важна. Главное – это event ButtonClick, который мы хотим превратить во Flow. Для это напишем метод-расширение
public static IFlow<ClicksEmulator.ClickEventArgs> Clicks(this ClicksEmulator emulator)
{
return FlowFactory.Flow<ClicksEmulator.ClickEventArgs>(async (collector, cancellationToken) =>
{
void clickHandler(object sender, ClicksEmulator.ClickEventArgs args) => collector.Emit(args);
emulator.ButtonClick += clickHandler;
cancellationToken.Register(() =>
{
emulator.ButtonClick -= clickHandler;
});
await Task.Delay(-1, cancellationToken);
});
}
Сначала мы объявляем обработчик события, который ничего не делает, кроме передачи аргумента события в коллектор. Затем подписываемся на события и регистрируем отписку в случае отмены (завершения) flow. Ну и далее бесконечно ждём и слушаем события ButtonClick, пока cancellationToken не выстрелит.
Если вы использовали callbackFlow или channelFlow в Котлине или создавали холодные Observable из listener’ов в Rx, то вы отметите, что структура кода во всех случаях очень схожа. Это прекрасно, но возникает вопрос – чем Flow в данном случае лучше, чем сырой event? Вся сила реактивных стримов – в операторах, которые выполняют разные преобразования над ними: фильтрацию, маппинг и многие другие, более сложные. Но у нас их пока нет. Давайте попробуем что-нибудь с этим сделать.
Filter, Map, OnNext
Начнем с одного из самых простых операторов — Filter. Он, как это очевидно из названия, будет фильтровать элементы flow в соответствии с заданным предикатом. Это будет extension-метод, применяемый к оригинальному flow и возвращающий flow только с отфильтрованными элементами. Получается, нам надо брать каждый элемент из оригинального flow, проверять, и эмитить дальше, если предикат возвращает true. Так и сделаем:
public static IFlow<T> Filter<T>(this IFlow<T> source, Func<T, bool> predicate) =>
FlowFactory.Flow<T>((collector, cancellationToken) =>
source.Collect(item =>
{
if (predicate(item))
collector.Emit(item);
}, cancellationToken)
);
Теперь, если нам нужны нажатия только на левую кнопку мыши, можно написать так:
emulator
.Clicks()
.Filter(click => click.Button == ClicksEmulator.Button.Left)
.Collect(item => Log($"{item.Button} {item.X} {item.Y}"), cts.Token);
По аналогии напишем операторы Map и OnNext. Первый преобразует каждый элемент исходного flow в другой с помощью переданной функции-маппера. Второй будет возвращать flow с теми же элементами, что и оригинальный, но выполняя на каждом какое-то действие Action (обычно логирование).
public static IFlow<R> Map<T, R>(this IFlow<T> source, Func<T, R> mapper) =>
FlowFactory.Flow<R>((collector, cancellationToken) =>
source.Collect(
item => collector.Emit(mapper(item)),
cancellationToken
)
);
public static IFlow<T> OnNext<T>(this IFlow<T> source, Action<T> action) =>
FlowFactory.Flow<T>((collector, cancellationToken) =>
source.Collect(item =>
{
action(item);
collector.Emit(item);
}, cancellationToken)
);
И пример использования:
emulator
.Clicks()
.OnNext(click => Log($"{click.Button} {click.X} {click.Y}"))
.Map(click => click.Button == ClicksEmulator.Button.Left ? 0 : 1)
.Collect(item => Log($"{item}"), cts.Token);
Вообще для реактивных стримов придумано очень много операторов, их можно найти, например, здесь: http://reactivex.io/documentation/operators.html
И ничего не мешает реализовать любые из них для IFlow.
Те, кто знаком с Rx.Net, знают, что там, помимо новых и специфичных операторов для IObservable, используются методы-расширения из Linq-to-objects, и это позволяет рассматривать стримы как “коллекции событий” и манипулировать ими с помощью привычных Linq-методов. Почему бы вместо того, чтобы писать операторы самим, не попробовать поставить IFlow на рельсы Linq?
IAsyncEnumerable
В C# 8 завезли асинхронную версию IEnumerable — IAsyncEnumerable — интерфейс коллекции, по которой можно итерироваться асинхронно. Принципиальная разница между IAsyncEnumerable и реактивными стримами (IObservable и IFlow ) вот в чём. IAsyncEnumerable, как и IEnumerable — это pull-модель. Мы итерируемся по коллекции сколько и когда нам надо и сами тянем из неё элементы. Стримы — это push. Мы подписываемся на события и “реагируем” на них, когда они приходят — на то они и реактивные. Однако от pull-модели можно добиться push-like поведения. Это называется long polling https://en.wikipedia.org/wiki/Push_technology#Long_polling. Суть такая: мы, итерируясь по коллекции, запрашиваем очередной её элемент и ждём сколь угодно долго, пока коллекция нам его не вернёт, т.е. пока очередное событие не наступит. IAsyncEnumerable, в отличие от IEnumerable, позволит нам ждать асинхронно. Короче, нам надо как-то натянуть IAsyncEnumerable на IFlow.
Как известно, за возврат текущего элемента коллекции IAsyncEnumerable и переход к следующему элементу отвечает интерфейс IAsyncEnumerator. При этом нам надо брать элементы из IFlow, а этим занимается IFlowCollector. Получается вот такой объект, реализующий эти интерфейсы:
internal class FlowCollectorEnumerator<T> : IFlowCollector<T>, IAsyncEnumerator<T>
{
private readonly SemaphoreSlim _backpressureSemaphore = new SemaphoreSlim(0, 1);
private readonly SemaphoreSlim _longPollingSemaphore = new SemaphoreSlim(0, 1);
private bool _isFinished;
public T Current { get; private set; }
public async ValueTask DisposeAsync() { }
public async Task Emit(T item, CancellationToken cancellationToken)
{
await _backpressureSemaphore.WaitAsync(cancellationToken);
Current = item;
_longPollingSemaphore.Release();
}
public async Task Finish()
{
await _backpressureSemaphore.WaitAsync();
_isFinished = true;
_longPollingSemaphore.Release();
}
public async ValueTask<bool> MoveNextAsync()
{
_backpressureSemaphore.Release();
await _longPollingSemaphore.WaitAsync();
return !_isFinished;
}
}
Основное здесь методы — Emit, Finish и MoveNextAsync.
Emit в начале ждёт момента, когда очередной элемент из коллекции будет запрошен. Т.е. не эмитит элемент, пока он не потребуется. Это называется backpressure, отсюда и имя семофора. Затем выставляется текущий item и сообщается, что long polling запрос может получить результат.
MoveNextAsync вызывается, когда из коллекции тянут очередной элемент. Он отпускает _backpressureSemaphore и ждёт, когда Flow запушит очередной элемент. Затем он возвращает признак того, закончилась ли коллекция. Этот флаг выставляет метод Finish.
Finish работает по тому же принципу, что и Emit, только вместо очередного элемента выставляет признак конца коллекции.
Теперь надо этот класс заиспользовать.
public static class AsyncEnumerableExtensions
{
public static IAsyncEnumerable<T> CollectEnumerable<T>(this IFlow<T> flow, CancellationToken cancellationToken = default)
{
var collector = new FlowCollectorEnumerator<T>();
flow
.Collect(collector, cancellationToken)
.ContinueWith(_ => collector.Finish(), cancellationToken);
return new FlowEnumerableAdapter<T>(collector);
}
}
internal class FlowEnumerableAdapter<T> : IAsyncEnumerable<T>
{
private readonly IAsyncEnumerator<T> _enumerator;
public FlowEnumerableAdapter(IAsyncEnumerator<T> enumerator)
{
_enumerator = enumerator;
}
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
return _enumerator;
}
}
Extension-метод CollectEnumerable для IFlow создаёт FlowCollectorEnumerator и подписывает на него flow, по завершению которого вызывается метод Finish(). И возвращает FlowEnumerableAdapter, который является простейшей реализацией IAsyncEnumerable, использующей FlowCollectorEnumerator в качестве IEnumerator.
Пробуем, что получилось.
var clicks = emulator
.Clicks()
.OnNext(item => Log($"{item.Button} {item.X} {item.Y}"))
.CollectEnumerable(cts.Token)
.Where(click => click.Button == ClicksEmulator.Button.Right)
.Select(click => click.Y < 540 ? "TOP" : "LEFT");
await foreach (var click in clicks)
{
Log($"Clicked at: {click}");
}
Здесь мы получаем Flow clicks(), каждый клик логируем, затем превращаем IFlow в IAsyncEnumerable. Далее применяет известные Linq-операторы: оставляем только клики правой кнопкой и получаем, в какой части экрана они сделаны.
Далее рассмотрим пример посложнее. Будем заменять правый клик на двойной левый. Т.е. нам надо будет мапить каждый элемент не в какой-то другой, а в коллекцию. Либо во Flow, преобразуемый в коллекцию.
var clicks = emulator
.Clicks()
.OnNext(item => Log($"Original: {item.Button} {item.X} {item.Y}"))
.CollectEnumerable(cts.Token)
.Select(click => click.Button == ClicksEmulator.Button.Left
? Flow<ClicksEmulator.ClickEventArgs>(collector => collector.Emit(click))
: Flow<ClicksEmulator.ClickEventArgs>(async collector =>
{
var changedClick =
new ClicksEmulator.ClickEventArgs(click.X, click.Y, ClicksEmulator.Button.Left);
await collector.Emit(changedClick);
await Task.Delay(200);
await collector.Emit(changedClick);
})
)
.SelectMany(flow => flow.CollectEnumerable());
await foreach (var click in clicks)
{
Log($"Changed: {click.Button} {click.X} {click.Y}");
}
Для этого в Linq существует оператор SelectMany. Его аналог в реактивных стримах — FlatMap. Сначала мапим каждый клик в IFlow: для левого клика — Flow с одним этим кликом, для правого — Flow из двух левых кликов с задержкой между ними. А затем в SelectMany превращаем IFlow в IAyncEnumerable.
И это работает! Т.е. многие операторы не обязательно реализовывать для IFlow — можно использовать Linq.
Заключение
Rx.Net — был и остаётся главным инструментом при работе с асинхронными последовательностями событий на C#. Но это довольно большая библиотека по объёмы кода. Как мы увидели, похожую функциональность можно получить значительно проще — всего лишь два интерфейса плюс некоторая обвязка. Это возможно благодаря использованию возможностей языка — async/await. Когда зарождался Rx, эту фичу в C# ещё не завезли.
Спасибо за внимание!
Автор: Иван