Организация рабочих потоков: синхронизационный канал

в 19:00, , рубрики: c++, multithreading, SyncChannel, synchronization, WinAPI, Проектирование и рефакторинг, системное программирование, метки: , , ,

Представте себе типичное приложение:

Есть рабочий поток движка, выполняющий какую-то функциональность, допустим копирование файлов.
Данный поток должен периодически сообщать информацию о текущем копируемом файле, а также уметь обрабатывать ошибки, допустим ошибка нехватки места на диске.

Графический интерфейс такого приложения должен позволять запускать процесс копирования файлов, уметь приостановить копирование, а также, в случае ошибки, отобразить соответствующий диалог с вопросом к пользователю.

Казалось бы, как можно допустить ошибку в такой типичной ситуации?

Проблемы многопоточности

Когда в программе появляется дополнительный поток — сразу же возникает проблема взаимодействия между потоками. Даже если поток ничего не делает и ни с кем не общается, всегда есть проблема правильной остановки потока.

Даже при работе с высокоуровневыми классами-обертками над потоками, всегда есть возможность сделать что-то не так, если до конца не понимать правильность работы с потоками. По этому в данной статье будет идти речь о работе с потоками на уровне WinAPI.

И так, вернемся к нашему примеру.

Рабочий поток движка должен каким-то образом сообщать потоку GUI о своем состоянии (текущий копируемый файл), а так-же инициировать сообщение об ошибке.

Два основных направления — асинхронный и синхронный

Асинхронный способ — рабочий поток уведомляет о своем состоянии асинхронными сообщениями (PostMessage).
После посылки такого сообщения, поток, как правило, не дожидается ответа и продолжает свою работу.
А в случае невозможности продолжать, поток ожидает вызова управляющей команды от GUI.

Синхронный способ — рабочий поток уведомляет о своем состоянии синхронными вызовами (SendMessage), с ожиданием завершения обработки таких вызовов.
Такой способ удобен тем, что рабочий поток, в момент обработки сообщений, находится в заранее известном состоянии. Нет необходимости в излишней синхронизации.
В общем основная выгода в таком решении это простота.

В асинхронном способе есть и свои приемущества, но речь пойдет о синхронном способе.

Подводные камни: SendMessage + остановка потока

Когда я вижу поток, то сразу задаюсь вопросом как он взаимодействует с GUI и как его при этом останавливают.

Если рабочий поток прямым или коссвенным образом вызывает блокирующую функцию SendMessage() GUI потока (на примере WinAPI устанавливает текст статика вызовом WM_SETTEXT), то нужно быть особо внимательным при попытке остановки потока в обработчиках нажатия на кнопки и при закрытии приложения (в случае если GUI поток является основным потоком приложения).

Правильный способ завершить поток — это дождаться завершения, с использованием одной из функций WaitFor, передав параметром HANDLE потока.
Если этого не сделать, возможны непредсказуемые последствия (зависания и падения программы).
Особенно, если поток находится внутри динамически подключаемой библиотеки.

Но если мы будем в обработчике оконных сообщений ждать завершения потока, который, в свою очередь послал сообщение и ждет пока его обработают, то мы гарантированно получим deadlock.
Один из вариантов — не просто ждать завершение потока, а прокручивать оконные сообщения, пока поток не завершиться (тоже имеет право на существование)

Вторая архитектурная проблема — если рабочий поток вызывает напрямую код GUI, то необходимо позаботиться о синхронизации. Синхронизация потоков получается размазанной по всей программе.

Вариант решения перечисленных проблем — синхронизационный канал.

Канал должен обеспечивать переключение контекста потоков при вызове кода GUI.
По принципу COM Single-Threaded Apartments, то есть все вызовы от движка приходят в контексте клиентского потока.
Рабочий поток не должен пересекать границы интерфейса движка.

Рабочий поток должен иметь возможность завершиться, даже в момент вызова синхроного сообщения.
Все вызовы при этом остаются синхронными — рабочий поток ждет пока GUI не обработает уведомление (к примеру ошибка нехватки места на диске).

Таким образом код GUI остается полностью однопоточным и не требует дополнительной синхронизации.

