Qt: шаблон для корректной работы с потоками

в 12:34, , рубрики: c++, qt, Qt Software, QThread, Алгоритмы, многозадачность, метки: , , ,

Всем хабрапривет!
Как-то понадобилось мне в Qt 5.1.1 для WinXP в VS2009 реализовать многопоточное приложение с интенсивным обменом сигналами. Взял я Шлее, вычитал у него, что нужно унаследовать класс от QThread и — вуаля, велком в многопоточность! На всякий случай заглянул в документацию Qt — там никто не возражал против наследования от QThread своего класса. Ну что же — порядок, сделано! Запускаю — вроде как работает, но как-то не так… Начинаю в режиме отладки отслеживать — а там творится черт знает что! То сигналы не выходят, то выходят, но как-то криво и из другого потока. Одним словом, полный бардак! Пришлось основательно по-google-ить и разобраться в теме (мне помогли статьи тут, здесь и там). В итоге я сделал шаблон класса на С++ (вернее, целую иерархию оных), что мне позволило в итоге писать (относительно) небольшой код класса, живущего в другом потоке, который работает правильно и стабильно.

Чего хочется

Стремился я ко вполне очевидным вещам:

  • использовать классы С++ во всей их красе — т. е. чтобы конструктор вызывался при создании потока и деструктор перед уничтожением;
  • использовать возможности Qt во всей его красе — т. е. сигнально-слотовые соединения, события и пр.;
  • при желании — контроль над процессом создания и работы — приоритет потока, слот о начале работы и сигнал об экстренном завершении;
  • минимум писанины и максимум понятности.

Получилось у меня что-то вроде:

class SomeJob: public QObject
{
	Q_OBJECT
public:
	SomeJob ()	{ /* ... */ }	// создание потока
	~SomeJob ()	{ /* ... */ }	// удаление потока
signals:
	void finished ();	// поток закончил свою работу
public slots:
	void to_terminate ()	{ /* ... */ }	// экстренное завершение
};

... 
ThreadedObject<SomeJob> thr;	// объект-владелец потока
thr.start ();	// создание потока с автозапуском конструктора

Красотень!

Как мы будем действовать

В Qt 5.1 для наших целей предназначен низкоуровневый класс QThread. Про него сказано следующее: «класс QThread дает возможность в платформо-независимом виде управлять потоками». Замечательная книга у Шлее, но вот пример с потоками у него вышел очень сбивающим с толку: он предлагает наследовать класс QThread, переопределить метод run () и в нем выполнить работу. Это неплохо для задачи в стиле «запустили, выполнили функцию и завершили», но категорически неприемлемо для более сложных случаев.
В общем, зря я его послушал прочитал, надо было сразу вникать в документацию. А в ней есть, между прочим, хороший пример. Он показывает правильный путь: функцию QObject::moveToThread (QThread *thread), которая переносит родственность (сходство? — англ. affinity) данного объекта и всех его предков потоку thread.
Таким образом, в первом приближении решение задачи выглядит следующим образом:

  1. создание потока — класс QThread;
  2. создание объекта и перенос его в новый поток;
  3. установка сигнально-слотовых связей;
  4. запуск потока с заданным приоритетом.

Вроде бы все хорошо, но — помните? — я хочу, чтобы конструктор создаваемого объекта выполнился в новом потоке. Для этого он должен быть запущен после запуска потока. Можно вначале создать объект, а потом поток. Но все, что будет создано конструктором объекта, будет размещено в стеке (куче) текущего потока, а не нового. Можно как-то аккуратно все это хозяйство перенести в новый поток и удалить в старом, но… проще вызвать конструктор уже в новом потоке. Так что вот нам проблема №1. Надо решать.
Потом появилась проблема №2. Я создал симпатичный шаблон, который унаследован от QObject — это мне было нужно для сигнально-слотовых связей. И тут всплыла бяка: «MOC не позволяет использовать все возможности С++. Основная проблема в том, что шаблоны классов не могут иметь сигналы или слоты». #@&*!
Впрочем, и эту тему я также преодолел.

Я придумал следующие классы:

  1. ваш родной класс T;
  2. есть класс создания объекта — CreatorBase (потомок QObject). Он в слоте вызовом виртуального метода создает новый объект и его адрес передает сигналом;
  3. есть шаблонная реализация класса создателя — Creator<T> (потомок CreatorBase). Он реализует метод создания объекта заданного типа;
  4. есть класс ThreadedObjectBase (потомок QObject), который создает новый поток. Он получает объект-создатель CreatorBase и устанавливает необходимые сигнально-слотовые связи;
  5. пользователь использует шаблонный класс хранения объекта и потока ThreadedObject<T> (потомок ThreadedObjectBase). Он вызывает создание нового объекта и перегружает операторы * и ->, а также указатель типа создаваемого объекта;
  6. пользователь создает класс (он может быть потомком QObject), в котором по желанию реализует сигнал «класс закончил работу» и слот «прерывание работы», а также может задать отложенное удаление объекта.

