В одной конторе соискателю на позицию Senior C# developer выдали тестовое задание: отсортировать файл со строками определенного формата.
Требования такие:
-
Формат строки: число, точка, пробел, далее любые символы до конца строки.
-
Порядок сортировки — сначала сортируем текстовой части строки, потом по числу если текстовые части совпадают.
-
Кодировка — UTF-8.
-
Размер файла — 100гб - гарантированно больше объема ОП.
-
Должно отработать за 1 час на машине проверяющего, вряд ли там будет супер-быстрый SSD и огромное количество оперативной памяти.
Как и многие другие программисты, узнав о таком тестовом задании, я возмутился. Внешнюю сортировку слиянием практически всех проходили в ВУЗе, но практически никто никогда не писал её. Задача очень непрактическая и непонятно какие навыки проверяет. Так мне казалось.
Эта задача вызвала бурные обсуждения о способах её решения. Многие программисты, причисляющие себя к рангу senior, предложили использовать базы данных, ибо не барское это дело - вручную писать алгоритмы сортировки. Некоторые даже попытались сделать решение на Apache Spark. Однако никто до конца задачу не решил, ибо мало кому удалось отсортировать в нужном порядке даже 10ГБ файл менее чем за 15 минут без SSD.
Я подумал, что стоит решить задачу до конца с помощью программирования, и тоже причислить себя к рангу senior developer.
В первую очередь я написал генератор тестового файла, который генерирует нужное количество строк из исходного файла. В качестве исходного взял первый том Войны и Мира, так как там есть как русские, так и английские символы.
Код генератора
var source = (from l in File.ReadLines("source.txt")
where !string.IsNullOrEmpty(l)
from s in l.Split(new[] { '.', '?', '!', '[', ']' }, StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)
where s.Length > 10
select s).ToList();
Random rand = new();
using (var f = File.CreateText(file))
{
f.AutoFlush = false;
while(f.BaseStream.Position < maxSize)
{
var n = rand.Next();
f.Write(n);
f.Write(". ");
f.WriteLine(source[rand.Next(source.Count)]);
}
}
return 0;
Для начала решил сгенерировать 10ГБ, чтобы не ждать час на каждом тестовом прогоне. Кроме того файл такого размера не помещается в кэши операционной системы и операции чтения-записи доходят до диска, что дает представление о реальном быстродействии на больших объемах.
Самое простое работающее решение
Все началось со статьи на хабре о внешней сортировке. Сразу отбросил идею нескольких прогонов для объединения блоков, так как это привело бы к дополнительным затратам на запись. Весь код разделил на две фазы — разбиение исходного файла на отдельные блоки (чанки, от английского chunk) и сортировка строк в блоках, слияние блоков в один файл.
Код разбиения:
var count = 0;
var tempFiles =
File.ReadLines(file)
.Select(s => new Item(s, s.IndexOf('.')))
.Chunk(chunkSize)
.Select(chunk =>
{
Array.Sort(chunk, comparer);
var tempFileName = Path.ChangeExtension(file, $".part-{count++}" + Path.GetExtension(file));
File.WriteAllLines(tempFileName, chunk.Select(x => x.Line));
return tempFileName;
}).ToList();
Код слияния:
try
{
var mergedLines = tempFiles
.Select(f => File.ReadLines(f).Select(s => new Item(s, s.IndexOf('.'))))
.Merge(comparer) // IEnumerable<IEnumerable<T>> -> IEnumerable<T>
.Select(x => x.Line);
File.WriteAllLines(Path.ChangeExtension(file, ".sorted" + Path.GetExtension(file)), mergedLines);
}
finally
{
tempFiles.ForEach(File.Delete);
}
Для того, чтобы удобнее писать код, определил тип, содержащий строку и позицию точки в строке и компаратор для этого типа:
public record struct Item(string Line, int DotPosition);
public record Comparer(StringComparison StringComparison) : IComparer<Item>
{
public int Compare(Item x, Item y)
{
var spanX = x.Line.AsSpan();
var spanY = y.Line.AsSpan();
var xDot = x.DotPosition;
var yDot = y.DotPosition;
var cmp = spanX[(xDot + 2)..].CompareTo(spanY[(yDot + 2)..], StringComparison);
if (cmp != 0) return cmp;
return int.Parse(spanX[..xDot]) - int.Parse(spanY[..yDot]);
}
}
"Сердце" всего алгоритма внешней сортировки - слияние итераторов:
public static IEnumerable<T> Merge<T>(this IEnumerable<IEnumerable<T>> sources, IComparer<T> comparer = default)
{
var enumerators = (from source in sources
let e = source.GetEnumerator()
where e.MoveNext()
select e).ToList();
while (enumerators.Count > 0)
{
var min = enumerators.MinBy(e => e.Current, comparer)!;
yield return min.Current;
if (!min.MoveNext())
{
min.Dispose();
enumerators.Remove(min);
}
}
}
Почему я не использовал asyncawait? Ведь сейчас все программисты C# втыкают asyncawait на автомате. Конечно я тоже так сделал сначала, но потом убрал.
Во-первых для асинхронных итераторов сложнее написать Merge. Во-вторых код с asyncawait медленнее работал. asyncawait несет дополнительные расходы на переключение контекста, продолжения вызывают всю цепочку асинхронных методов. Это может быть выгодно когда нам надо распараллелить ожидание, но в этом коде никаких параллельных ожиданий нет. Все операции происходят последовательно.
Первый запуск
Запустил сортировку слиянием, размер чанка - 1М строк или около 157Мб, время работы - 15:30, пятнадцать с половиной минут! В час для 100Гб уложиться не выйдет.
Что по вашему тормозило в этом коде больше всего? Напишите свой вариант в комментариях, прежде чем разворачивать спойлер и читать дальше.
Тайминг
SplitSort done in 00:04:59.2942000
Merge done in 00:10:32.1238153
Диспетчер задач показывал, что во время сортировки ресурсы компьютера задействуются очень мало:
Оптимизируем слияние
Дольше всего выполняется не чтение или запись, а поиск минимального элемента во время слияния. Этот код я честно написал сам, не подсматривая в готовые решения. Гораздо эффективнее будет отсортировать итераторы один раз, а далее поддерживать их отсортированность после вызова .MoveNext(), даже на StackOverflow предлагают такой вариант.
Лучше всего подойдет двоичная (она же бинарная) куча. Она имеет минимальный элемент в корне и позволяет восстановить отсортированность за O(logN), где K - количество элементов в куче (у нас равно числу чанков). Естественно это я не сам придумал, а подсмотрел в интернете.
Методы работы с кучей
public static void Heapify<T>(this Span<T> heap, int index, IComparer<T> comparer)
{
ArgumentNullException.ThrowIfNull(comparer);
var min = index;
while (true)
{
var leftChild = 2 * index + 1;
var rightChild = 2 * index + 2;
var v = heap[index];
if (rightChild < heap.Length && comparer.Compare(v, heap[rightChild]) > 0)
{
min = rightChild;
v = heap[min];
}
if (leftChild < heap.Length && comparer.Compare(v, heap[leftChild]) > 0)
{
min = leftChild;
}
if (min == index) break;
var temp = heap[index];
heap[index] = heap[min];
heap[min] = temp;
index = min;
}
}
public static void BuildHeap<T>(this Span<T> heap, IComparer<T> comparer)
{
ArgumentNullException.ThrowIfNull(comparer);
for (int i = heap.Length / 2; i >= 0; i--)
{
Heapify(heap, i, comparer);
}
}
Код метода слияния:
public static IEnumerable<T> Merge<T>(this IEnumerable<IEnumerable<T>> sources, IComparer<T> comparer = default)
{
var heap = (from source in sources
let e = source.GetEnumerator()
where e.MoveNext()
select e).ToArray();
var enumeratorComparer = new EnumeratorComparer<T>(comparer ?? Comparer<T>.Default);
heap.AsSpan().BuildHeap(enumeratorComparer);
while (true)
{
var min = heap[0];
yield return min.Current;
if (!min.MoveNext())
{
min.Dispose();
if (heap.Length == 1) yield break;
heap[0] = heap[^1];
Array.Resize(ref heap, heap.Length - 1);
}
heap.AsSpan().Heapify(0, enumeratorComparer);
}
}
private record EnumeratorComparer<T>(IComparer<T> comparer) : IComparer<IEnumerator<T>>
{
public int Compare(IEnumerator<T>? x, IEnumerator<T>? y)
{
return comparer.Compare(x!.Current, y!.Current);
}
}
Остальной код программы не изменился. Время работы:
SplitSort done in 00:04:27.8391844
Merge done in 00:02:11.4364005
Значительно лучше, но до заветного часа на 100ГБ еще очень далеко. Тут стоит обратить внимание, что из-за кэша файловой системы время работы может варьироваться +-15%
Код по ссылке https://github.com/gandjustas/HugeFileSort/tree/heapsort
Оптимизируем разбиение
Фазы разбиения и слияния выполняют одинаковое количество чтения-записи, создают одинаковое количество объектов типа string, но фаза разбиения использует в 2,5 раз больше памяти и запуск под отладчиком показывает множество сборок мусора.
Все дело во времени жизни объектов. В фазе слияния объект строки живет от чтения из чанка до записи в результирующий файл. Когда считывается следующая строка из чанка предыдущая уже превратилась мусор. Мусор убирается в нулевом поколении сборщика, это происходит быстро и память не растет.
В фазе разбиения объекты строк живут от чтения из исходного файла до записи в чанк. Большинство объектов строк переживает несколько сборок мусора, что создает повышенную активность сборщика и увеличивает потребляемую память.
Мы не можем уменьшить время жизни строк на фазе разбиения. Но их можно вообще не создавать! Можно прочитать из файла блок символов, разделить по символу перевода строки и использовать вместо строк тип ReadOnlyMemory<char>
, который предоставляет ту же функциональность. ReadOnlyMemory<char>
это структура (не требует аллокаций в управляемой куче), которая представляет из себя ссылку на массив, смещение и длину.
Код разбиения без аллокаций:
List<string> tempFiles = new();
List<Item> chunk = new();
using (var reader = File.OpenText(file))
{
var chunkBuffer = new char[chunkSize];
var chunkReadPosition = 0;
var eos = reader.EndOfStream;
while (!eos)
{
// Читаем из файла весь буфер
var charsRead = reader.ReadBlock(chunkBuffer.AsSpan(chunkReadPosition));
eos = reader.EndOfStream;
var m = chunkBuffer.AsMemory(0, chunkReadPosition + charsRead);
// Заполняем список строк ReadOnlyMemory<char> для сортировки
int linePos;
while ((linePos = m.Span.IndexOf(Environment.NewLine)) >= 0 || (eos && m.Length > 0))
{
var line = linePos >= 0 ? m[..linePos] : m;
chunk.Add(new Item(line, line.Span.IndexOf('.')));
m = m[(linePos + Environment.NewLine.Length)..];
}
chunk.Sort(comparer);
// Записываем строки из отсортированного списка во временный файл
var tempFileName = Path.ChangeExtension(file, $".part-{tempFiles.Count}" + Path.GetExtension(file));
using (var tempFile = File.CreateText(tempFileName))
{
foreach (var (l, _) in chunk)
{
tempFile.WriteLine(l);
}
}
tempFiles.Add(tempFileName);
if (eos) break;
chunk.Clear();
//Отсток буфера переносим в начало
m.CopyTo(chunkBuffer);
chunkReadPosition = m.Length;
}
}
Можно было бы оставить код в функциональном стиле, но тогда код получился бы более неуклюжим из-за необходимости передачи флага конца файла.
В структурах данных заменил string
наReadOnlyMemory<char>
и больше ничего не изменилось.
Время работы при размере чанка в 100М символов, 161Мб на диске:
SplitSort done in 00:03:50.6780519
Merge done in 00:02:19.5627238
Удалось выиграть еще 30 сек и сократить расход памяти на фазе разбиения со 600 до 250 мегабайт. Как говорится Allocation is cheap… until it is not (статья от другом, но заголовок подходит).
К сожалению на этом все простые оптимизации кончились, а суммарное время работы все еще не позволит уложиться в час.
Как сравнивать строки
Для многих программистов сравнение строк это все еще посимвольное, а для тех кто пришел из С — побайтное сравнение. Но примерно с 2000 года все используют юникод. Юникод это не просто два байта на символ и кодировки переменной длины, вроде UTF8, это еще правила сравнения, нормализации и подсчета символов. Кто еще не в курсе - посмотрите доклад Plain Text Дилана Битти на NDC. Это один из лучших докладов за всю историю конференций.
Сравнение юникодных строк описано в стандарте Unicode Collation Algorithm (UCA). Это очень сложный алгоритм, который опирается на таблицы весов символом для разных культур. Этот алгоритм реализован в операционной системе (CompareStringW, CompareStringEx в Windows и CompareString
из libSystem.Globalization.Native.so
в Linux).
Конечно можно от этого всего отказаться и сравнивать строки посимвольно, это ускорит сортировку почти на минуту, так как .NET не использует системные API для этого. Достаточно указать StringComparison.Ordinal
в Comparer
. Кроме того, отказ от UCA позволяет использовать поразрядные (radix) алгоритмы сортировки, которые должны работать быстрее обычных. Но изменит порядок сортировки и фактически является оптимизацией под один частный случай. Не будет простых способов вернуться к UCA без потери быстродействия.
Один из шагов UCA — получение ключа сортировки (sort key) для строк — простого массива байт, который можно использовать для побайтного сравнения. Оказывается в .NET есть функция получения ключа сортировки строк CompareInfo.GetSortKey. То есть мы можем получить эти байты и потом сравнивать их. Если дописать в конец полученного массива байты числа, стоящего в начале, то мы можем всю сортировку свести к сортировке байтовых массивов.
Скоро 15 лет как я программирую на .NET и я узнал о наличии ключей сортировки строк и соответствующих классов только когда решал эту задачу.
Пытаемся оптимизировать сортировку
Для начала добавим получение ключей и сортировку по ним в методы разбиения и слияния:
List<string> tempFiles = new();
List<Item> chunk = new();
using (var reader = File.OpenText(file))
{
var keyBuffer = new byte[chunkSize * 2]; //Буфер для ключей
var chunkBuffer = new char[chunkSize];
var chunkReadPosition = 0;
var eos = reader.EndOfStream;
while (!eos)
{
// Читаем из файла весь буфер
var charsRead = reader.ReadBlock(chunkBuffer.AsSpan(chunkReadPosition));
eos = reader.EndOfStream;
var m = chunkBuffer.AsMemory(0, chunkReadPosition + charsRead);
var key = keyBuffer.AsMemory();
// Заполняем список строк ReadOnlyMemory<char> для сортировки
int linePos;
while ((linePos = m.Span.IndexOf(Environment.NewLine)) >= 0 || (eos && m.Length > 0))
{
var line = linePos >= 0 ? m[..linePos] : m;
var s = line.Span;
var dot = line.Span.IndexOf('.');
int x = int.Parse(s[..dot]);
s = s[(dot + 2)..];
var keyLen = culture.CompareInfo.GetSortKey(s, key.Span); // Получаем ключ
BinaryPrimitives.WriteInt32BigEndian(key[keyLen..].Span, x); // Добписываем число в конец ключа, чтобы старшый байт был с меньшим индексом
keyLen += sizeof(int);
chunk.Add(new Item(line, key[..keyLen]));
m = m[(linePos + Environment.NewLine.Length)..];
key = key[keyLen..];
}
chunk.Sort(comparer);
// Записываем строки из отсортированного списка во временный файл
var tempFileName = Path.ChangeExtension(file, $".part-{tempFiles.Count}" + Path.GetExtension(file));
using (var tempFile = File.CreateText(tempFileName))
{
foreach (var (l, _) in chunk)
{
tempFile.WriteLine(l);
}
}
tempFiles.Add(tempFileName);
if (eos) break;
chunk.Clear();
//Остаток буфера переносим в начало
m.CopyTo(chunkBuffer);
chunkReadPosition = m.Length;
}
}
При слиянии нам также надо получать ключи:
try
{
var mergedLines = tempFiles
.Select(f => File.ReadLines(f).Select(s => // Читаем построчно все файлы
{
var m = s.AsMemory();
var dot = s.IndexOf('.'); // Находим в строках точку
int x = int.Parse(s.AsSpan(0, dot));
// Получаем ключ того, что находится после точки с пробелом
var key = new byte[s.Length * 2 + sizeof(int)];
var keyLen = culture.CompareInfo.GetSortKey(m[(dot + 2)..].Span, key);
// Дописываем число в конец
BinaryPrimitives.WriteInt32BigEndian(key.AsSpan(keyLen), x);
return new Item(m, key);
}))
.Merge(comparer); //Слияние итераторов IEnumerable<IEnumerable<T>> в IEnumerable<T>
using var sortedFile = File.CreateText(Path.ChangeExtension(file, ".sorted" + Path.GetExtension(file)));
foreach (var (l, _) in mergedLines)
{
sortedFile.WriteLine(l);
}
}
finally
{
tempFiles.ForEach(File.Delete);
}
Компаратор теперь очень простой:
public record struct Item(ReadOnlyMemory<char> Line, ReadOnlyMemory<byte> Key);
public class Comparer : IComparer<Item>
{
public int Compare(Item x, Item y)
{
return x.Key.Span.SequenceCompareTo(y.Key.Span);
}
}
Результаты ожидаемо хуже:
SplitSort done in 00:04:09.5091207
Merge done in 00:03:02.5646277
Мы проиграли 40 секунд на слиянии из-за получения ключей и 10 секунд на разбиении и сортировке. Сортировка ключей оказалась эффективнее, чем сортировка строк, но накладные расходы на получение ключей убили весь выигрыш.
Зато теперь можно применить поразрядную (Radix) сортировку ключей. Я написал два варианта поразрядной сортировки - Radix Quick Sort aka Multi-key QuickSort (просто перевел на C# алгоритм описанный в статье) и Counting Radix Sort (в основном скопировал код отсюда). К сожалению оба варианта проиграли стандартному Array.Sort
(Код этих сортировок в статье не привожу, чтобы не забивать объем, но вы сможете найти его в исходниках вместе с бенчмарками по ссылке в конце статьи). Скорее всего потому, что сравнение блоков памяти методом SequenceCompareTo
оптимизируется с помощью SIMD и работает гораздо быстрее, чем ручной код сравнения по разрядам.
Код по ссылке https://github.com/gandjustas/HugeFileSort/tree/sort-key
На этом месте я устал и лег спать.
А что если сохранять ключи?
С этой мыслью я проснулся на следующий день.
-
Во-первых сохраняя ключи во временном файле мы можем не получать ключ сортировки через API в фазе слияния.
-
Во-вторых нам вообще даже не надо декодировать символы в фазе слияния, мы можем просто сохранять нужное количество байт в выходном файле.
-
В-третьих, спустившись на уровень файловых потоков (
FileStream
вместоStreamReader
) мы сможем эффективнее управлять буферизацией.
Я сделал бенчмарк, где сравнил все способы построчного чтения файлов, где сравнил File.ReadLines
, StreamReader
, FileStream
и различные варианты буферизации, а также модный молодежный PipeReader
. Победил, ожидаемо, FileStream
, как самый низкоуровневый инструмент. Кроме того если вы будете читать или записывать данные большими блоками, то выгодно отключать встроенную буферизацию .NET, а если маленькими, то указывать большой размер буфера (код бенчмарков по ссылке в конце статьи).
Много кода
Фаза разбиения
public void SplitSort()
{
using var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, 0, FileOptions.SequentialScan);
fileSize = stream.Length;
List<SortKey> chunk = new();
var keyBuffer = new byte[maxChunkSize];
var readBuffer = new byte[maxChunkSize];
var remainingBytes = 0;
var charBuffer = new char[1024];
var eof = false;
while (!eof)
{
var bytesRead = stream.ReadBlock(readBuffer, remainingBytes, maxChunkSize - remainingBytes, out eof);
int chunkSize = remainingBytes + bytesRead;
if (!eof)
{
var lastNewLine = readBuffer.AsSpan(0, bytesRead).LastIndexOf(NewLine);
if (lastNewLine >= 0) chunkSize = lastNewLine + NewLine.Length;
remainingBytes = remainingBytes + bytesRead - chunkSize;
}
chunk.AddRange(ParseChunk(chunkSize, readBuffer, keyBuffer, charBuffer));
//Сортируем и записываем чанки на диск
chunk.Sort(comparer);
WriteChunk(chunk);
chunk.Clear();
//Остаток буфера переносим в начало
if (remainingBytes > 0) readBuffer.AsSpan(chunkSize, remainingBytes).CopyTo(readBuffer.AsSpan());
}
}
Функция чтения строк и получения ключей сортировки
private IEnumerable<SortKey> ParseChunk(int byteCount, byte[] readBuffer, byte[] keyBuffer, char[] charBuffer)
{
var readPos = 0;
var key = keyBuffer.AsMemory();
while (byteCount > 0)
{
var linePos = readBuffer.AsSpan(readPos, byteCount).IndexOf(NewLine);
if (linePos == -1) linePos = byteCount;
if (charBuffer.Length < linePos) charBuffer = new char[linePos];
// Надо обязательно вызывать именно эту перегрузку, потому что остальные аллоцируют память
var lineLen = encoding.GetChars(readBuffer, readPos, linePos, charBuffer, 0);
var line = charBuffer.AsMemory(0, lineLen);
var s = line.Span;
var dot = s.IndexOf('.');
var x = int.Parse(s[0..dot]);
var keyLen = culture.CompareInfo.GetSortKey(s[(dot + 2)..], key.Span, compareOptions);
BinaryPrimitives.WriteInt32BigEndian(key[keyLen..].Span, x);
keyLen += sizeof(int);
var lineSize = linePos + NewLine.Length;
yield return new SortKey(readBuffer.AsMemory(readPos, lineSize), key[..keyLen]);
key = key[keyLen..];
readPos += lineSize;
byteCount -= lineSize;
maxLineSize = Math.Max(maxLineSize, lineSize);
maxKeyLength = Math.Max(maxKeyLength, keyLen);
}
}
Функция записи чанка на диск
void WriteChunk(List<SortKey> chunk)
{
// Записываем строки из отсортированного списка во временный файл
var tempFileName = Path.ChangeExtension(file, $".part-{tempFiles.Count}.tmp");
using var stream = new FileStream(tempFileName, FileMode.Create, FileAccess.Write, FileShare.None, BufferSize, FileOptions.SequentialScan);
Span<byte> buffer = stackalloc byte[sizeof(int)];
foreach (var (line, key) in chunk)
{
BinaryPrimitives.WriteInt32LittleEndian(buffer, line.Length);
stream.Write(buffer);
stream.Write(line.Span);
BinaryPrimitives.WriteInt32LittleEndian(buffer, key.Length);
stream.Write(buffer);
stream.Write(key.Span);
}
tempFiles.Add(tempFileName);
}
Фаза слияния
public void Merge()
{
var mergedLines = tempFiles
.Select(ReadTempFile) // Читаем построчно все файлы, находим в строках точку
.Merge(comparer); //Слияние итераторов IEnumerable<IEnumerable<T>> в IEnumerable<T>
string sortedFileName = Path.ChangeExtension(file, ".sorted" + Path.GetExtension(file));
using var sortedFile = new FileStream(sortedFileName, FileMode.Create, FileAccess.Write, FileShare.None, BufferSize, FileOptions.SequentialScan);
sortedFile.SetLength(fileSize);
foreach (var (l, _) in mergedLines)
{
sortedFile.Write(l.Span);
}
}
Чтение временного файла
private IEnumerable<SortKey> ReadTempFile(string file)
{
using var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, BufferSize, FileOptions.SequentialScan);
var maxBlockSize = maxLineSize + maxKeyLength + sizeof(int) * 2;
var readBuffer = new byte[Math.Max(BufferSize, maxBlockSize)];
var bytesRemaining = 0;
var eof = false;
while (!eof)
{
var bytesRead = stream.ReadBlock(readBuffer, bytesRemaining, readBuffer.Length - bytesRemaining, out eof);
if (bytesRead == 0) eof = true;
var mem = readBuffer.AsMemory(0, bytesRemaining + bytesRead);
while (mem.Length > maxBlockSize || (eof && mem.Length > 0))
{
var lineSize = BinaryPrimitives.ReadInt32LittleEndian(mem.Span);
mem = mem[sizeof(int)..];
var line = mem[..lineSize];
mem = mem[lineSize..];
var keyLen = BinaryPrimitives.ReadInt32LittleEndian(mem.Span);
mem = mem[sizeof(int)..];
yield return new SortKey(line, mem[..keyLen]);
mem = mem[keyLen..];
}
mem.CopyTo(readBuffer);
bytesRemaining = mem.Length;
}
}
Из 25 строк кода в самом начале, написанных даже без классов и метода Main, всё превратилось в 150 строк без учета конструктора и полей класса.
Результаты забега при установке размера чанка в 100М байт. Так как теперь вместе со строками записываются ключи размер одного временного файла на диске составляет 180МБ.
SplitSort done in 00:04:12.8286312
Merge done in 00:03:05.3477665
Результат приблизительно равен предыдущему, но это при учете что теперь мы пишем и читаем не 10Гб временных файлов, а 18гб. В таск менеджере заметно, что быстродействие теперь сильно упирается в диск.
Если быстродействие сильно упирается в диск, то нужно данные сжать. Так мне говорила бабушка прочитал в книге по базам данных. Завернем FileStream
в BrotliStream при записи и чтении временных файлов. Brotli — это новый алгоритм сжатия, который пока еще приходит в веб и другие аспекты разработки. Подробнее можно прочитать на википедии.
Результаты забега со сжатием
SplitSort done in 00:04:28.3044728
Merge done in 00:00:36.4300613
В сумме меньше 5 минут. Суммарный объем временных файлов на диске сократился до 970МБ, то есть почти в 20 раз. Это понятно, так как в файлах очень много повторяющихся строк. Возможно на других текстовых файлах результат будет не настолько выдающимся, но все равно написанные человеком или chatGpt тексты будут хороши сжиматься.
Код по ссылке https://github.com/gandjustas/HugeFileSort/tree/sort-key-with-compression
Быстродействие теперь упирается не в диск, а в процессор. И это хорошо. Диск у нас один, а процессоров зачастую больше.
Распараллеливание
Сейчас программа выполняется последовательно:
-
Чтение чанка (нагружает диск и не использует процессор)
-
Парсинг строк и получение ключей (нагружает процессор в основном)
-
Сортировка (сильно нагружает процессор)
-
Сжатие данных (сильно нагружает процессор)
-
Запись (сильно нагружает диск)
Было бы неплохо пункты 1 и 5 выполнять параллельно с 2-4.
Заведем пять отдельных потоков для каждой задачи. Для передачи чанков между потоками воспользуемся библиотекой System.Threading.Channels.
readToParse = Channel.CreateBounded<(byte[], int)>(1); // Буфер и размер
parseToSort = Channel.CreateBounded<(List<SortKey>, byte[], byte[])>(1); // Список ключей, буфер строк и буфер ключей
sortToCompress = Channel.CreateBounded<(List<SortKey>, byte[], byte[])>(1)); // Список ключей, буфер строк и буфер ключей
compressToWrite = Channel.CreateBounded<(byte[], int)>(1); // Сжатые данные и размер
parserThreads =
Enumerable
.Range(0, degreeOfParallelism)
.Select(_ => Task.Run(ParallelParser)).ToArray();
sorterThreads =
Enumerable
.Range(0, degreeOfParallelism)
.Select(_ => Task.Run(ParallelSorter)).ToArray();
compressThreads =
Enumerable
.Range(0, degreeOfParallelism)
.Select(_ => Task.Run(ParallelCompressor)).ToArray();
writerThread = Task.Run(ParallelWriter);
Нам нужен ограниченный канал с емкостью в одно сообщение. Если сообщение уже есть в очереди, то есть получатели заняты обработкой предыдущего, то отправитель будет висеть в ожидании освобождения канала. Таким образом нагрузка будет автоматически балансироваться.
Метод SplitSort
изменим так, чтобы он мог работать как в синхронном режиме, так и в параллельном
using var stream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, 0, FileOptions.SequentialScan);
fileSize = stream.Length;
List<SortKey>? chunk = null;
byte[]? keyBuffer = null;
char[]? charBuffer = null;
var readBuffer = pool!.Rent(maxChunkSize);
var remainingBytes = 0;
var eof = false;
while (!eof)
{
var bytesRead = stream.ReadBlock(readBuffer, remainingBytes, maxChunkSize - remainingBytes, out eof);
int chunkSize = remainingBytes + bytesRead;
if (!eof)
{
var lastNewLine = readBuffer.AsSpan(0, bytesRead).LastIndexOf(NewLine);
if (lastNewLine >= 0) chunkSize = lastNewLine + NewLine.Length;
remainingBytes = remainingBytes + bytesRead - chunkSize;
}
var oldBuffer = readBuffer;
if (degreeOfParallelism > 0)
{
await readToParse.Writer.WriteAsync((readBuffer, chunkSize));
readBuffer = pool.Rent(maxChunkSize);
}
else
{
chunk ??= new();
chunk.AddRange(ParseChunk(chunkSize, readBuffer,
keyBuffer ??= pool.Rent(maxChunkSize),
charBuffer ??= new char[1024]));
//Сортируем и записываем чанки на диск
chunk.Sort(comparer);
WriteChunk(chunk);
chunk.Clear();
}
//Осаток буфера переносим в начало
if (remainingBytes > 0) oldBuffer.AsSpan(chunkSize, remainingBytes).CopyTo(readBuffer.AsSpan());
}
if (degreeOfParallelism == 0)
{
if (readBuffer != null) pool.Return(readBuffer);
if (keyBuffer != null) pool.Return(keyBuffer);
}
Если параметр degreeOfParallelism
равен нулю, то код будет выполнятся последовательно, как и раньше. Если degreeOfParallelism >= 1
, то после чтения чанка он отправится в readToParse
канал и основной поток сразу же начнет читать второй чанк.
Очевидно в таком случае одним буфером для строк и ключей обойтись не получится, буферы придется каждый раз выделять новые. Чтобы не забить всю память таким образом я сразу применил ArrayPool. Ничего сложного нет: вместо оператора new вызываем метод Rent
, а когда перестали пользоваться - вызываем Return
.
ParallelParser
, ParallelSorter
и ParallelWriter
выглядят так:
private async Task ParallelParser()
{
var charBuffer = new char[1024];
await foreach (var (readBuffer, chunkSize) in readToParse.Reader.ReadAllAsync())
{
var keyBuffer = pool!.Rent(maxChunkSize);
var chunk = ParseChunk(chunkSize, readBuffer, keyBuffer, charBuffer).ToList();
await parseToSort.Writer.WriteAsync((chunk, readBuffer, keyBuffer));
}
}
private async Task ParallelSorter()
{
await foreach (var item in parseToSort.Reader.ReadAllAsync())
{
item.Item1.Sort(comparer);
await sortToCompress.Writer.WriteAsync(item);
}
}
private async Task ParallelWriter()
{
await foreach (var (buffer, bufferLength) in compressToWrite.Reader.ReadAllAsync())
{
var tempFileName = Path.ChangeExtension(file, $".part-{tempFiles.Count}.tmp");
using (var tempFile = new FileStream(tempFileName, FileMode.Create, FileAccess.Write, FileShare.None, 0, FileOptions.SequentialScan))
{
await tempFile.WriteAsync(buffer.AsMemory(0, bufferLength));
}
pool!.Return(buffer);
tempFiles.Add(tempFileName);
}
}
Они построены по простому принципу - читаем сообщения из канала пока они не кончатся, на каждое сообщение выполняем свое действие и отправляем дальше.
ParallelCompressor построен по тому же принципу, но содержит больше кода. Уберу его под спойлер.
Код ParallelCompressor
private async Task ParallelCompressor()
{
var buffer = new byte[1024]; //Buffer with margin
var outputSize = BrotliEncoder.GetMaxCompressedLength(maxChunkSize * 2);
await foreach (var (chunk, readBuffer, keyBuffer) in sortToCompress.Reader.ReadAllAsync())
{
using var encoder = new BrotliEncoder(4, 22);
var output = pool!.Rent(outputSize);
var dest = output.AsMemory();
var compressed = 0;
foreach (var sk in chunk)
{
if (sk.Length > buffer.Length)
{
buffer = new byte[sk.Length];
}
sk.Write(buffer, 0);
var source = buffer.AsMemory(0, sk.Length);
while (true)
{
var r = encoder.Compress(source.Span, dest.Span, out var bytesConsumed, out var bytesWritten, false);
compressed += bytesWritten;
if (bytesConsumed > 0) source = source[bytesConsumed..];
if (bytesWritten > 0) dest = dest[bytesWritten..];
if (r == OperationStatus.Done) break;
if (r == OperationStatus.InvalidData || r == OperationStatus.NeedMoreData)
{
throw new InvalidOperationException();
}
var old = output;
outputSize *= 2;
output = pool.Rent(outputSize);
old.CopyTo(output, 0);
pool.Return(old);
dest = output.AsMemory(compressed);
}
}
while (true)
{
var r = encoder.Flush(dest.Span, out var bytesWritten);
compressed += bytesWritten;
if (r == OperationStatus.Done) break;
if (r == OperationStatus.InvalidData || r == OperationStatus.NeedMoreData)
{
throw new InvalidOperationException();
}
var old = output;
outputSize *= 2;
output = pool.Rent(outputSize);
old.CopyTo(output, 0);
pool.Return(old);
dest = output.AsMemory(compressed);
}
outputSize = compressed * 11 / 10;
await compressToWrite.Writer.WriteAsync((output, compressed));
pool.Return(readBuffer);
pool.Return(keyBuffer);
}
}
Из отсортированного списка строк и ключей записываем все в энкодер, а он периодически отдает нам блок упакованных данных. В конце надо вызвать Flush. Все осложняется тем, что метод может выполниться частично и сказать, что для продолжения недостаточно места в целевом буфере. Тогда надо выделить буфер побольше и перенести туда данные из старого.
В конце код завершения параллельной обработки: завершаем очереди и ждем завершения потоков.
readToParse.Writer.Complete();
await parserThread;
parseToSort.Writer.Complete();
await sorterThread;
sortToCompress.Writer.Complete();
await compressThread;
compressToWrite.Writer.Complete();
await writerThread;
Запускаем с размером чанка в 200 мегабайт.
SplitSort done in 00:02:21.4203828
Merge done in 00:00:39.0610435
Три минуты в сумме, есть шанс уложиться в час для 100Гб.
Посмотрим в таск менеджер:
Потребление памяти выросло с 400Мб до 5,3Гб, это уже много. Почему так?
Когда код выполнялся последовательно для всех операцию использовался один набор буферов - для чтения данных, для ключей, список для сортировки и буфер для временного файла. Когда мы перешли в параллельный вариант у нас таких наборов как минимум количеству потоков + количеству каналов и свободных мест в них.
Такова, к сожалению, цена параллельности. Очень редко можно распараллелить обработку данных, не повышая размер используемой оперативной памяти.
Нагрузка на диск получилась небольшая, стоит добавить еще потоков для парсинга, сортировки и сжатия данных, то есть увеличить степень параллелизма (dop). Но это увеличит затраты памяти. Можно уменьшать размер чанка при повышении степени параллелизма.
// Значения по умолчанию
dop = Environment.ProcessorCount / 4;
chunkSize = 200 / int.Max(dop, 1);
Финальный прогон с дефолтными параметрами (dop=4, chunkSize=50)
SplitSort done in 00:00:53.8610345
Merge done in 00:00:39.7727140
Итого 1:40 (не более 1:50 за несколько прогонов).
Код со всеми бенчмарками по ссылке.
Заключение
Я очень сильно ошибся, думая что задача сортировки 100Гб файла простая. Для её решения нужно много знаний алгоритмов, библиотек, навык оптимизации программ и написания параллельного кода. А самое главное эта задача хорошо показывает способен ли программист преодолевать технические трудности и решать задачу до конца, а не пытаться найти короткий путь и опустить руки, если такого пути нет.
PS
❯ .Sort.exe ........100gb.txt
SplitSort done in 00:11:35.9023876
Merge done in 00:20:16.3989011
Автор: Стас Выщепан