Моя «парадигма» работы с потоками

в 11:02, , рубрики: Delphi, параллельное программирование, потоки, Программирование, синхронизация, метки: , ,

Моя «парадигма» работы с потокамиКогда я учился писать многопоточные приложения — я перечитал кучу литературы и справочной информации по этой области. Но между теорией и практикой — огромная пропасть. Я набил кучу шишек, и до сих пор иногда получаю по голове от собственных потоков. Для себя я выработал набор некоторых правил, которым стараюсь строго следовать, и это значительно помогает мне в написании многопоточного кода.

Поскольку ошибки, связанные с синхронизацией потоков крайне сложно отлаживать, то самым эффективным способом тут является предупреждение этих самых ошибок. Для этого используются различные парадигмы программирования на разных уровнях абстракции. Нижним уровнем абстракции будем считать работу с объектами синхронизации (критические секции, мьютексы, семафоры). Верхним — такие парадигмы программирования, как Futures and promises, STM (software transactional memory), обмен асинхронными сообщениями и т.п. Верхний уровень абстракции зачастую всегда основан на нижнем.

В данной статье я поделюсь своим стилем написания кода на нижнем уровне абстракции. Поскольку я дельфист, то все примеры будут на Delphi, однако все нижесказанное справедливо и для других языков программирования (позволяющих работать с объектами синхронизации конечно)

Потокобезопасный объект

Первое правило — между потоками работать только с потокобезопасными объектами. Это самое простое, логичное и понятное правило. Однако даже тут есть некоторые особенности. Объект должен быть целиком потокобезопасный, а это значит что все public методы (кроме конструктора и деструктора) нужно синхронизировать. Конструкторы и деструкторы в свою очередь должны быть всегда синхронизированы снаружи объекта. Одна из ошибок на ранних этапах работы с потоками — я забывал о синхронизации конструкторов и деструкторов. И если в делфи с конструктором проблем нет (мы получаем указатель на объект только когда конструктор уже отработал), то с деструктором надо быть внимательным. Синхронизация деструкторов — очень скользкая тема, и я не могу дать каких-либо указаний, как лучше её реализовывать (я не гений многопоточного программирования, а только учусь ;) ). Сам я стараюсь проводить такую синхронизацию через деструктор класса TThread, но это справедливо только для объектов, которые существуют всю жизнь потока.

Блокировки

Описание

Другая распространенная проблема — это взаимные блокировки (deadlock-и). Несмотря на то, что это наиболее распространенная проблема, возникающая при синхронизации — тут есть одно не очевидное правило. Если поток единовременно выполняет не больше одной синхронизации, то никаких дедлоков не будет. Здесь под словом синхронизация — я имею ввиду как блокировку ресурса, так и ожидание какого-либо ресурса. Таким образом остановка на мьютексе, закрытие мьютекса, вход в семафор, вход критическую секцию, или отправка сообщения (SendMessage) — это все синхронизации. И в самом деле, если поток А ожидает ресурс, и при этом он не заблокировал ни один ресурс, то его в свою очередь никто не ожидает, а значит взаимной блокировки быть не может.

Примеры

Понимание и строгое выполнение данного условия — ключ к отсутствию deadlock-ов. Давайте рассмотрим на примере, о чем я говорю. Допустим у нас есть некоторый класс:

TMyObj = class
private
  FCS: TCriticalSection;
  FA: Integer;
  FB: Integer;
public
  property A: Integer read GetA write SetA;
  property B: Integer read GetB write SetB;
  function DoSomething: Integer;
  //...другие методы
end;

Следуя тому, что у нас должен быть потокобезопасный объект — я реализовал свойства A и B через геттеры и сеттеры с критической секцией:

function TMyObj.GetA: Integer;
begin
  FCS.Enter;
  try
    Result := FA;
  finally
    FCS.Leave;
  end;
end;

function TMyObj.GetB: Integer;
begin
  FCS.Enter;
  try
    Result := FB;
  finally
    FCS.Leave;
  end;
end;

procedure TMyObj.SetA(const Value: Integer);
begin
  FCS.Enter;
  try
    FA := Value;
  finally
    FCS.Leave;
  end;
end;

procedure TMyObj.SetB(const Value: Integer);
begin
  FCS.Enter;
  try
    FB := Value;
  finally
    FCS.Leave;
  end;
end;

Допустим функция DoSomething у нас работает с A и B как-то так:

function TMyObj.DoSomething: Integer;
begin
  Result := SendMessage(SomeHandle, WM_MYMESSAGE, A mod 3, B mod 4);
end;

Эй, но мы ведь используем одну критическую секцию для A и для B, скажет неискушенный писатель. И сразу же «оптимизирует» этот кусок:

function TMyObj.DoSomething: Integer;
begin
  FCS.Enter;
  try
    Result := SendMessage(SomeHandle, WM_MYMESSAGE, FA mod 3, FB mod 4);
  finally
    FCS.Leave;
  end;
end;

И это будет ошибка. Теперь, если мы в обработчике WM_MYMESSAGE попытаемся обратиться к полю A или B — мы получим дедлок. Данный дедлок — очевиден, так как объем кода маленький, данные простые. Но оно становится не тривиальным, когда когда код огромен, появляется куча связей и зависимостей. Согласно правилу — работать только с одной синхронизацией одновременно, вышеописанный код можно «оптимизировать» так:

function TMyObj.DoSomething: Integer;
var k, n: Integer;
begin
  FCS.Enter;
  try
    k := FA mod 3;
    n := FB mod 4;
  finally
    FCS.Leave;
  end;
  Result := SendMessage(SomeHandle, WM_MYMESSAGE, k, n);