Последовательность действий получилась несложной:

  1. используется класс хранения объекта и потока ThreadedObject;
  2. он создает создатель объекта Creator и QThread для нового потока;
  3. создатель объекта переносится в новый поток;
  4. устанавливаются сигнально-слотовые связи;
  5. вновь созданный поток запускается с необходимым приоритетом;
  6. в созданном потоке создается пользовательский класс T;
  7. ThreadedObjectBase узнает об этом с помощью слота setObject (void *Obj), запоминает адрес объекта и оповещает об этом миру с помощью сигнала objectIsReady ();
  8. об успешном финале всех этих действий можно узнать у bool ThreadedObject<T>::objectIsCreated (void) const.

Реализация

Рассмотрим код созданных классов (чтобы оно все влезло в экран, я убрал комментарии).
Создатели объектов:

class CreatorBase: public QObject
{
Q_OBJECT
	void	*_obj;
protected:	virtual void *Allocation (void) = 0;
public slots:	void allocate (void)		{ emit setObject (Allocation ()); }
signals:	void setObject (void *Obj);
};

template <class T>
class Creator: public CreatorBase
{
protected: void *Allocation (void) { return reinterpret_cast <void*> (new T); }
};

Тут все очевидно: базовый класс создателя CreatorBase имеет слот allocate (), который будет запущен в новом активном потоке. Он вызывает сигнал setObject (void *Obj), который передает адрес объекта, созданного в потомке void *Creator<T>::Allocation ().

Базовый класс ThreadedObjectBase выглядит следующим образом:

class ThreadedObjectBase: public QObject
{
	Q_OBJECT

protected:
	QThread	*_thread;

	virtual void SetObjectPointer (void *Ptr) = 0;
	ThreadedObjectBase (QObject *parent = 0): QObject (parent), _thread (0) {}

	void starting (CreatorBase *Creator, QThread::Priority Priority = QThread::InheritPriority, bool ToDeleteLaterThread = true)
	{
		bool res;			
		_thread = new QThread;
		Creator->moveToThread (_thread);
		res = connect (_thread, SIGNAL (started ()), Creator, SLOT (allocate ()));
		Q_ASSERT_X (res, "connect", "connection is not established");
		res = connect (Creator, SIGNAL (setObject (void*)), this, SLOT (setObject (void*)));
		Q_ASSERT_X (res, "connect", "connection is not established");
		if (ToDeleteLaterThread)
		{
			res = connect (_thread, SIGNAL (finished ()), _thread, SLOT (deleteLater ()));
			Q_ASSERT_X (res, "connect", "connection is not established");
		}
		_thread->start (Priority);
	}

public:
	virtual ~ThreadedObjectBase (void)		{ if (_thread) delete _thread; }
	QThread *thread (void)	{ return _thread; }
	const QThread *cthread (void) const	{ return _thread; }

signals:
	void objectIsReady (void);

private slots:
	void setObject (void *Obj)		{ SetObjectPointer (Obj); emit objectIsReady (); }
};

Основной метод тут — starting. Он создает поток с указанным приоритетом. При запуске поток _thread вызывает сигнал QThread::started (). Этот сигнал мы связываем со слотом CreatorBase::allocate (), который и создает новый объект. Тот, в свою очередь, вызывает сигнал CreatorBase::setObject (void *), который мы подхватываем слотом ThreadedObjectBase::setObject (void *Obj). Все, объект создан (о чем выдается сигнал ThreadedObjectBase::objectIsReady () ), указатель на него получен.

Если пользователь желает установить отложенное удаление класса потока (что желательно), то устанавливается связь внутри _thread QThread::finished () -> QObject::deleteLater ().
Также пользователь может установить имя сигнала (будет храниться в переменной _finished_signal). Этот сигнал вызывается создаваемым объектом по окончании своей работы. Аналогично слот из _terminate_slot будет вызываться сигналом прерывания работы потока (поток, впрочем, остановится не мгновенно; дождаться его окончания можно будет вызовом thread()->wait — см. QThread::wait).

Ну и наконец видимый пользователю шаблонный класс:

template <class T>
class ThreadedObject: public ThreadedObjectBase
{
private:
	Creator<T> *_creator;

protected:
	T*	_obj;
	const char	*_finished_signal;
	const char	*_terminate_slot;
	bool		_to_delete_later_object;

