Здесь было много статей об универсальных Lock-free объектах, однако, для некоторых частных случаев они излишне громоздки. Мой случай как раз таким и являлся: требовалось организовать одностороннюю передачу информации от одного потока другому. Главный поток запускает рабочий, после чего он может только запросить его остановку и никак больше управлять он им не может. В свою очередь рабочий поток может уведомлять главный о своем текущем состоянии (прогрессе выполнения), а также отсылать промежуточные результаты выполнения. Получается, что требуется только передача данных от рабочего к главному потоку.
Разумеется, возможно, я изобрёл велосипед или, хуже того, велосипед с глюками. Поэтому комментарии и критика очень приветствуются!
Объект состояния
Состояние нашего рабочего потока представлено в виде некоторого класса. При этом главный поток не обязан всегда забирать данные, хранящиеся в объекте состояния (например, не важно, если главный поток пропустит промежуточное значение прогресса выполнения, ему важно получить последнее актуальное на данный момент).
Для реализации lock-free передачи состояния нам потребуется три его экземпляра (разных объектов одного класса):
var
ReadItem: TLockFreeWorkState;
CurrentItem: TLockFreeWorkState;
WriteItem: TLockFreeWorkState;
Идея такова: рабочий поток имеет свободный доступ к объекту WriteItem. Когда все данные сохранены выполняется операция InterlockedExchange с объектом в CurrentItem, после чего главный поток каким-то образом уведомляется о готовности нового состояния (в моем примере использован обычный PostMessage). Главный поток в обработчике уведомления выполняет операцию InterlockedExchange объекта CurrentItem с объектом ReadItem, после чего может свободно читать данные из ReadItem.
Получается такой себе «пузырек»: данные о состоянии появляются во WriteItem и далее «всплывают» через CurrentItem в ReadItem. Кстати, я не придумал нормального названия для базового класса такой структуры, поэтому назвал просто TLockFreeWorkState (возможно у кого-то найдется идея получше).
Тут есть один нюанс: главный поток может обращаться за текущим состоянием в любое время. Если мы всегда будет выполнять InterlockedExchange, то попеременно будем возвращать актуальное и предыдущее состояние.
Предотвратить это нам поможет обычный флажок Newest в классе. При записи состояния рабочий поток всегда выставляет WriteItem.Newest := True, и после InterlockedExchange этот флажок оказывается в CurrentItem. Главный поток в начале проверяет CurrentItem.Newest и, только если он True, делает InterlockedExchange после чего сразу его сбрасывает ReadItem.Newest в False. Я посчитал, что читать CurrentItem.Newest из главного потока безопасно, но поправьте меня, если не прав.
Теперь все это в виде упрощенного кода (опущено привидение типов для большей наглядности):
type
TLockFreeWorkState = class
public
Newest: Boolean;
end;
function Read(var CurrentItem, ReadItem: TLockFreeWorkState): Boolean;
begin
if CurrentItem.Newest then begin
ReadItem := InterlockedExchangePointer(CurrentItem, ReadItem);
ReadItem.Newest := False;
Result := True;
end else
Result := False;
end;
procedure Write(var CurrentItem, WriteItem: TLockFreeWorkState);
begin
WriteItem.Newest := True;
WriteItem := InterlockedExchangePointer(CurrentItem, WriteItem);
end;
Объект очереди
В чем-то тут подход схожий, но для реализации нам потребуется изначально только один объект, но две ссылки на него:
var
ReadQueue: TLockFreeWorkQueue;
WriteQueue: TLockFreeWorkQueue;
Изначально создается один экземпляр TLockFreeWorkQueue и записывается в переменные ReadQueue и WriteQueue. Класс представляет собой кольцевой буфер и имеет следующее описание:
TLockFreeWorkQueue = class
public
Head: Integer;
Tail: Integer;
Items: array[0..QueueCapacity - 1] of TObject;
end;
где QueueCapacity является некоторой константой (больше нуля), которая определяет длину кольцевого буфера.
При добавлении элемента в очередь рабочий поток выполняет InterlockedExchangeComparePointer элемента WriteQueue.Items[Tail]. При этом элемент сравнивается с Nil и в случае успеха в него записывается добавляемый элемент. Если операция прошла успешно, то значение Tail увеличивается на 1 и сбрасывается в 0, если достигнут QueueCapacity. Мы можем свободно оперировать с Tail, так как доступ к этой переменной имеет только рабочий поток (поток-писатель). Также после этого рабочий поток должен уведомить главный о том, что в очереди появились элементы. Если операция не удалась, то это означает, что очередь заполнена, но об этом позже.
Главный поток по уведомлению от рабочего начинает цикл чтения элементов из очереди (на самом деле чтение можно начинать в любой момент). Для извлечения элемента вызывается InterlockedExchangePointer для элемента ReadQueue.Items[Head] куда записывается значение Nil. Если извлеченный элемент не Nil, то значение Head увеличивается на 1 и сбрасывается в 0, если достигнут QueueCapacity.
Теперь разберемся со случаем переполнения буфера. Для новых элементов мы вполне может создать новый объект очереди и продолжить писать в него, а чтобы этот объект можно было найти потоку-читателю, мы должны передать на него ссылку в текущем заполненном объекте очереди. Для этого добавим дополнительное поле NextQueue в класс:
TLockFreeWorkQueue = class
public
Head: Integer;
Tail: Integer;
Items: array[0..QueueCapacity - 1] of TObject;
NextQueue: TLockFreeWorkQueue;
end;
Теперь если при записи элемента InterlockedExchangeComparePointer возвращает не Nil (очередь заполнена), то создаем новый объект очереди NewWriteQueue: TLockFreeWorkQueue, записываем добавляемый элемент в нее, выполняем InterlockedExchangePointer с переменной WriteQueue.NextQueue и в конце сохраняем NewWriteQueue в переменной WriteQueue. Таким образом после этой операции значения в ReadQueue и WriteQueue уже будут ссылаться на разные объекты.
В главном потоке нам нужно добавить обработку пустой очереди. Если при чтении InterlockedExchangePointer для элемента ReadQueue.Items[Head] возвращает Nil, то нам необходимо дополнительно проверить поле NextQueue, для чего мы также выполняем InterlockedExchangePointer(ReadQueue.NextQueue, Nil). Если при этом возвращается не Nil, то сохраняем объект в NewReadQueue, удаляем текущий объект ReadQueue, и присваиваем этой переменной значение NewReadQueue.
Вот упрощенный код для операций добавления элемента в очередь:
procedure AddQueueItem(var WriteQueue: TLockFreeWorkQueue; Item: TObject);
var
NewWriteQueue: TLockFreeWorkQueue;
begin
if InterlockedCompareExchangePointer(WriteQueue.Items[WriteQueue.Tail]), Item, Nil) = Nil then begin
// Added successfully
Inc(WriteQueue.Tail);
if WriteQueue.Tail = QueueCapacity then
WriteQueue.Tail := 0;
end else begin
// WriteQueue full. Create new chained queue.
NewWriteQueue := TLockFreeWorkQueue.Create;
NewWriteQueue.Items[0] := Item;
Inc(NewWriteQueue.Tail);
if NewWriteQueue.Tail = QueueCapacity then // Check single-item queue
NewWriteQueue.Tail := 0;
InterlockedExchangePointer(WriteQueue.NextQueue, NewWriteQueue);
WriteQueue := NewWriteQueue;
end;
end;
и извлечения элемента из очереди:
function ExtractQueueItem(var ReadQueue: TLockFreeWorkQueue): TObject;
var
NewReadQueue: TLockFreeWorkQueue;
begin
Result := Nil;
repeat
Result := InterlockedExchangePointer(ReadQueue.Items[ReadQueue.Head], Nil);
if Result = Nil then begin
// No new items in this queue. Check next queue is available
NewReadQueue := InterlockedExchangePointer(ReadQueue.NextQueue, Nil);
if Assigned(NewReadQueue) then begin
ReadQueue.Free;
ReadQueue := NewReadQueue;
end else
// No new item in queue
Exit;
end;
until Result <> Nil;
// Item extracted successfully
Inc(ReadQueue.Head);
if ReadQueue.Head = QueueCapacity then
ReadQueue.Head := 0;
end;
В этом коде я возможно несколько перестраховался. Не уверен, что для операций с полем NextQueue вообще нужно применять InterlockedExchangePointer, возможно будет безопасным выполнять прямое чтение и запись.
Тестовый пример
Рабочий и причесанный код вместе с простеньким консольным примером можно посмотреть под спойлером.
program LockFreeTest;
{$APPTYPE CONSOLE}
{$R *.res}
uses
SysUtils, Classes, Windows, Messages;
// Lock-free work thread state ////////////////////////////////////////////////
type
TLockFreeWorkState = class
protected
FNewest: Boolean;
public
class function Read(var CurrentItem, ReadItem): Boolean;
class procedure Write(var CurrentItem, WriteItem);
property Newest: Boolean read FNewest write FNewest;
end;
class function TLockFreeWorkState.Read(var CurrentItem, ReadItem): Boolean;
begin
if TLockFreeWorkState(CurrentItem).Newest then begin
pointer(ReadItem) := InterlockedExchangePointer(pointer(CurrentItem), pointer(ReadItem));
TLockFreeWorkState(ReadItem).Newest := False;
Result := True;
end else
Result := False;
end;
class procedure TLockFreeWorkState.Write(var CurrentItem, WriteItem);
begin
TLockFreeWorkState(WriteItem).Newest := True;
pointer(WriteItem) := InterlockedExchangePointer(pointer(CurrentItem), pointer(WriteItem));
end;
// Lock-free work thread queue ////////////////////////////////////////////////
type
TLockFreeWorkQueue = class
public const
QueueCapacity = 4; // Small value for test purposes
public type
TLockFreeWorkQueueItems = array[0..QueueCapacity - 1] of TObject;
public
Head: Integer; // Access from main thread only
Tail: Integer; // Access from work thread only
NextQueue: TLockFreeWorkQueue;
Items: TLockFreeWorkQueueItems;
public
destructor Destroy; override;
class procedure Add(var WriteQueue: TLockFreeWorkQueue; Item: TObject); static;
class function Extract(var ReadQueue: TLockFreeWorkQueue): TObject; static;
end;
destructor TLockFreeWorkQueue.Destroy;
var
i: Integer;
begin
// Free non-extracted items
for i := 0 to QueueCapacity - 1 do
Items[i].Free;
// Free NextQueue if exists
NextQueue.Free;
inherited;
end;
class procedure TLockFreeWorkQueue.Add(var WriteQueue: TLockFreeWorkQueue; Item: TObject);
var
NewWriteQueue: TLockFreeWorkQueue;
begin
// Check item assigned (can't add empty items)
if not Assigned(Item) or not Assigned(WriteQueue) then
Exit;
if InterlockedCompareExchangePointer(pointer(WriteQueue.Items[WriteQueue.Tail]), pointer(Item), Nil) = Nil then begin
// Added successfully
Inc(WriteQueue.Tail);
if WriteQueue.Tail = QueueCapacity then
WriteQueue.Tail := 0;
end else begin
// WriteQueue full. Create new chained queue.
NewWriteQueue := TLockFreeWorkQueue.Create;
NewWriteQueue.Items[0] := Item;
Inc(NewWriteQueue.Tail);
if NewWriteQueue.Tail = QueueCapacity then // Check single-item queue
NewWriteQueue.Tail := 0;
InterlockedExchangePointer(pointer(WriteQueue.NextQueue), NewWriteQueue);
WriteQueue := NewWriteQueue;
end;
end;
class function TLockFreeWorkQueue.Extract(var ReadQueue: TLockFreeWorkQueue): TObject;
var
NewReadQueue: TLockFreeWorkQueue;
begin
Result := Nil;
if not Assigned(ReadQueue) then
Exit;
repeat
Result := InterlockedExchangePointer(pointer(ReadQueue.Items[ReadQueue.Head]), Nil);
if Result = Nil then begin
// No new items in this queue. Check next queue is available
NewReadQueue := InterlockedExchangePointer(pointer(ReadQueue.NextQueue), Nil);
if Assigned(NewReadQueue) then begin
ReadQueue.Free;
ReadQueue := NewReadQueue;
end else
// No new item in queue
Exit;
end;
until Result <> Nil;
// Item extracted successfully
Inc(ReadQueue.Head);
if ReadQueue.Head = QueueCapacity then
ReadQueue.Head := 0;
end;
// Test work thread ///////////////////////////////////////////////////////////
const
WM_MAINNOTIFY = WM_USER + 1;
type
TWorkThreadState = class(TLockFreeWorkState)
public
Progress: Integer;
end;
TWorkThreadQueueItem = class
public
ItemData: Integer;
end;
TWorkThread = class(TThread)
protected
FMainHandle: THandle;
FMainNotified: Integer;
// State fields
FStateRead: TWorkThreadState;
FStateCurrent: TWorkThreadState;
FStateWrite: TWorkThreadState;
// Queue fields
FQueueRead: TLockFreeWorkQueue;
FQueueWrite: TLockFreeWorkQueue;
// Debug (test) fiels
FDebugReadQueue: Boolean;
procedure Execute; override;
procedure SetState;
procedure AddQueueItem(Item: TWorkThreadQueueItem);
procedure NotifyMain;
public
constructor Create(CreateSuspended: Boolean);
destructor Destroy; override;
function GetState: TWorkThreadState;
function ExtractQueueItem: TWorkThreadQueueItem;
procedure NotificationProcessed;
property MainHandle: THandle read FMainHandle;
end;
constructor TWorkThread.Create(CreateSuspended: Boolean);
begin
inherited Create(CreateSuspended);
// State objects
FStateRead := TWorkThreadState.Create;
FStateCurrent := TWorkThreadState.Create;
FStateWrite := TWorkThreadState.Create;
// Queue objects
FQueueRead := TLockFreeWorkQueue.Create;
FQueueWrite := FQueueRead;
end;
destructor TWorkThread.Destroy;
begin
inherited;
FStateRead.Free;
FStateCurrent.Free;
FStateWrite.Free;
// Always destroy read queue only: only read queue may have NextQueue reference
FQueueRead.Free;
end;
procedure TWorkThread.NotifyMain;
begin
if InterlockedExchange(FMainNotified, 1) = 0 then
PostMessage(FMainHandle, WM_MAINNOTIFY, 0, 0);
end;
procedure TWorkThread.NotificationProcessed;
begin
InterlockedExchange(FMainNotified, 0);
end;
function TWorkThread.GetState: TWorkThreadState;
begin
TLockFreeWorkState.Read(FStateCurrent, FStateRead);
Result := FStateRead;
end;
procedure TWorkThread.SetState;
begin
TLockFreeWorkState.Write(FStateCurrent, FStateWrite);
end;
procedure TWorkThread.AddQueueItem(Item: TWorkThreadQueueItem);
begin
TLockFreeWorkQueue.Add(FQueueWrite, Item);
end;
function TWorkThread.ExtractQueueItem: TWorkThreadQueueItem;
begin
Result := TWorkThreadQueueItem(TLockFreeWorkQueue.Extract(FQueueRead));
end;
procedure TWorkThread.Execute;
const
TestQueueCountToFlush = 10;
var
ProgressIndex: Integer;
TestQueueCount: Integer;
Item: TWorkThreadQueueItem;
begin
TestQueueCount := 0;
ProgressIndex := 0;
while not Terminated do begin
// Send current progress
if FStateWrite.Progress <> ProgressIndex then begin
// All state object fields initialization required
FStateWrite.Progress := ProgressIndex;
SetState;
NotifyMain;
end;
// Emulate calculation
Sleep(500);
Inc(ProgressIndex);
// Put intermediate result in queue
Item := TWorkThreadQueueItem.Create;
Item.ItemData := ProgressIndex;
AddQueueItem(Item);
Inc(TestQueueCount);
if TestQueueCount = TestQueueCountToFlush then begin
TestQueueCount := 0;
// Allow queue reading from main thread
FDebugReadQueue := True;
NotifyMain;
end;
end;
end;
// Test application ///////////////////////////////////////////////////////////
type
TMain = class
protected
FHandle: THandle;
FThread: TWorkThread;
procedure WndProc(var Message: TMessage);
public
constructor Create;
destructor Destroy; override;
function Run: Boolean;
property Handle: THandle read FHandle;
end;
var
Main: TMain;
constructor TMain.Create;
begin
FHandle := AllocateHWnd(WndProc);
FThread := TWorkThread.Create(True);
FThread.FMainHandle := Handle;
FThread.Start;
writeln('Work thread started');
end;
destructor TMain.Destroy;
begin
writeln('Stopping work thread...');
FThread.Free;
writeln('Work thread stopped');
DeallocateHWnd(FHandle);
inherited;
end;
procedure TMain.WndProc(var Message: TMessage);
var
State: TWorkThreadState;
Item: TWorkThreadQueueItem;
begin
if Message.Msg = WM_MAINNOTIFY then begin
FThread.NotificationProcessed;
State := FThread.GetState;
// Show current progress
writeln('Work progress ', State.Progress);
// Check queue reading allowed
if FThread.FDebugReadQueue then begin
writeln('Read queue...');
repeat
Item := FThread.ExtractQueueItem;
try
if Assigned(Item) then
writeln('Queue item: ', Item.ItemData);
finally
Item.Free;
end;
until not Assigned(Item);
FThread.FDebugReadQueue := False;
end;
end else
Message.Result := DefWindowProc(Handle, Message.Msg, Message.wParam, Message.lParam);
end;
function TMain.Run: Boolean;
var
Msg: TMsg;
begin
writeln('Start message loop (Ctrl+C to break)');
Result := True;
while Result do
case Integer(GetMessage(Msg, Handle, 0, 0)) of
0:
Break;
-1:
Result := False;
else
begin
TranslateMessage(Msg);
DispatchMessage(Msg);
end;
end;
end;
// Console event handler //////////////////////////////////////////////////////
function ConsoleEventProc(CtrlType: DWORD): BOOL; stdcall;
begin
Result := False;
case CtrlType of
CTRL_CLOSE_EVENT,
CTRL_C_EVENT,
CTRL_BREAK_EVENT:
if Assigned(Main) then begin
PostMessage(Main.Handle, WM_QUIT, 0, 0);
Result := True;
end;
end;
end;
// Main procedure /////////////////////////////////////////////////////////////
begin
{$IFDEF DEBUG}
ReportMemoryLeaksOnShutdown := True;
{$ENDIF}
try
SetConsoleCtrlHandler(@ConsoleEventProc, True);
Main := TMain.Create;
try
Main.Run;
finally
FreeAndNil(Main);
end;
except
on E: Exception do
Writeln(E.ClassName, ': ', E.Message);
end;
end.
В нормальной ситуации при появлении элемента в очереди он при первой возможности должен извлекаться главным потоком. Однако для тестирования переполнения очереди я добавил поле TWorkThread.FDebugReadQueue, которое при значении False запрещает главному потоку читать из очереди (в методе TWorkThread.Execute введна константа TestQueueCountToFlush = 10, которая разрешает главному потоку чтение только после 10 добавленных элементов).
К сожалению тестовый пример слишком прост и не генерирует коллизий чтения/записи между потоками, когда переключение потока происходит внутри служебных функций чтения/записи. Но тут я не уверен, можно ли вообще проверить все узкие места алгоритма и во что нужно превратить код для этого.
Автор: goobit