Для работы с PostgreSQL на языке С++, есть замечательная библиотека libpq. Библиотека отлично документирована, есть даже полный перевод на русский язык, от компании PostgresPRO.
При написании серверного бекэнда, столкнулся с тем, что в этой библиотеке нет никакого пула коннектов, а работа с БД, предполагалась в довольно интенсивном режиме и одного коннекта было явно мало. Каждый раз устанавливать соединение для отправки полученных данных, было бы просто безумием, т.к. соединение самая долгая операция, решено было написать свой пул коннектов.
Идея в том, что мы при старте программы создаём несколько соединений и храним их в очереди.
Когда приходят данные, мы просто берём свободное соединеие из очереди, а если свободных соединений нет — ждём когда появится, используем его для вставки данных, а затем помещаем соединение обратно. Идея довольна простая, быстро реализуема и самое главное, скорость работы очень высокая.
Создадим в PostgreSQL базу с именем demo, табличкой demo такой
-- Table: public.demo
-- DROP TABLE public.demo;
CREATE TABLE public.demo
(
id integer NOT NULL DEFAULT nextval('demo_id_seq'::regclass),
name character varying(256),
CONSTRAINT demo_pk PRIMARY KEY (id)
)
WITH (
OIDS=FALSE
);
ALTER TABLE public.demo
OWNER TO postgres;
Пишем класс который будет представлять собой соединение к БД, параметры соединения, будут прописаны прямо в коде для его упрощения, в реальности конечно их надо хранить в конфигурационном файле и при запуске считывать оттуда, чтобы при изменении парметров сервера, не приходилось перекомпилировать программу.
#ifndef PGCONNECTION_H
#define PGCONNECTION_H
#include <memory>
#include <mutex>
#include <libpq-fe.h>
class PGConnection
{
public:
PGConnection();
std::shared_ptr<PGconn> connection() const;
private:
void establish_connection();
std::string m_dbhost = "localhost";
int m_dbport = 5432;
std::string m_dbname = "demo";
std::string m_dbuser = "postgres";
std::string m_dbpass = "postgres";
std::shared_ptr<PGconn> m_connection;
};
#endif //PGCONNECTION_H
#include "pgconnection.h"
PGConnection::PGConnection()
{
m_connection.reset( PQsetdbLogin(m_dbhost.c_str(), std::to_string(m_dbport).c_str(), nullptr, nullptr, m_dbname.c_str(), m_dbuser.c_str(), m_dbpass.c_str()), &PQfinish );
if (PQstatus( m_connection.get() ) != CONNECTION_OK && PQsetnonblocking(m_connection.get(), 1) != 0 )
{
throw std::runtime_error( PQerrorMessage( m_connection.get() ) );
}
}
std::shared_ptr<PGconn> PGConnection::connection() const
{
return m_connection;
}
Чтобы предотвратить возможную утечку ресурсов, соединиение мы будем хранить в умном указателе.
В конструкторе мы вызывам функцию PQsetdbLogin, которая устанавливает соединение к БД, возвращая указатель на соедининие PGconn* и переводим соедининие в асинхронный режим работы.
При завершении работы, соединие должно быть удалено функцией PQfinish, которой передается указатель, возвращённый функцией PQsetdbLogin. Поэтому последним параметром в вызове m_connection.reset() мы передаём адрес функции &PQfinish. Когда умный указатель выйдет из области видимости и счётчик ссылок обнулиться, он вызовет эту функцию, тем самым корректно завершив соединие.
Теперь нам нужен класс, который будет создавать, хранить и управлять работой пула коннектов.
#ifndef PGBACKEND_H
#define PGBACKEND_H
#include <memory>
#include <mutex>
#include <string>
#include <queue>
#include <condition_variable>
#include <libpq-fe.h>
#include "pgconnection.h"
class PGBackend
{
public:
PGBackend();
std::shared_ptr<PGConnection> connection();
void freeConnection(std::shared_ptr<PGConnection>);
private:
void createPool();
std::mutex m_mutex;
std::condition_variable m_condition;
std::queue<std::shared_ptr<PGConnection>> m_pool;
const int POOL = 10;
};
#endif //PGBACKEND_H
#include <iostream>
#include <thread>
#include <fstream>
#include <sstream>
#include "pgbackend.h"
PGBackend::PGBackend()
{
createPool();
}
void PGBackend::createPool()
{
std::lock_guard<std::mutex> locker_( m_mutex );
for ( auto i = 0; i< POOL; ++i ){
m_pool.emplace ( std::make_shared<PGConnection>() );
}
}
std::shared_ptr<PGConnection> PGBackend::connection()
{
std::unique_lock<std::mutex> lock_( m_mutex );
while ( m_pool.empty() ){
m_condition.wait( lock_ );
}
auto conn_ = m_pool.front();
m_pool.pop();
return conn_;
}
void PGBackend::freeConnection(std::shared_ptr<PGConnection> conn_)
{
std::unique_lock<std::mutex> lock_( m_mutex );
m_pool.push( conn_ );
lock_.unlock();
m_condition.notify_one();
}
В функции createPool создаём пул соедининий, я установил 10 соединений. Далее — создаём класс PGBackend, и работаем с ним через функции connection — котороя возвращает свободное соединение к БД, и freeConnection — которая помещяет соединие обратно в очередь.
Всё это работает на основе условных переменных, если очередь пуста, значит свободных соединений нет, и поток засыпает, пока не будет разбужен через условную переменную.
Простейший пример, в котором используется наш бекэнд с пулом коннектов, приведён в файле main.cpp. В «боевых условия» у вас конечно будет какой-то цикл событий, при наступлении которых будет проводиться работа с БД. У меня это boost::asio, которая работает асинхронно и принимая события из сети, пишет всё в БД. Приводить здесь её излишне, чтобы не усложнять идею с пулом коннектов. Здесь мы просто создаём 50 потоков, которые работают с сервером через один экземляр PGBackend.
#include <thread>
#include <iostream>
#include "pgbackend.h"
void testConnection(std::shared_ptr<PGBackend> pgbackend)
{
//получаем свободное соединение
auto conn = pgbackend->connection();
std::string demo = "SELECT max(id) FROM demo; " ;
PQsendQuery( conn->connection().get(), demo.c_str() );
while ( auto res_ = PQgetResult( conn->connection().get()) ) {
if (PQresultStatus(res_) == PGRES_TUPLES_OK && PQntuples(res_)) {
auto ID = PQgetvalue (res_ ,0, 0);
std::cout<< ID<<std::endl;
}
if (PQresultStatus(res_) == PGRES_FATAL_ERROR){
std::cout<< PQresultErrorMessage(res_)<<std::endl;
}
PQclear( res_ );
}
//возвращаем соединение в очередь
pgbackend->freeConnection(conn);
}
int main(int argc, char const *argv[])
{
auto pgbackend = std::make_shared<PGBackend>();
std::vector<std::shared_ptr<std::thread>> vec;
for ( size_t i = 0; i< 50 ; ++i ){
vec.push_back(std::make_shared<std::thread>(std::thread(testConnection, pgbackend)));
}
for(auto &i : vec) {
i.get()->join();
}
return 0;
}
Компилируется это всё командой:
g++ main.cpp pgbackend.cpp pgconnection.cpp -o pool -std=c++14 -I/usr/include/postgresql/ -lpq -lpthread
Будьте внимательны с количеством соединений БД — этот параметр задаётся параметром max_connections (integer).
Автор: sborisov