	void SetObjectPointer (void *Ptr)
	{
		bool res;

		_obj = reinterpret_cast <T*> (Ptr);

		if (_finished_signal)
		{
			res = connect (_obj, _finished_signal, _thread, SLOT (quit ()));
			Q_ASSERT_X (res, "connect", "connection is not established");
		}

		if (_terminate_slot)
		{
			res = connect (_thread, SIGNAL (finished ()), _obj, _terminate_slot);
			Q_ASSERT_X (res, "connect", "connection is not established");
		}
		
		if (_to_delete_later_object && _finished_signal)
		{
			res = connect (_obj, _finished_signal, _obj, SLOT (deleteLater ()));
			Q_ASSERT_X (res, "connect", "connection is not established");
		}
	}

public:
	ThreadedObject (QObject *parent = 0): ThreadedObjectBase (parent), _obj (0), _creator (0)
		{ }
	~ThreadedObject  (void)	{ if (_creator) delete _creator; }
	void start (const char *FinishedSignal = 0, const char *TerminateSlot = 0, QThread::Priority Priority = QThread::InheritPriority, bool ToDeleteLaterThread = true, bool ToDeleteLaterObject = true)
	{
		_finished_signal = FinishedSignal;
		_terminate_slot = TerminateSlot;
		_to_delete_later_object = ToDeleteLaterObject;
		starting (new Creator<T>, Priority, ToDeleteLaterThread);
	}

	bool objectIsCreated (void) const	{ return _obj != 0; }

	T* ptr (void) 			{ return reinterpret_cast <T*> (_obj); }
	const T* cptr (void) const	{ return reinterpret_cast <const T*> (_obj); }

	// . перегрузки
	operator T* (void)			{ return ptr (); }
	T* operator -> (void)		{ return ptr (); }
	operator const T* (void) const	{ return cptr (); }
	const T* operator -> (void) const	{ return cptr (); }	
};

Тут основной метод — start, который запоминает имена сигналов и слотов, а также устанавливает отложенное удаление метода. Метод objectIsCreated () возвращает истину когда объект уже создан. Многочисленные перегрузки позволяют использовать ThreadedObject<T> как «умный» указатель.

Вот простенький пример использования этих классов:


ThreadedObject <Operation> _obj;
QObject::connect (&_obj, SIGNAL (objectIsReady ()), this, SLOT (connectObject ()));
_obj.start (SIGNAL (finished ()), SLOT (terminate ()), QThread::HighPriority);

Снизу прилагается реальный пример — в основном потоке создается кнопка. В новом потоке создается переменная типа int, а также сигнал от таймера и событие по таймеру. Оба этих таймера уменьшают значение переменной int, по достижению нуля вызывается слот QCoreApplication::quit (). С другой стороны, закрытие приложения останавливает поток. Пример проверен в WinXP. Хотелось бы в комментариях услышать об успешных испытаниях в Linux, MacOS, Android и прочих поддерживаемых платформах.

Пример + классы

Файл ThreadedObject:

// **
// **  Базовый класс для создателя объекта
// **

class CreatorBase: public QObject
{
Q_OBJECT
	void	*_obj;										// созданный объект
protected:		virtual void *Allocation (void) = 0;	// тут будет создаваться объект
public slots:	void allocate (void)		{ emit setObject (Allocation ()); }	// слот создания объекта
signals:		void setObject (void *Obj);				// выдача созданного объекта
};

// **
// **  Базовый класс для создания потока
// **

class ThreadedObjectBase: public QObject
{
	Q_OBJECT

protected:
	QThread	*_thread;				// поток

	virtual void SetObjectPointer (void *Ptr) = 0;		// тут будет присваиваться объект
	ThreadedObjectBase (QObject *parent = 0): QObject (parent), _thread (0) {}		// инициализация предка

	void starting (CreatorBase *Creator, QThread::Priority Priority = QThread::InheritPriority, bool ToDeleteLaterThread = true)		// запуск нового потока
	{
		bool res;							// признак успешности установки сигналов-слотов
		_thread = new QThread;		// создание потока
		Creator->moveToThread (_thread);		// перенос _creator в поток
		res = connect (_thread, SIGNAL (started ()), Creator, SLOT (allocate ()));				Q_ASSERT_X (res, "connect", "connection is not established");	// запуск потока _thread вызывает создание объекта Creator-ом
		res = connect (Creator, SIGNAL (setObject (void*)), this, SLOT (setObject (void*)));	Q_ASSERT_X (res, "connect", "connection is not established");	// Creat-ор выдает адрес объекта
		if (ToDeleteLaterThread)			// отложенное удаление thread?
		{	res = connect (_thread, SIGNAL (finished ()), _thread, SLOT (deleteLater ()));		Q_ASSERT_X (res, "connect", "connection is not established");	}	// завершение потока _thread вызывает отложенное удаление объекта
		_thread->start (Priority);			// установка приоритета
	}

public:
	// . управление
	virtual ~ThreadedObjectBase (void)		{ if (_thread) delete _thread; }			// создание иерархии деструкторов
	QThread *thread (void)	{ return _thread; }				// поток, владеющий объектом

