При написании много-сервисной системы в корой каждый сервис должен быть многопоточен, столкнулись с проблемой использования подключения к базе данных. Сервисы разрабатываются на QT, поэтому использовали модуль QtSql для взаимодействие с БД.
Проблемы
- Для каждого потока необходимо свое собственное подключение к БД (QSqlDatabase). При использовании одного подключения из разных потоков возникаем ошибка сегментирования.
- Т.к. в текущий момент времени возможно держать открытыми ограниченное число подключений к БД, необходимо реализовать захват, освобождение и ожидание подключения потоками.
- В контексте потока, для правильной работы с транзакциями необходимо работать только с одним подключением. Например: Сущность заказ содержит в себе сущности Товар. При сохранении Заказа должны сохранится все товары. Если при сохранении товара возникает исключительная ситуация, то вся транзакция по сохранению заказ должна отменится.
- Библиотека должна уметь работать с несколькими БД одновременно, причем разных типов (Mysql,PostgreSQL)
Решение
В итоге у нас получилось 3 класса:
- Connection — класс обертка отвечающий за работу с БД: Подключение, выполнение и обработка результатов запросов.
- ConnectionManager — синглтон хоронящий в себе подключени и отвечает за выдачу и освобождение подключений.
- ManagedConnection — класс обертка для автоматизации захвата и освобождения подключения.
Connection
В конструкторе класса происходит инциализация члена QSqlDatabase _conn и вызывается открытие (open) подключения:
Connection::Connection(const QString& ident, const QString& driver,
const QString& dbHost, const QString& dbName, const QString& dbUser,
const QString& dbPassword, int dbPort) :
_threadId(0), _countRef(0), _countBegins(0), _retryCount(0),_invalid(false) {
_conn = QSqlDatabase::addDatabase(driver, ident);
_conn.setHostName(dbHost);
_conn.setDatabaseName(dbName);
_conn.setUserName(dbUser);
_conn.setPassword(dbPassword);
_conn.setPort(dbPort),
open();
}
Основные прототипы метод для работы с БД
void exec(QSqlQuery& sql);
void exec(const char* sql);
void exec(const QString& sql);
/**
* возвращает список значений, взятых из первого столбца всех строк набора результатов
*/
template <typename T>
void fetchCol(QSqlQuery& sql,QList<T>& result)...
template <typename T>
QList<T> fetchCol(QSqlQuery& sql) ...
template <typename T>
QList<T> fetchCol(const char* sql) ...
template <typename T>
QList<T> fetchCol(const QString& sql) ...
/*
* Возвращает значение только одной колонку из первой строки
*/
template <typename T>
T fetchOne(QSqlQuery& sql, bool* ok = 0) ...
template <typename T>
T fetchOne(const QString& sql, bool* ok = 0) ...
template <typename T>
T fetchOne(const char* sql, bool* ok = 0) ...
/*
* Методы возващающие одну строку
*/
void fetchRow(QSqlQuery& sql,QVariantMap& res);
QVariantMap fetchRow(const QString& sql);
QVariantMap fetchRow(const char* sql);
QVariantMap fetchRow(QSqlQuery& sql);
/**
* Методы возвращающие множество строк
*/
void query(QSqlQuery& sql,QList<QVariant>& result);
QList<QVariant> query(const char* sql);
QList<QVariant> query(QSqlQuery& sql);
QList<QVariant> query(const QString& sql);
/* Псевдонимы query */
void fetchAll(QSqlQuery& sql,QList<QVariant>& result);
QList<QVariant> fetchAll(const char* sql);
QList<QVariant> fetchAll(QSqlQuery& sql);
QList<QVariant> fetchAll(const QString& sql);
Т.к. в QT для работы с БД используется QSqlQuery который зависит от QSqlDatabase то для создания запросов строго обязательно использовать методы:
QSqlQuery createQuery() const { return QSqlQuery(_conn);}
QSqlQuery createQuery(const QString& sql) { return QSqlQuery(sql, _conn); }
ManagedConnection
Класс «увязывает» Connection и ConnetcionManager.
При создании объекта происходит попытка запроса подключения у ConnetcionManager по идентификатору (например db1conn). После захвата инциализируется член указатель на подключение. Для удобства, переопределяется оператор -> дабы вызывались методы Connection.
Обычно приложение требует подключение только к одной БД. Поэтому принято было давать ему идентификатор «default».
Тип typedef ManagedConnection DConn позволит получать подключение. Например
DB::DConn conn;
//Эквивалент
DB::ManagedConnection conn("default');
//Для вышеуказанных примеров
DB::ManagedConnection c1("db1conn);
DB::ManagedConnection c2("db2conn);
Возьмем к примеру стек вызовов на псевдокоде. Заказ (Order) сохраняет свои данные в БД и вызывает сохранения у своего члена Item (в идале их много). Item сохраняет свои данные в БД и вызывает сохранение своего члена Data. Data сохраняет в БД свои данные. В итоге вложенность на 3 уровня:
Order : save(){
DB::DBConn conn; //перый захват. countRef = 1
conn->query('INSERT INTO order...');
item->save();
Item:save() {
DB::DBConn conn;//второй захват. countRef = 2
conn->query('INSERT INTO item...');
data->save();
Data:save() {
DB::DBConn conn;//третий захват. countRef = 3
conn->query('INSERT INTO data...');
//конец блока, вызов деструктора ~ManagedConnection countRef = 2
}
//конец блока, вызов деструктора ~ManagedConnection countRef = 1
}
//конец блока, вызов деструктора ~ManagedConnection countRef = 0
}
}
}
ConnectionManager
Хранить в себе настройки для подключения: хост, порт, тип базы, и т.д.
Например для приложения необходимо иметь подключение с базой db1 типа MySQL и db2 типа PostgresSQL. Настройка в конфигурационном файле будет выглядеть так:
[database]
size = 2
;Идентификатор
1ident=db1conn
;Тип драйвера, в данном случае MySQL
1driver=QMYSQL
; Хост
1host=localhost
; Имя БД
1name=db1
;Пользователь БД
1user=db1_user
;Пароль
1password=lol
;Порт
1port=3306
;Максимальное кол-во подключений которое может иметь приложение на данный момент
1max_count = 30
2ident=db2conn
2driver=QPSQL
2host=localhost
2name=test
2user=postgres
2password=
2port=5432
2max_count = 30
При старте приложения конфиг считывается и преобразуется в QVariantMap.
Пример инициализации в Application
Application::Application(int& argc, char** argv):
QCoreApplication(argc, argv)
{
...
QVariantMap stgs = settings();
DB::ConnectionManager::init(stgs);
...
}
Статический член ConnectionManager иницилизируется из конфига (static QMap<QString, ConnectionManager*> _instances;)
В качестве ключа в map будет использоваться идентификатор из конфига ident
void ConnectionManager::init(const QVariantMap& settings) {
const int size = settings.value("database/size").toInt();
for (int i = 1; i <= size; ++i) {
const QString ident =
settings.value(QString("database/%1/ident").arg(i), "default").toString();
ConnectionManager* inst = new ConnectionManager(
ident,
settings.value(QString("database/%1/driver").arg(i)).toString(),
...
);
_instances[ident] = inst;
Log::info(QString("ConnectionManager::init: [%1] [%2@%3:%4] ").arg(inst->_driver).arg(inst->_dbUser).arg(inst->_dbHost).arg(inst->_dbPort));
}
}
В качестве ключа в map будет использоваться идентификатор из конфига ident
Основной метод класса getConnection (пояснение в комментариях кода):
Connection* ConnectionManager::getConnection() {
Connection* conn = 0;
int count = 0;
while(count++ < MAX_RETRY_GET_CONN_COUNT) {
pthread_t thread_id = pthread_self();
//Бегаем по подключениям, ищим подключение которое возможно уже было взято в текущем потоке
{
QMutexLocker mlocker(&_mutex);
for (int i = 0; i < _pool.size(); ++i) {
conn = _pool.at(i);
//если находим то возвращаем
if (conn && conn->threadID() == thread_id && conn->isValid()) {
//увеличивая счетчик ссылок на это подключение в этом треде
conn->lock();
//Log::debug(QString("ConnectionManager::getConnection Возвращем то же самое подключение что было ранее залочено thread [%1])").arg(conn->name()));
return conn;
}
}
}
//если не нашли прежде взятые в этом потоке, то будем искать первое не залоченное
{
QMutexLocker mlocker(&_mutex);
for (int i = 0; i < _pool.size(); ++i) {
conn = _pool.at(i);
if (conn && !conn->isLocked() && conn->isValid() ) {
//Log::debug(QString("ConnectionManager::getConnection Захват свободного подключения [%1])").arg(conn->name()));
//таки лочим его
conn->lock();
return conn;
}
}
}
//если тут оказались то нет больше поключений
{
QMutexLocker mlocker(&_mutex);
if(_currentCount < _maxCount) { //если текущее количество не превышает максимальное
//то создадим новое подключение
//try {
conn = new Connection(
QString("%1_%2").arg(_ident).arg(_currentCount),
_driver, _dbHost,
_dbName, _dbUser,
_dbPassword,_dbPort
);
_currentCount++;
conn->lock();
_pool.append(conn);
return conn;
/*} catch(exc::Message& ex) {
delete conn;
throw ex;
}*/
} else {
//удалим первый невалидный
//Log::warn("Достигнуто максимальное кол-во [%d] доступных подключений к DB попытка [%d]",_maxCount,count);
/*for (int i = 0; i < _pool.size(); ++i) {
conn = _pool.at(i);
if (!conn->isValid() && !conn->isLocked() ) {
removeConnection(conn);
break;
}
}*/
}
}
//Если нельзя,
//вздремнем малость и по новой
sleep(2);
}
Log::crit("После %d не смог подключить подключение к базе данных",MAX_RETRY_GET_CONN_COUNT);
{
QMutexLocker mlocker(&_mutex);
for (int i = 0; i < _pool.size(); ++i) {
conn = _pool.at(i);
if (!conn->isValid() && !conn->isLocked()) {
removeConnection(conn);
break;
}
}
}
throw exc::Message("Невозможно получить подключение к базе данных");
//return 0;
}
Логика работы
ConnetctionManager инициализируется из конфига, чтоб знать с какими настройками создавать подключения, и каково их максимальное кол-во.
При создании экземпляра DB::ManagedConnection происходит обращение к ConnetctionManager и попытка получить указатель на Connetction из ConnetctionManager::getConnetction.
В ConnetctionManager::getConnetction используя несколько попыток происходит:
- Попытка взять из накопителя подключение у которого thread_id совпадает с текущим. Если найдено то возвратить, увеличив refCount подключения на 1
- Попытка взять из накопителя свободное подключения Если найдено то возвратить, увеличив refCount подключения на 1
- Если все подключения заняты и не достигнут макс. предел, то создать новое подключение, положить его в пул и возвратить
После удаления экземпляра класса DB::ManagedConnection происходит уменьшение refCount подключения. Если refCount == 0, подключение становится доступным для захвата других потокам.
Пример использования
QList<QVariant> Bank::banksByAccountNumber(const QString& accountNumber) {
QList<QVariant> res;
DB::ManagedConnection conn;
foreach(const QVariant& row, conn->fetchAll(
"SELECT `real` as state ,namen as name,namep as full_name,"
"newnum as bik,ksnp as korr_acc,okpo as okpo,nnp as city,"
"ind as zip, adr as address,regn as regnum, telef as phones FROM `bankdinfo` WHERE `real` = '' ORDER BY RAND()"
)) {
if(isBelongToBank((row.toMap()["bik"]).toString(),accountNumber)) {
res.append(row);
}
}
Log::debug("Banks found %d",res.size());
return res;
}
Автор: zordon13ru