end;

Поэтому всегда, прежде чем вызвать новую синхронизацию — нужно освободить другие объекты синхронизации. Код в духе:

FCS1.Enter;
try
  //bla bla bla
  FCS2.Enter;
  try
    //bla bla bla
  finally
    FCS2.Leave;
  end;
  //bla bla bla
finally
  FCS1.Leave;
end;

В большинстве случаев можно считать многопоточным быдлокодом. Я думаю вы уже представляете как надо его переписать:

FCS1.Enter;
try
  //bla bla bla
  //bla bla bla
  //сохраняем в стеке/куче данные, которые нам понадобятся при работе внутри FCS2 
finally
  FCS1.Leave;
end;

FCS2.Enter;
try
  //используем данные из стека/кучи
  //bla bla bla
finally
  FCS2.Leave;
end;

По данному подходу видно, что нам приходится копировать данные, что может отразиться на производительности. Однако в большинстве случаев объемы данных не велики, и мы можем позволить копировать их. Трижды четырежды подумайте, чтобы применять подход без копирования.

Диагностика

На уровне компилирования такое диагностировать не получится. Однако можно провести диагностику в реалтайме. Для этого нам надо хранить текущий объект синхронизации для каждого потока. Вот пример реализации средства диагностики на Delphi.

procedure InitSyncObject;
procedure PushSyncObject(handle: Cardinal); overload;
procedure PushSyncObject(obj: TObject); overload;
procedure PopSyncObject;

implementation

threadvar syncobj: Cardinal;
          synccnt: Cardinal;

procedure InitSyncObject;
begin
  syncobj := 0;
  synccnt := 0;
end;

procedure PushSyncObject(handle: Cardinal);
begin
  if handle = 0 then
    raise EProgrammerNotFound.Create('Нельзя захватить несуществующий объект');

  if (syncobj <> 0) and (handle <> syncobj)  then
    raise EProgrammerNotFound.Create('Попытка воспользоваться двумя объектами синхронизации преследуется по закону');

  syncobj := handle;
  inc(synccnt);
end;

procedure PushSyncObject(obj: TObject);
begin
  PushSyncObject(Cardinal(obj));
end;

procedure PopSyncObject;
begin
  if (syncobj = 0) or (synccnt = 0) then
    raise EProgrammerNotFound.Create('Слишком много освобождений объекта');
  Dec(synccnt);
  if synccnt = 0 then syncobj := 0;
end;

Вызываем InitSyncObject когда стартуем новый поток.
Перед захватом объекта синхронизации вызываем PushThreadObject, после освобождения объекта синхронизации вызываем PopThreadObject.
Для удобства использования данных функций рекомендую скопировать код модуля SyncObjs.pas в новый, скажем SyncObjsDbg.pas. В нем есть базовый класс объекта синхронизации:

  TSynchroObject = class(TObject)
  public
    procedure Acquire; virtual;
    procedure Release; virtual;
  end;

В Acquire добавить вызов PushSyncObject(Self), а в Release PopSyncObject. Так же не забываем обрамить в эти функции WaitFor методы у THandleObject. Кроме того, если используем метод TThread.Synchronize то до вызова сохраняем объект TThread, а после извлекаем его (PopSyncObject), если используем API функции SendMessage или WaitFor функции, то до вызова сохраняем хендл (PushSyncObject), после — извлекаем (PopSyncObject).
Вот и все, теперь при попытке захватить второй объект синхронизации — будет возникать исключение, а модули (SyncObjs/SyncObjsDbg) можно менять через дефайны.

Плохой код

В качестве примера плохого кода возьмем… класс TThreadList из модуля Classes.pas

  TThreadList = class
  private
    FList: TList;
    FLock: TRTLCriticalSection;
    FDuplicates: TDuplicates;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Add(Item: Pointer);
    procedure Clear;
    function LockList: TList;
    procedure Remove(Item: Pointer); inline;
    procedure RemoveItem(Item: Pointer; Direction: TList.TDirection);
    procedure UnlockList; inline;
    property Duplicates: TDuplicates read FDuplicates write FDuplicates;
  end;

Казалось бы, потокобезопасный класс, с доступом через критическую секцию, что в нем плохого? А плохо то, что у нас доступны методы LockList и UnlockList. Если между парой вызовов LockList и UnlockList у нас будет синхронизация — то мы нарушаем вышеописанное правило. Поэтому выносить пару функций Lock/Unlock в паблик — не есть хорошо, и такие функции нужно использовать крайне осторожно.

К слову, различные API от Microsoft часто возвращают Enum интерфейсы, вот например. Зачем они это делают? Ведь гораздо удобнее получить количество скажем через функцию Count, а потом в цикле через функцию GetItem по индексу получать элемент. Но в этом случае им бы пришлось вынести еще пару функций Lock/Unlock, чтобы никто не мог изменить список, пока вы в цикле работаете. Кроме того, если вы между Lock/Unlock вдруг вызовете такую API функцию, которая выполняет внутреннюю синхронизацию — вы запросто можете получить дедлок. Поэтому все и сделано через Enum интерфейсы. При получении такого интерфейса формируется список объектов, и счетчик ссылок их увеличивается. Это значит что ни один объект в Enum интерфейсе не будет уничтожен пока как минимум энум интерфейс существует, и пока вы работаете с Enum — ко внутреннему списку все имеют доступ, и этот список может даже изменяться.

Наверное хватит

Нажал я кнопку предпросмотр, увидел получившийся объем, и понял что пока хватит. В следующей статье я хотел бы рассказать про класс делфи TThread, и показать правила, которым я следую при создании и работе с потоками.

Автор: MrShoor

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js