Здравствуй, читатель. Прошло уже достаточно много времени с момента выхода .NET Core 2.1. И такие крутые нововведения, как Span и Memory, уже рассмотрены весьма широко, про них можно достаточно много прочитать, посмотреть и услышать. Однако, к сожалению, библиотека под названием System.IO.Pipelines не удостоилась такого же внимания. Почти все, что есть по этой теме — единственный пост, который многие перевели и разместили у себя. Информации должно быть однозначно больше, чтобы интересующиеся могли взглянуть на технологию с разных углов.
Введение
Итак, данная библиотека ставит целью ускорить работу с потоковой обработкой данных. Изначально она была создана и использовалась командой разработки Kestrel (кросс-платформенный веб-сервер для ASP.NET Core), но на данный момент поставляется через отдельный nuget-пакет.
Перед тем, как мы углубимся в тему, можно представить себе механизм библиотеки как улучшенный аналог MemoryStream. Проблема оригинального MemoryStream в излишнем числе копирований, что очевидно, если вспомнить, что внутри используется приватный массив байт в качестве буфера. Например, в методах Read и Write отчетливо видно копирование. Таким образом, для объекта, который мы хотим записать в стрим, будет создана копия во внутреннем буфере, а во время чтения потребителю будет доставлена копия внутренней копии. Звучит как не самое рациональное использование пространства.
System.IO.Pipelines не ставит целью заменить все стримы, это дополнительный инструмент в арсенале разработчика, пишущего высокопроизводительный код. Предлагаю ознакомиться с основными методами и классами, посмотреть, как они устроены внутри, и разобрать базовые примеры.
Начнем с внутреннего устройства, параллельно рассматривая простые фрагменты кода. После этого станет ясно, что и как работает, и как это следует использовать. При работе с System.IO.Pipelines стоит помнить, что базовая концепция состоит в том, что все операции чтения-записи должны проходить без дополнительных аллокаций. Но некоторые заманчивые на первый взгляд методы противоречат этому правилу. Соответственно, код, который вы так стараетесь ускорить, начинает выделять память под новые и новые данные, нагружая сборщик мусора.
Внутрянка библиотеки использует широчайшие возможности последних версий языка и райнтайма — Span, Memory, пулы объектов, ValueTask и тд. Туда стоит заглянуть как минимум за отличным примером использования этих возможностей в продакшене.
В свое время некоторые были недовольны реализацией стримов в C#, потому что один класс использовался и для чтения, и для записи. Но, как говорится, методов из класса не выкинешь. Даже если в какой-то стрим не поддерживал чтение/запись/перемещение указателя, в силу вступали свойства CanRead, CanWrite и CanSeek, что выглядело как небольшой костыль. Здесь же дела обстоят иначе.
Для работы с пайпами используются 2 класса: PipeWriter и PipeReader. Данные классы содержат примерно по 50 строк и являются псевдо-фасадами(не самым классическим его воплощениями, так как скрывается за ними единственный класс, а не множество) для класса Pipe, который содержит всю основную логику по работе с данными. Из публичных членов — 2 конструктора, 2 get-only свойства — Reader и Writer, метод Reset(), который сбрасывает внутренние поля к начальному состоянию, чтобы класс можно было переиспользовать. Остальные методы для работы вызываются с помощью псевдо-фасадов.
Для начала о классе Pipe
Экземпляр класса занимает 320 байт, что довольно много (почти треть килобайта, 2 таких объекта не смогли бы поместиться в памяти Манчестерского Марк I). Так что аллоцировать его в больших количествах — идея плохая. Тем более, что по смыслу объект предназначается для долгого использования. Использование пулов также вносит свой аргумент в пользу этого заявления. Ведь объекты, используемые в пуле будут жить вечно(во всяком случае в стандартном).
Заметим, что класс помечен как sealed и что он потокобезопасен — многие участки кода являются критической секцией и обернуты в локи.
Для начала работы следует создать экземпляр класса Pipe и получить с помощью упомянутых свойств объекты PipeReader и PipeWriter.
var pipe = new Pipe();
PipeWriter pipeWriter = pipe.Writer;
PipeReader pipeReader = pipe.Reader;
Рассмотрим методы для работы с пайплами:
Для записи через PipeWriter — WriteAsync, GetMemory/GetSpan, Advance, FlushAsync, Complete, CancelPendingFlush, OnReaderCompleted.
Для чтения через PipeReader — AdvanceTo, ReadAsync, TryRead, Complete, CancelPendingRead, OnWriterCompleted.
Как и сказано в упомянутом посте, класс использует односвязный список буферов. Но, очевидно, между PipeReader и PipeWriter они не передаются — вся логика заложена в одном классе. Данный список используется как для чтения, так и для записи. Более того, возвращаемые данные хранятся именно в этом списке.
Также имеются объекты, указывающие на начало данных для чтения (ReadHead и индекс), конец данных для чтения (ReadTail и индекс) и начало места для записи (WriteHead и количество записанных буферизованных байт). Здесь ReadHead, ReadTail и WriteHead — конкретный сегмент из списка, а индекс указывает на конкретную позицию внутри сегмента. Таким образом запись может начинаться с середины сегмента, захватывать весь следующий сегмент и завершаться на середине третьего. Данные указатели двигаются в различных методах.
Приступим к разбору методов PipeWriter
#1 ValueTask<FlushResult> WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
Как раз тот заманчивый метод. Имеет весьма подходящую и модную сигнатуру — принимает ReadOnlyMemory, асинхронный. И многие могут соблазниться, особенно вспомнив о том, что Span и Memory — это так быстро и круто. Но не стоит обольщаться. Всё, что делает этот метод — копирует переданный ему ReadOnlyMemory во внутренний список. И под «копирует» понимается вызов метода CopyTo, а не копирование самого объекта. То есть все данные, которые мы хотим записать, буду скопированы, тем самым нагружая память. Этот метод стоит изучить лишь для того, чтобы убедится, что им лучше не пользоваться. Ну и, возможно, для некоторых редких ситуациях данное поведение является подходящим.
Тело метода является критической секцией, доступ к нему синхронизируется посредствам монитора.
Тогда может возникнуть вопрос, как записать что-то, если не через самый очевидный и единственный подходящий метод.
#2 Memory<byte> GetMemory(int sizeHint)
Метод принимает один параметр целочисленного типа. В нем мы должны указать, сколько байт мы хотим записать (или больше, но ни в коем случае не меньше). Данный метод проверяет, достаточно ли места для записи в текущем фрагменте памяти, хранящимся в _writingHeadMemory. Если достаточно, то в качестве Memory возвращается _writingHeadMemory. Если же нет, то для данных, записанных в буфер, но для которых не был вызван метод FlushAsync, он вызывается и выделяется еще один BufferSegment, который соединяется с предыдущим (вот и список). При отсутствии _writingHeadMemory, он инициализируется новым BufferSegment. И выделение очередного буфера является критической секцией и делается под локом.
Предлагаю взглянуть на такой пример. На первый взгляд может показаться, что компилятор (или рантайм) бес попутал.
var pipeNoOptions = new Pipe();
Memory<byte> memoryOne = pipeNoOptions.Writer.GetMemory(2);
Console.WriteLine(memoryOne.Length); //2048 или 4096
var pipeWithOptions = new Pipe(new PipeOptions(minimumSegmentSize: 5));
Memory<byte> memoryTwo = pipeWithOptions.Writer.GetMemory(2);
Console.WriteLine(memoryTwo.Length); //16
Но все в этом примере объяснимо и просто.
При создании экземпляра Pipe мы можем передать ему в конструктор объект PipeOptions с опциями для создания.
В PipeOptions есть поле минимального размера сегмента по умолчанию. Еще не так давно оно было 2048, но данный коммит все изменил, теперь 4096. На момент написания статьи версия с 4096 была пререлизным пакетом, в последней релизной версии было значение 2048. Это объясняет поведение первого примера. Если вам критично использование меньшего размера для стандартного буфера, его можно указать в экземпляре типа PipeOptions.
Но во втором примере, где указан минимальный размер, длина все равно ему не соответствует. А это уже происходит потому, что создание нового BufferSegment происходит с использованием пулов. Одной из опций в PipeOptions является пул памяти. После этого для создания нового сегмента будет использоваться именно указанный пул. Если вы не указали свой пул памяти, будет использоваться стандартный ArrayPool, который, как известно, имеет несколько бакетов под разные размеры массивов (каждый следующий в 2 раза больше предыдущего) и при запросе на определенный размер, он ищет бакет с массивами подходящего размера (то есть ближайшего большего или равного). Соответственно, новый буфер почти наверно будет большего размера, чем вы запросили. Минимальный размер массива в стандартном ArrayPool (System.Buffers.TlsOverPerCoreLockedStacksArrayPool) — 16. Но не стоит переживать, ведь это пул массивов. Соответственно, в подавляющем большинстве случаев массив не оказывает нагрузки на сборщик мусора и будет использоваться повторно.
#2.5 Span<byte> GetSpan(int sizeHint)
Работает аналогично, давая Span из Memory.
Таким образом GetMemory() или GetSpan() — главные методы для записи. Они дают нам объект, в который мы может писать. Для этого нам не надо выделять память под новые массивы значений, можно писать напрямую во внутренню структуру. Какой из них использовать в основном будет зависеть от используемого вами API и асинхронности метода. Однако с учетом вышесказанного появляется вопрос. Как читатель узнает о том, сколько мы записали? Если бы использовалась всегда конкретная реализация пула, которая дает массив точно такого же размера, как запрошен, то читатель мог бы читать сразу весь буфер. Однако, как мы уже сказали, нам выделяется буфер с высокой долей вероятности большего размера. Это приводит к следующему необходимому для работы методу.
#3 void Advance(int bytes)
До жути простой метод. В качестве аргумента принимает количество записанных байт. Они увеличивают внутренние счетчики — _unflushedBytes и _writingHeadBytesBuffered, чьи имена говорят сами за себя. А также он обрезает _writingHeadMemory ровно под количество записанных байт (используя метод Slice). Поэтому после вызова этого метода нужно запрашивать новый блок памяти в виде Memory или Span, в прежний писать нельзя. И все тело метода является критической секцией и выполняется под локом.
Казалось бы, после этого читатель может получать данные. Но нужен еще один шаг.
#4 ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken)
Метод вызывается после того, как мы записали в полученный Memory нужные данные и указали, сколько мы туда записали. Метод возвращает ValueTask, однако не является асинхронным (в отличие от его наследника StreamPipeWriter). ValueTask — специальный тип (readonly struct), используемый в случае, когда большинство вызовов не будет использовать асинхронность, то есть все необходимые данные будут доступны к моменту его вызова и метод будет завершаться синхронно. Внутри себя содержит либо данные, либо Task (в случае, если синхронно все же не получилось). Это зависит от состояния свойства _writerAwaitable.IsCompleted. Если поискать, что изменяет состояние этого объекта ожидания, мы увидим, что это происходит при условии, что количество необработанных (not consumed) данных (это не совсем то же самое, что и непрочитанных (not examined), будет объяснено далее) превышает определенный порог (_pauseWriterThreshold). По умолчанию — 16 размеров сегментов. При желании значение можно изменить в PipeOptions. Также данный метод запускает продолжение (continuation) метода ReadAsync, если тот был заблокирован.
Возвращает FlushResult, содержащий 2 свойства — IsCanceled и IsCompleted. IsCanceled показывает, был ли отменен Flush (вызов CancelPendingFlush). IsCompleted показывает, был ли завершен PipeReader (вызовом методов Complete() или CompleteAsync()).
Основная часть метода выполняется под Локом Скайуокером.
Остальные методы PipeWriter'a с точки зрения реализации интереса не представляют и применяются гораздо реже, поэтому будет дано лишь краткое описание.
#5 void Complete(Exception exception = null) или ValueTask CompleteAsync(Exception exception = null)
Помечает пайп как закрытый для записи. После завершения, при попытке использовать методы для записи будет выкинуто исключение. В случае, если PipeReader уже был завершен, завершается и весь экземпляр Pipe. Большая часть работы выполняется под локом.
#6 void CancelPendingFlush()
Как понятно из названия — завершает текущую операцию FlushAsync(). Присутствует лок.
#7 void OnReaderCompleted(Action<Exception, object> callback, object state)
Выполняет переданный делегат, когда читатель завершается (Complete). Также есть лок.
В документации на текущий момент написано, что данный метод может не быть вызван на некоторых наследниках PipeWriter и будет убран в будущем. Поэтому не следует завязывать логику на данные методы.
Переходим к PipeReader
#1 ValueTask<ReadResult> ReadAsync(CancellationToken token)
Здесь, подобно FlushAsync, возвращается ValueTask, что намекает, что метод в большинстве своем синхронный, но не всегда. Зависит от состояния _readerAwaitable. Как и с FlushAsync, необходимо найти, когда _readerAwaitable выставляется в незавершенный. Это происходит, когда PipeReader вычитал все из списка (или в нем остались данные, которые были помечены как прочитанные (examined) и нужно больше данных для продолжения). Что, собственно, и логично. Соответственно, можно сделать вывод, что желательно тонко подстраивать Pipe под вашу работу, выставлять все ее опции вдумчиво, основываясь на эмпирически выявленных статистиках. Грамотная настройка снизит вероятность асинхронной ветки выполнения и позволит более эффективно обрабатывать данные. Почти весь метод окружен локом.
Возвращает некий загадочный ReadResult. По факту это просто буфер + флаги показывающие статус операции (IsCanceled — был ли ReadAsync отменен и IsCompleted, показывающий, был ли PipeWriter закрыт). При этом IsCompleted — значение, указывающее, были ли вызваны методы PipeWriter Complete() или CompleteAsync(). Если данные методы были вызваны с указанием исключения, то оно будет выброшено при попытке чтения.
Буфер опять имеет загадочный тип — ReadOnlySequence. Это, в свою очередь, объект для содержания сегментов (ReadOnlySequenceSegment) начала и конца + индексы старта и конца внутри соответствующих сегментов. Что собственно напоминает структуру самого класса Pipe. Кстати, BufferSegment — наследник ReadOnlySequenceSegment, что наталкивает на мысль, что именно он там и используется. Благодаря этому можно как раз избавится от лишних выделений памяти на передачи данных от писателя к читателю.
Из буфера можно получить ReadOnlySpan для последующей обработки. Для полноты картины можно проверить, содержит ли буфер единственный ReadOnlySpan. В случае если содержит, нам не надо итерироваться по коллекции из одного элемента и мы можем получить его с помощью свойства First. В противном случае надо пробежаться по всем сегментам в буфере и обработать ReadOnlySpan каждого.
Тема для обсуждения — в классе ReadOnlySequence активно применяются nullable reference types и имеется goto (не для выхода из вложенности и не в сгенеренном коде) — в частности здесь
После обработки необходимо дать понять экземпляру Pipe, что мы прочитали данные.
#2 bool TryRead(out ReadResult result)
Синхронная версия. Позволяет получить результат в случае, если он есть. В случае, если его еще нет, в отличие от ReadAsync не блокируется, а возвращает false. Также в локе.
#3 void AdvanceTo(SequencePosition consumed, SequencePosition examined)
В этом методе можно указать, сколько байт мы прочитали, а сколько обработали. Прочитанные, но не обработанные данные будут возвращены при следующем чтении. Эта возможность может показаться странной на первый взгляд, но при обработке потока байт редко необходимо обрабатывать каждый байт по отдельности. Обычно обмен данными идет с помощью сообщений. Может возникнуть ситуация, что читатель при чтении получил одно целое сообщение и часть второго. Целое необходимо обработать, а часть второго оставить на следующий раз, чтобы она пришла вместе с оставшейся частью. Метод AdvanceTo принимает SequencePosition, который собственно является сегментом + индексом в нем. При обработке всего, что прочел ReadAsync, можно указать buffer.End. В противном случае придется явно создавать позицию, указывая сегмент и индекс, на котором была прекращена обработка. Под капотом лок.
Также в случае, если объем необработанной информации меньше, чем установленный порок(_resumeWriterThreshold), он запускает продолжение PipeWriter, если тот был заблокирован. По умолчанию этот порог — 8 объемов сегмента (половина порога блокировки).
#4 void Complete(Exception exception = null)
Завершает PipeReader. Если к этому моменту PipeWriter завершен, то завершается весь экземпляр Pipe. Лок внутри.
#5 void CancelPendingRead()
Позволяет отменить чтение, которое сейчас ожидается. Лок.
#6 void OnWriterCompleted(Action<Exception, object> callback, object state)
Позволяет указать делегат для исполнения по завершении PipeWriter.
Как и аналогичный метод у PipeWriter, в документации стоит та же пометка, что будет убран. Лок под капотом.
Пример
В листинге ниже приведен пример работы с пайпами.
С момента появления в .NET Core Span и Memory, многие классы для работы с данными были дополнены перегрузками, использующими данные типы. Так что общая схема взаимодействия будет примерно одинаковая. В своем примере я использовал пайплайны для работы с пайпами (люблю однокоренные слова), т.е. каналами — объектами ОС для межпроцессорного взаимодействия. API каналов как раз было расширено соответствующим образом, чтобы читать данные в Span и Memory. Асинхронная версия использует Memory, так как асинхронный метод будет преобразован в шаблонный метод, использующий автосгенерированный конечный автомат, в котором сохраняются все локальные переменные и параметры метода, а так как Span — ref readonly struct, он не может находиться в куче, соответственно, использование Span в асинхронном методе невозможно. Но есть и синхронная версия метода, которая позволяет использовать Span. В своем примере я попробовал обе и оказалось, что синхронная версия в данной ситуации показывает себя лучше. При ее использовании происходит меньше сборок мусора, а обработка данных происходит быстрее. Но это было лишь потому, что данных было в избытке. В случае, когда вероятна ситуация, при которой данных не будет на момент обращения за очередной порцией, следует использовать асинхронную версию, дабы не напрягать процессор в холостую.
В примере имеются комментарии, которые поясняют некоторые моменты. Обращаю внимание, что несмотря на то, что фрагменты программы, отвечающие за считывание из пайпы и обработку разделены, при записи в файл данные читаются именно из того места, куда пишутся при считывании из пайпы.
class Program
{
static async Task Main(string args)
{
var pipe = new Pipe();
var dataWriter = new PipeDataWriter(pipe.Writer, "testpipe");
var dataProcessor = new DataProcessor(new ConsoleBytesProcessor(), pipe.Reader);
var cts = new CancellationTokenSource();
await Task.WhenAll(dataWriter.ReadFromPipeAsync(cts.Token), dataProcessor.StartProcessingDataAsync(cts.Token));
}
}
public class PipeDataWriter
{
private readonly NamedPipeClientStream _namedPipe;
private readonly PipeWriter _pipeWriter;
private const string Servername = ".";
public PipeDataWriter(PipeWriter pipeWriter, string pipeName)
{
_pipeWriter = pipeWriter ?? throw new ArgumentNullException(nameof(pipeWriter));
_namedPipe = new NamedPipeClientStream(Servername, pipeName, PipeDirection.In);
}
public async Task ReadFromPipeAsync(CancellationToken token)
{
await _namedPipe.ConnectAsync(token);
while (true)
{
token.ThrowIfCancellationRequested();
//// при работе с асинхронным методом используем Memory<T>
//Memory<byte> buffer = _pipeWriter.GetMemory();
//// асинхронное чтение из именованного канала в Memory<T>
//// здесь может быть любая операция для получения данных - от считывания с файла до рандомной генерации.
//int readBytes = await _namedPipe.ReadAsync(buffer, token);
// синхронное чтение из именованного канала в запрошенный у PipeWriter Span
// здесь может быть любая операция для получения данных - от считывания с файла до рандомной генерации.
int readBytes = _namedPipe.Read(_pipeWriter.GetSpan());
// если в канале ничего не было, отпускаем поток на полсекунды и пытаемся снова
// в других случаях при данной ситуации можно выходить
if (readBytes == 0)
{
await Task.Delay(500, token);
continue;
}
// указываем, сколько байт мы взяли из канала
_pipeWriter.Advance(readBytes);
// флашим данные, чтобы они стали доступны для PipeReader
FlushResult result = await _pipeWriter.FlushAsync(token);
// Если PipeReader был завершен, писать данные ему уже не нужно
// ЗЫ данное поведение было выбрано мной для примера, оно зависит от бизнес логики
if (result.IsCompleted)
{
break;
}
}
// завершить _pipeWriter для завершения всего экземпляра Pipe
_pipeWriter.Complete();
}
}
public class DataProcessor
{
private readonly IBytesProcessor _bytesProcessor;
private readonly PipeReader _pipeReader;
public DataProcessor(IBytesProcessor bytesProcessor, PipeReader pipeReader)
{
_bytesProcessor = bytesProcessor ?? throw new ArgumentNullException(nameof(bytesProcessor));
_pipeReader = pipeReader ?? throw new ArgumentNullException(nameof(pipeReader));
}
public async Task StartProcessingDataAsync(CancellationToken token)
{
while (true)
{
token.ThrowIfCancellationRequested();
// чтение данных из экземпляра Pipe
ReadResult result = await _pipeReader.ReadAsync(token);
ReadOnlySequence<byte> buffer = result.Buffer;
// Производим вычисления с полученными данными
await _bytesProcessor.ProcessBytesAsync(buffer, token);
// указываем, до какой позиции данные были обработаны. В данном случае в файл записывается все, что приходит
// в ситуациях, когда были обработаны не все данные требуется создать позицию вручную используя буфер и индекс
// при таком раскладе IBytesProcessor.ProcessBytesAsync может быть дополнен, возвращая эту позицию
_pipeReader.AdvanceTo(buffer.End);
// если PipeWriter был завершен, производить чтение уже не нужно
// данное поведение также было выбрано мной, оно зависит от бизнес логики
if (result.IsCompleted)
{
break;
}
}
// завершить _pipeReader для завершения всего экземпляра Pipe
_pipeReader.Complete();
}
}
public interface IBytesProcessor
{
Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token);
}
public class ConsoleBytesProcessor : IBytesProcessor
{
//Представим, что в этом классе есть нормальный конструктор и IDisposable
readonly FileStream _fileStream = new FileStream("buffer", FileMode.Create);
public Task ProcessBytesAsync(ReadOnlySequence<byte> bytesSequence, CancellationToken token)
{
if (bytesSequence.IsSingleSegment)
{
ProcessSingle(bytesSequence.First.Span);
}
else
{
foreach (var segment in bytesSequence)
{
ProcessSingle(segment.Span);
}
}
return Task.CompletedTask;
}
private void ProcessSingle(ReadOnlySpan<byte> span)
{
_fileStream.Write(span);
}
}
Автор: ZloyChert