На небольшом примере я расскажу как используя библиотеку TPL Dataflow можно решить довольно не тривиальную задачу многопоточной компрессии файлов в течении 15 минут.
Задача
Необходимо реализовать эффективную компрессию файлов используя класс GZipStream находящийся в пространстве имён System.IO.Compression. Предполагается, что сжимать мы будем файлы большие, и их нельзя уместить целиком в оперативной памяти.
TPL Dataflow
TPL Dataflow (TDF) построена поверх вошедшей в .NET 4 библиотеки TPL (The Task Parallel Library) и дополняет ее набором примитивов, для решения более сложных задач чем исходная библиотека. TPL Dataflow использует задачи, потоково-безопасные коллекции и другие возможности, представленные в .NET 4 для добавления поддержки параллельной обработки потоков данных. Суть библиотеки сводится к тому, чтобы стыкуя различные блоки, организовывать различные цепи обработки данных. При этом обработка данных может происходить как синхронно так и асинхронно. Библиотека войдет в грядущий .NET 4.5.
Решение
Для решения этой задачи понадобится всего 3 блока:Буфер для данных считанных из источника данных:
var buffer = new BufferBlock();
* This source code was highlighted with Source Code Highlighter.
Блок сжатия данных:
var compressor = new TransformBlock(bytes => Compress(bytes));
* This source code was highlighted with Source Code Highlighter.
Функция сжатия:
private static byte[] Compress(byte[] bytes)
{
using (var resultStream = new MemoryStream())
{
using (var zipStream = new GZipStream(resultStream, CompressionMode.Compress))
{
using (var writer = new BinaryWriter(zipStream))
{
writer.Write(bytes);
return resultStream.ToArray();
}
}
}
}
* This source code was highlighted with Source Code Highlighter.
Блок записи сжатых данных:
var writer = new ActionBlock(bytes => outputStream.Write(bytes, 0, bytes.Length));
* This source code was highlighted with Source Code Highlighter.
Соединим наши блоки:
buffer.LinkTo(compressor);
compressor.LinkTo(writer);
* This source code was highlighted with Source Code Highlighter.
Так же мы будем сообщать нашим блокам когда данные для них закончились и они могут завершить свою работу. Сделать это можно вызвав метод Complete блока:
buffer.Completion.ContinueWith(task => compressor.Complete());
compressor.Completion.ContinueWith(task => writer.Complete());
* This source code was highlighted with Source Code Highlighter.
По мере считывания файла, мы будем предлагать данные нашему буферу. Делается это вызовом метода Post блока:
while (!buffer.Post(bytes))
{
}
* This source code was highlighted with Source Code Highlighter.
Такая конструкция нам нужна для того, чтобы учесть ситуацию, когда блок полон и не принимает больше данные.
По завершению считывания уведомим наш блок о том, что данные у нас кончились:
buffer.Complete();
* This source code was highlighted with Source Code Highlighter.
Теперь нам осталось дождаться только окончания работы нашего блока, отвечающего за запись сжатых данных в поток:
writer.Completion.Wait();
* This source code was highlighted with Source Code Highlighter.
Получившийся метод:
public static void Compress(Stream inputStream, Stream outputStream)
{
var buffer = new BufferBlock();
var compressor = new TransformBlock(bytes => Compress(bytes));
var writer = new ActionBlock(bytes => outputStream.Write(bytes, 0, bytes.Length));
buffer.LinkTo(compressor);
buffer.Completion.ContinueWith(task => compressor.Complete());
compressor.LinkTo(writer);
compressor.Completion.ContinueWith(task => writer.Complete());
var readBuffer = new byte[BufferSize];
while (true)
{
int readCount = inputStream.Read(readBuffer, 0, BufferSize);
if (readCount > 0)
{
var bytes = new byte[readCount];
Buffer.BlockCopy(readBuffer, 0, bytes, 0, readCount);
while (!buffer.Post(bytes))
{
}
}
if (readCount != BufferSize)
{
buffer.Complete();
break;
}
}
writer.Completion.Wait();
}
* This source code was highlighted with Source Code Highlighter.
Можно было бы закончить на этом, если бы не одно «но»: данный код по скорости работы не отличается от абсолютно синхронного. Для того, чтобы он стал работать быстрее, нам необходимо указать, что нашу операцию сжатия необходимо делать асинхронно. Сделать это можно добавив необходимый настройки в наш блок:
var compressorOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
var compressor = new TransformBlock(bytes => Compress(bytes), compressorOptions);
* This source code was highlighted with Source Code Highlighter.
Так же нам необходимо предусмотреть ситуацию, когда данные считываются быстрее чем сжимаются или записываются медленне чем сжимаются. Сделать это можно изменив свойство BoundedCapacity наших блоков:
var buffer = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 100 });
var compressorOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 100 };
var compressor = new TransformBlock(bytes => Compress(bytes), compressorOptions);
var writerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 100, SingleProducerConstrained = true };
var writer = new ActionBlock(bytes => outputStream.Write(bytes, 0, bytes.Length), writerOptions);
* This source code was highlighted with Source Code Highlighter.
Итоговый метод выглядит вот так:
public static void Compress(Stream inputStream, Stream outputStream)
{
var buffer = new BufferBlock(new DataflowBlockOptions { BoundedCapacity = 100 });
var compressorOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 100 };
var compressor = new TransformBlock(bytes => Compress(bytes), compressorOptions);
var writerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 100, SingleProducerConstrained = true };
var writer = new ActionBlock(bytes => outputStream.Write(bytes, 0, bytes.Length), writerOptions);
buffer.LinkTo(compressor);
buffer.Completion.ContinueWith(task => compressor.Complete());
compressor.LinkTo(writer);
compressor.Completion.ContinueWith(task => writer.Complete());
var readBuffer = new byte[BufferSize];
while (true)
{
int readCount = inputStream.Read(readBuffer, 0, BufferSize);
if (readCount > 0)
{
var postData = new byte[readCount];
Buffer.BlockCopy(readBuffer, 0, postData, 0, readCount);
while (!buffer.Post(postData))
{
}
}
if (readCount != BufferSize)
{
buffer.Complete();
break;
}
}
writer.Completion.Wait();
}
* This source code was highlighted with Source Code Highlighter.
Вызвать мы его можем например из такого консольного приложения:
private const int BufferSize = 16384;
static void Main(string[] args)
{
var stopwatch = Stopwatch.StartNew();
using (var inputStream = File.OpenRead(@"C:file.bak"))
{
using (var outputStream = File.Create(@"E:file.gz"))
{
Compress(inputStream, outputStream);
}
}
stopwatch.Stop();
Console.WriteLine();
Console.WriteLine(string.Format("Time elapsed: {0}s", stopwatch.Elapsed.TotalSeconds));
Console.ReadKey();
}
* This source code was highlighted with Source Code Highlighter.
Заключение
Как Вы можете видеть использование TPL Dataflow может серьезно упростить решение задач многопоточной обработки данных. На проведенных мной тестах, время, необходимое для компрессии, сократилось почти в 3 раза.
Скачать данную библиотеку и почитать о ней можно на официальной странице.