Вариант реализации и пример на C++

Делаем некий синхронизационный канал, при помощи которого рабочий поток движка будет вызывать функции обратного вызова, реализуемые GUI.

Канал будет иметь функцию Execute, с параметром boost::function, куда можно передать функтор, созданный boost::bind. Таким образом, с использованием данного канала, можно будет вызвать функцию обратного вызова с любой сигнатурой, например:

class IEngineEvents
{
public:
    virtual void OnProgress(int progress) = 0;
    ...
};
//где-то в движке...
IEngineEvents* listener; //указатель на объект, реализуемый GUI
syncChannel.Execute(boost::bind(&IEngineEvents::OnProgress, listener, 30));

Функция Execute, как говорилось раньше, синхронная — она не завершается до тех пор, пока функция обратного вызова не будет завершена. Кроме исключения, описанного ниже.

Канал также должен иметь функцию Close, действие которой следующее: все вызовы функции Execute завершаются, новые вызовы функции Execute не проходят. Рабочий поток освобождается и, таким образом, решается проблема остановки рабочего потока — можно использовать функцию WaitFor без необходимости прокрутки оконных сообщений.

Данная реализация предполагает функцию ProcessMessage, которую необходимо вызывать из цикла обработки сообщений или оконной процедуры. Возможны реализации и без такой функции, например канал может создавать себе невидимое окно, и обрабатывать все сообщения внутри. Кроме того, возможны реализации без использования оконных сообщений в принципе.

Хотелось бы еще сказать, что пример несет лишь ознакомительный характер, и не является единственно правильным решением.

//SyncChannel.h
#pragma once
#include <Windows.h>
#include <boost/function.hpp>

class CSyncChannel
{
public:
    typedef boost::function<void()> CCallback;

public:
    CSyncChannel(void);
    ~CSyncChannel(void);

public:
    bool Create(DWORD clientThreadId);
    void Close();
    bool Execute(CCallback callback);
    bool ProcessMessage(MSG msg);

private:
    DWORD                           m_clientThreadId;
    CCallback                       m_callback;
    HANDLE                          m_deliveredEvent;
    volatile bool                   m_closeFlag;
};

//SyncChannel.cpp
#include "StdAfx.h"
#include "SyncChannel.h"

UINT WM_SYNC_CHANNEL_COMMAND = WM_APP + 500;

CSyncChannel::CSyncChannel(void)
:
    m_closeFlag(true)
{}

CSyncChannel::~CSyncChannel(void)
{}

bool CSyncChannel::Create(DWORD clientThreadId)
{
    if (!m_closeFlag)
    {
        return false;
    }
    m_clientThreadId = clientThreadId;

    m_deliveredEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    if (!m_deliveredEvent)
    {
        return false;
    }
    m_closeFlag = false;
    return true;
}

void CSyncChannel::Close()
{
    m_closeFlag = true;
    if (m_deliveredEvent)
    {
        CloseHandle(m_deliveredEvent);
        m_deliveredEvent = NULL;
    }
}

bool CSyncChannel::Execute(CCallback callback)
{
    if (m_closeFlag)
    {
        return false;
    }
    if (GetCurrentThreadId() == m_clientThreadId)
    {
        callback();
        return true;
    }
    else
    {
        m_callback = callback;
        ResetEvent(m_deliveredEvent);

        if (!PostThreadMessage(m_clientThreadId, WM_SYNC_CHANNEL_COMMAND, NULL, NULL))
        {
            return false;
        }

        DWORD waitResult = WAIT_TIMEOUT;

        while (waitResult == WAIT_TIMEOUT && !m_closeFlag)
        {
            waitResult = WaitForSingleObject(m_deliveredEvent, 100);
        }

        if (waitResult != WAIT_OBJECT_0)
        {
            return false;
        }
    }
    return true;
}

bool CSyncChannel::ProcessMessage(MSG msg)
{
    if (msg.message != WM_SYNC_CHANNEL_COMMAND)
    {
        return false;
    }

    if (!m_closeFlag)
    {
        m_callback();

        SetEvent(m_deliveredEvent);
    }
    return true;
}

Автор: Ryadovoy

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js