	// . состояние
	const QThread *cthread (void) const	{ return _thread; }		// поток, владеющий объектом

signals:
	void objectIsReady (void);				// сигнал "объект готов"

private slots:
	void setObject (void *Obj)				{ SetObjectPointer (Obj); emit objectIsReady (); }		// получение адреса объекта
};	// class ThreadedObjectBase

// **
// **  Создание потока
// **

template <class T>
class ThreadedObject: public ThreadedObjectBase
{
private:
	template <class T> class Creator: public CreatorBase		// создатель объекта в потоке
	{ protected: void *Allocation (void) { return reinterpret_cast <void*> (new T); } };
	Creator<T> *_creator;		// класс создания объекта в потоке

protected:
	T*	_obj;						// объект
	const char	*_finished_signal;			// сигнал "окончание работы объекта"
	const char	*_terminate_slot;			// слот "остановка работы"
	bool		_to_delete_later_object;	// установить "отложенне удаление объекта?

	void SetObjectPointer (void *Ptr)		// установка связей объекта
	{
		bool res;							// признак успешности установки сигналов-слотов

		_obj = reinterpret_cast <T*> (Ptr);	// установка указателя на объект

		if (_finished_signal)				// установить сигнал "окончание работы объекта"?
		{	res = connect (_obj, _finished_signal, _thread, SLOT (quit ()));			Q_ASSERT_X (res, "connect", "connection is not established");	}	// по окончанию работы объекта поток будет завершен
		if (_terminate_slot)				// установить слот "остановка работы"?
		{	res = connect (_thread, SIGNAL (finished ()), _obj, _terminate_slot);	Q_ASSERT_X (res, "connect", "connection is not established");	}	// перед остановкой потока будет вызван слот объекта "остановка работы"
		if (_to_delete_later_object && _finished_signal)	// установить отложенное удаление объекта?
		{	res = connect (_obj, _finished_signal, _obj, SLOT (deleteLater ()));	Q_ASSERT_X (res, "connect", "connection is not established");	}	// по окончанию работы объекта будет установлено отложенное удаление
	}

public:
	// . управление
	ThreadedObject (QObject *parent = 0): ThreadedObjectBase (parent), _obj (0), _creator (0) {}	// конструктор
	~ThreadedObject  (void)	{ if (_creator) delete _creator; }		// деструктор
	void start (const char *FinishedSignal = 0, const char *TerminateSlot = 0, QThread::Priority Priority = QThread::InheritPriority, bool ToDeleteLaterThread = true, bool ToDeleteLaterObject = true)		// запуск нового потока
	{
		_finished_signal = FinishedSignal;		// запоминание имени сигнала "окончание работы объекта"
		_terminate_slot = TerminateSlot;		// запоминание имени слота "остановка работы"
		_to_delete_later_object = ToDeleteLaterObject;	// запоминание установки отложенного удаление объекта
		starting (new Creator<T>, Priority, ToDeleteLaterThread);	// создание объекта
	}

	// . состояние
	bool objectIsCreated (void) const	{ return _obj != 0; }							// объект готов к работе?

	T* ptr (void) 				{ return reinterpret_cast <T*> (_obj); }			// указатель на объект
	const T* cptr (void) const	{ return reinterpret_cast <const T*> (_obj); }		// указатель на константный объект

	// . перегрузки
	operator T* (void)					{ return ptr (); }		// указатель на объект
	T* operator -> (void)				{ return ptr (); }		// указатель на объект
	operator const T* (void) const		{ return cptr (); }		// указатель на константный объект
	const T* operator -> (void) const	{ return cptr (); }		// указатель на константный объект
};	// class ThreadedObject

Файл main.cpp:


#include <QtGui>
#include <QtWidgets>
#include <QtCore>
 
#include "ThreadedObject.h"

// **
// **  Выполнение операции
// **

class Operation: public QObject
{
	Q_OBJECT

