Данная статья является продолжением статьи — Организация рабочих потоков: синхронизационный канал. Продолжение родилось как попытка написать пример использования подхода с синхронными сообщениями.
В этой части я хочу на примере показать, как можно организовать управление и отображение состояния движка с рабочим потоком, используя синхронные сообщения между потоками. И показать, как при этом обойти проблему взаимной блокировки потоков при закрытии приложения.
Давайте вернемся к примеру с предыдущей статьи. У нас есть графический интерфейс, отображающий состояние движка с рабочим потоком. Допустим движок можно запустить, остановить, поставить на паузу и соответственно снять с паузы. Для реализации такого поведения проще всего применить что-то подобное шаблонам проектирования конечный автомат и наблюдатель.
Для начала определимся с набором состояний движка. Движок у нас будет асинхронным, по этому в наборе состояний будут также и переходные состояния. У меня получился следующий набор состояний: NotStarted, StartPending, Started, PausePending, Paused, ResumePending, StopPending.
Эти состояния прежде всего будут использоваться для изменения состояния GUI. Тоесть GUI будет получать уведомление о изменении состояния движка, и соответствующим образом отображать это состояние. Например, при переходе в состояние NotStarted, GUI должно показать кнопку «Старт», а кнопки «Пауза» «Продолжить» и «Остановить» должны быть заблокированными. Соответственно при переходе в состояние Paused кнопки «Старт» и «Пауза» будут заблокированными, а кнопки «Продолжить» и «Остановить» должны быть разблокированными.
Давайте посмотрим, как может выглядеть обработчик уведомления о изменении состояния движка, на примере WTL диалога с соответствующими управляющими кнопками:
void CMainDlg::OnStateChanged(EngineState::State state)
{
// Это функция обратного вызова, она вызывается только из GUI потока по средством класса CSyncChannel,
// по этому нет необходимости в дополнительной синхронизации при доступе к членам класса диалога
// В случае Pending состояний, блокируем все кнопки, дальше мы разблокируем только нужные кнопки
// Так было сделано для уменьшения количества строчек в примере
GetDlgItem(IDC_BUTTON_START).EnableWindow(FALSE);
GetDlgItem(IDC_BUTTON_STOP).EnableWindow(FALSE);
GetDlgItem(IDC_BUTTON_PAUSE).EnableWindow(FALSE);
GetDlgItem(IDC_BUTTON_CONTINUE).EnableWindow(FALSE);
switch (state)
{
case EngineState::NotStarted:
GetDlgItem(IDC_BUTTON_START).EnableWindow(TRUE);
break;
case EngineState::Started:
GetDlgItem(IDC_BUTTON_STOP).EnableWindow(TRUE);
GetDlgItem(IDC_BUTTON_PAUSE).EnableWindow(TRUE);
break;
case EngineState::Paused:
GetDlgItem(IDC_BUTTON_STOP).EnableWindow(TRUE);
GetDlgItem(IDC_BUTTON_CONTINUE).EnableWindow(TRUE);
break;
}
}
Для управления своим состоянием, движок будет иметь соответствующий набор функций: Start, Pause, Resume, Stop, которые в классе диалога будут вызываться из соответствующих обработчиков нажатия на кнопки.
За счет использования синхронных сообщений, переход из одного состояния в другое будет выполняться синхронно по отношению к потоку GUI. То есть, пока поток GUI находится в обработчике нажатия на кнопку «Старт», движок не может перейти в состояние Started или Paused асинхронно, он ждет пока обработчик нажатия на кнопку «Старт» завершится. Это существенно упрощает управление состояниями движка.
Переход в ожидающие состояния, такие как StartPending, осуществляется внутри вызова управляющих функций, таких как Start, и по этому, после выхода из функции Start, движок будет иметь состояние StartPending. То есть уведомление о переходе в состояние StartPending будет вызвано синхронно, еще до завершения вызова функции Start.
Посмотрим на реализацию движка.
Привожу класс движка полностью, т.к. при работе с многопоточностью любая упущенная деталь может играть большую роль.
//Engine.h
namespace EngineState
{
enum State {
NotStarted,
StartPending,
Started,
PausePending,
Paused,
ResumePending,
StopPending
};
};
class IEngineEvents {
public:
virtual void OnStateChanged(EngineState::State state) = 0;
};
class CEngine {
public:
CEngine(IEngineEvents* listener);
~CEngine(void);
public:
// Управляющие команды для рабочего потока
void Start();
void Stop();
void Pause();
void Resume();
public:
// GUI поток должен вызывать эту функцию для всех своих сообщений
bool ProcessMessage(MSG& msg);
private:
void WaitForThread();
static DWORD WINAPI ThreadProc(LPVOID param);
void Run();
bool CheckStopAndPause();
void ChangeState(EngineState::State state);
void OnStateChanged(EngineState::State state);
private:
CSyncChannel m_syncChannel; // Этот класс был описан в предыдущей статье
IEngineEvents* m_listener;
HANDLE m_hThread;
volatile EngineState::State m_state;
};
// Engine.cpp
CEngine::CEngine(IEngineEvents* listener) :
m_listener(listener),
m_hThread(NULL),
m_state(EngineState::NotStarted)
{
m_syncChannel.Create(GetCurrentThreadId());
}
CEngine::~CEngine(void)
{
// Если рабочий поток не был остановлен, необходимо установить флаг завершения потока
m_state = EngineState::StopPending;
// Необходимо дождаться, пока завершиться рабочий поток.
// Если рабочий поток в этот момент пошлет сообщение на GUI, то он заблокируется.
// Для того, чтобы разрешить ситуацию с взаимной блокировкой потоков,
// закрываем m_syncChannel, после чего рабочий поток будет разблокирован.
m_syncChannel.Close();
// Теперь рабочий поток успешно завершиться.
WaitForThread();
}
void CEngine::WaitForThread()
{
if (m_hThread)
{
// Ожидание завершения рабочего потока.
// Рабочий поток завершиться при переходе в одно из состояний:
// StopPending или NotStarted
_ASSERT(m_state == EngineState::StopPending || m_state == EngineState::NotStarted);
// Ждем полной остановки рабочего потока
DWORD waitResult = WaitForSingleObject(m_hThread, INFINITE);
_ASSERT(waitResult == WAIT_OBJECT_0);
// Рабочий поток полностью завершился, теперь можно освободить HANDLE рабочего потока
CloseHandle(m_hThread);
m_hThread = NULL;
}
}
void CEngine::Start()
{
// Это управляющая комманда, вызывается из GUI потока
// Запустить рабочий поток можно только в том случае, если он еще не запущен
// Для этого проверяем состояние движка
if (m_state == EngineState::NotStarted)
{
// Если функция Start была вызвана повторно, после завершения рабочего потока,
// предыдущий рабочий поток может некоторое время продолжать работать
// Необходимо дождаться его полного завершения перед тем, как создавать новый поток
WaitForThread();
// Создаем новый рабочий поток,
// передаем параметром this, чтобы поток мог вызвать функцию Run для текущего объекта
m_hThread = CreateThread(NULL, 0, CEngine::ThreadProc, this, 0, NULL);
if (m_hThread)
{
// Переключаем состояние движка в StartPending
// Рабочий поток уже мог вызвать функцию ChangeState(EngineState::Started)
// Но так, как для переключения состояния используется SyncChannel,
// то состояние не может быть изменено асинхронно,
// рабочий поток будет ждать, пока не завершится обработка нажатия на кнопку "Start"
// И по этому состояние StartPending гарантированно придет перед тем,
// как придет состояние Started от рабочего потока
ChangeState(EngineState::StartPending);
}
}
}
void CEngine::Stop()
{
// Это управляющая комманда, вызывается из GUI потока
if (m_state != EngineState::NotStarted && m_state != EngineState::StopPending)
{
// Устанавливаем флаг остановки, если он еще не установлен и поток еще работает
ChangeState(EngineState::StopPending);
}
// Рабочий поток остановится асинхронно, после выхода этой функции
// поток еще будет продолжать работать
// После остановки рабочий поток вызовет функцию ChangeState(EngineState::Stopped)
// Для ожидания полной остановки потока необходимо вызвать функцию WaitForThread
}
void CEngine::Pause()
{
// Это управляющая комманда, вызывается из GUI потока
if (m_state == EngineState::Started)
{
// Переход в состояние PausePending возможен только из состояния Started
ChangeState(EngineState::PausePending);
}
}
void CEngine::Resume()
{
// Это управляющая комманда, вызывается из GUI потока
if (m_state == EngineState::Paused)
{
// Переход в состояние ResumePending возможен только из состояния Paused
ChangeState(EngineState::ResumePending);
}
}
bool CEngine::ProcessMessage(MSG& msg)
{
// GUI поток вызывает эту функцию для всех своих сообщений
return m_syncChannel.ProcessMessage(msg);
}
DWORD WINAPI CEngine::ThreadProc(LPVOID param)
{
// Это статическая функция потока, получаем указатель
// на объект класса и вызываем его функцию Run
reinterpret_cast<CEngine*>(param)->Run();
return 0;
}
void CEngine::Run()
{
// Оповещаем GUI о том, что рабочий поток запустился
ChangeState(EngineState::Started);
for (;;)
{
// Это функция рабочего потока, здесь выполняется какая-то работа
// При этом необходимо периодически проверять флаг остановки и паузы
if (!CheckStopAndPause())
{
break;
}
// Выполняется важная работа - хорошо поспать, мечта всех родителей :)
Sleep(1000);
}
// Оповещаем о том, что рабочий поток завершился
ChangeState(EngineState::NotStarted);
// После переключения в состояние NotStarted рабочий поток
// еще какое-то время продолжает свое выполнение по этому необходимо
// дождаться завершения рабочего потока с использованием функции WaitForSingleObject
}
bool CEngine::CheckStopAndPause()
{
// Эта функция вызывается периодически из рабочего потока.
// Частота вызова этой функции влияет на отзывчивость GUI.
if (m_state == EngineState::StopPending)
{
// Была вызывана функция Stop, необходимо остановить рабочий поток
return false;
}
if (m_state == EngineState::PausePending)
{
// Ставим рабочий поток на паузу,
// для этого оповещаем GUI поток о изменении состояния
// и входим в цикл ожидания дальнейших команд
ChangeState(EngineState::Paused);
while (m_state == EngineState::Paused)
{
Sleep(100);
}
if (m_state == EngineState::StopPending)
{
// Была вызывана функция Stop, необходимо остановить рабочий поток
return false;
}
// Снимаем рабочий поток с паузы
// Для этого оповещаем GUI о изменении состояния и возвращаем управление рабочему потоку
ChangeState(EngineState::Started);
}
// Рабочий поток может продолжать свое выполнение
return true;
}
void CEngine::ChangeState(EngineState::State state)
{
// Эта функция может быть вызвана рабочим потоком,
// транслируем вызов в GUI поток, используя m_syncChannel
m_syncChannel.Execute(boost::bind(&CEngine::OnStateChanged, this, state));
}
void CEngine::OnStateChanged(EngineState::State state)
{
// Эта функция вызывается только из GUI потока, устанавливаем переменную m_state
// Таким образом изменение переменной m_state будет происходить только из одного потока
// и не требует использования объектов синхронизации, таких как критические секции или мьютексы
// То есть состояние рабочего потока не может переключится асинхронно по отношению к GUI потоку
m_state = state;
m_listener->OnStateChanged(m_state);
}
Использование класса CEngine.
В моем примере объект класса CEngine объявлен членом класса WTL диалога.
Набор функций диалога сводится к обработчикам нажатия на клавиши «Старт», «Стоп», «Продолжить», «Пауза» которые перевызывают соответствующие фукнции объекта CEngine.
Также класс диалога подписывается на уведомления о изменении состояния движка при помощи интерфейса IEngineEvents, реализация функции OnStateChanged этого интерфейса была приведена в начале статьи.
Для обеспечения трансляции сообщений потока в класс CEngine, класс диалога устанавливает себя фильтром оконных сообщений, используя стандартные методы WTL::CMessageLoop::AddMessageFilter, и реализует интерфейс WTL::CMessageFilter:
class CMessageFilter
{
public:
virtual BOOL PreTranslateMessage(MSG* pMsg) = 0;
};
Реализация функции PreTranslateMessage сводится к перевызову функции CEngine::ProcessMessage
Для решения проблемы взаимной блокировки потоков при закрытии приложения, в классе диалога никаких дополнительный действий не требуется. Эта проблема полностью решена за счет использования CSyncChannel::Close в деструкторе класса CEngine. Таким образом рабочий поток полностью инкапсулирован внутри класса CEngine, что дает ощутимые преимущества при работе с таким классом.
Автор: Ryadovoy