	int		*Int;		// некоторая динамическая переменная
	QTimer	_tmr;		// таймер
	int		_int_timer;	// внутренний таймер

public:
	Operation (void)	{ Int = new int (5); }			// некоторый конструктор
	~Operation (void)	{ if (Int) delete Int; }		// некоторый деструктор

signals:
    void addText(const QString &txt);		// сигнал "добавление текста"
	void finished ();						// сигнал "остановка работы"

public slots:
	void terminate ()			// досрочная остановка
	{
		killTimer (_int_timer);		// остановка внутреннего таймера
		_tmr.stop ();				// остановка внешенго таймера
		delete Int;					// удаление переменной
		Int = 0;					// признак завершения работы
		emit finished ();			// сигнал завергения работы
	}
    void doAction (void)		// некоторое действие
    {
		bool res;
		emit addText (QString ("- %1 -"). arg (*Int));
		res = QObject::connect (&_tmr, &QTimer::timeout, this, &Operation::timeout);	Q_ASSERT_X (res, "connect", "connection is not established");	// связывание внешнего таймера
		_tmr.start (2000);			// запуск внешнего таймера
		thread()->sleep (1);		// выжидание 1 сек...
		timeout ();					// ... выдача состояния ...
		startTimer (2000);			// ... и установка внутреннего таймера
    }
protected:
	void timerEvent (QTimerEvent *ev)	{ timeout (); }		// внутренний таймер

private slots:
	void timeout (void)
	{
		if (!Int || !*Int)									// поток закрывается?
			return;											// ... выход
		--*Int;												// уменьшение счетчика
		emit addText (QString ("- %1 -"). arg (*Int));		// выдача значения

		if (!Int || !*Int)									// таймер закрыт?
			emit finished ();								// ... выход

	}

};

// **
// **  Объект, взаимодействующий с потоком
// **

class App: public QObject
{
	Q_OBJECT

	ThreadedObject <Operation>	_obj;		// объект-поток
	QPushButton _btn;						// кнопка

protected:
	void timerEvent (QTimerEvent *ev)
	{
		bool res;							// признак успешности установки сигналов-слотов
		killTimer (ev->timerId ());			// остановка таймера
		res = QObject::connect (&_obj, SIGNAL (objectIsReady ()), this, SLOT (connectObject ()));		Q_ASSERT_X (res, "connect", "connection is not established");	// установка связей с объектом
		_obj.start (SIGNAL (finished ()), SLOT (terminate ()), QThread::HighPriority);					// запуск потока с высоким приоритетом
	}

private slots:
	void setText (const QString &txt)		{ _btn.setText (txt); }		// установка надписи на кнопке
	void connectObject (void)		// установка связей с объектом
	{
		bool res;					// признак успешности установки сигналов-слотов
		res = QObject::connect (this, &App::finish, _obj, &Operation::terminate);				Q_ASSERT_X (res, "connect", "connection is not established");	// закрытие этого объекта хакрывает объект в потоке
		res = QObject::connect (this, &App::startAction, _obj, &Operation::doAction);			Q_ASSERT_X (res, "connect", "connection is not established");	// установка сигнала запуска действия
		res = QObject::connect (_obj, &Operation::finished, this, &App::finish);				Q_ASSERT_X (res, "connect", "connection is not established");	// конец операции завершает работу приложения
		res = QObject::connect (_obj, &Operation::addText, this, &App::setText);				Q_ASSERT_X (res, "connect", "connection is not established");	// установка надписи на кнопку
		res = QObject::connect (&_btn, &QPushButton::clicked, _obj, &Operation::terminate);		Q_ASSERT_X (res, "connect", "connection is not established");	// остановка работы потока

		_btn.show ();				// вывод кнопки
		emit startAction ();		// запуск действия
	}

public slots:
	void terminate (void)			{ emit finish (); }		// завершение работы приложения

signals:
	void startAction (void);		// сигнал "запуск действия"
	void finish (void);				// сигнал "завершение работы"
};

// **
// **  Точка входа в программу
// **
 
int main (int argc, char **argv)
{
    QApplication app (argc, argv);		// приложение
	App a;								// объект
	bool res;							// признак успешности операции

	a.startTimer (0);					// вызов функции таймера объекта при включении цикла обработки сообщений
	res = QObject::connect (&a, SIGNAL (finish ()), &app, SLOT (quit ()));				Q_ASSERT_X (res, "connect", "connection is not established");	// окончание работы объекта закрывает приложение
	res = QObject::connect (&app, SIGNAL (lastWindowClosed ()), &a, SLOT (terminate ()));	Q_ASSERT_X (res, "connect", "connection is not established");	// окончание работы приложения закрывает объект

	return app.exec();					// запуск цикла обработки сообщений
}

#include "main.moc"

Автор: avn

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js