По мере работы с базами данных, ознакомления с их плюсами и минусами, возникает момент, когда принимается решение миграции с одной СУБД в другую. В данном случае возникла задача переноса сервисов с MySQL на PostgreSQL. Вот небольшой перечень вкусностей, которые ждут от перехода на PostgreSQL, версии 9.2 (с более подробным списком возможностей можно ознакомится тут):
- наследование таблиц (есть ограничения, которые обещают в будущем исправить)
- диапазоны: int4range, numrange, daterange
- поддержка из коробки несколько языков для хранимых функций (PL/pgSQL, PL/Tcl, PL/Perl, PL/Python и голый C)
- оператор WITH, позволяющий делать рекурсивные запросы
- (планируется) материализованные представления (частично они доступны и сейчас — как IUD правила к представлению)
- (планируется) триггера на DDL операции
Как правило, существующие решения опираются на работу с уже готовым SQL дампом, который конвертируется в соответствии с синтаксисом целевой БД. Но в некоторых случаях (активно использующееся веб-приложение с большим объемом информации) такой вариант несет определенные временные затраты на создание SQL дампа из СУБД, его конвертации и загрузку получившегося дампа снова в СУБД. Поэтому оптимальней будет online-вариант (прямиком из СУБД в СУБД) конвертера, что может существенно уменьшить простой сервисов.
Языком для реализации выбран C++ (с некоторыми возможностями из C++11x), библиотеки для соединения с MySQL и PostgreSQL использовались нативные, в качестве IDE был задействован Qt Creator.
Алгоритм миграции состоит в следующем. Подразумевается что в БД-получателе уже создана структура таблиц, соответствующая структуре в БД-источнике. Формируется список таблиц для переноса данных, который затем распределяется в пуле потоков. Каждый поток имеет подключение к БД-источнику и к БД-получателю. Т.е. параллельно переносится несколько таблиц. Profit!
Традиционно любое приложение имеет некоторый каркас — набор системных компонент, на которые опираются другие компоненты — работа с конфигурационным файлом, логом, обработчик ошибок, менеджер памяти и прочее. В нашем случае, используется только самое необходимое для решения задачи. Во-первых, были переопределены (исключительно для удобства) некоторые фундаментальные и составные типы (да, знаю, можно было аlias templates использовать, но получилось так):
typedef bool t_bool;
typedef char t_char;
typedef unsigned char t_uchar;
typedef signed char t_schar;
typedef int t_int;
typedef unsigned int t_uint;
typedef float t_float;
typedef double t_double;
template<typename T, typename U>
class CMap
: public std::map<T, U>
{
public:
CMap();
virtual ~CMap();
};
template<typename T, typename U>
CMap<T, U>::CMap()
{
}
template<typename T, typename U>
CMap<T, U>::~CMap()
{
}
template<typename T>
class CVector
: public std::vector<T>
{
public:
CVector();
virtual ~CVector();
};
template<typename T>
CVector<T>::CVector()
{
}
template<typename T>
CVector<T>::~CVector()
{
}
class CFileStream
: public std::fstream
{
public:
CFileStream();
virtual ~CFileStream();
};
Из явных паттернов использован только синглтон:
template<typename T>
class CSingleton
{
public:
static T* instance();
void free();
protected:
CSingleton();
virtual ~CSingleton();
};
template<typename T>
T* CSingleton<T>::instance()
{
static T *instance = new T();
return instance;
}
template<typename T>
void CSingleton<T>::free()
{
delete this;
}
template<typename T>
CSingleton<T>::CSingleton()
{
}
template<typename T>
CSingleton<T>::~CSingleton()
{
}
Базовые классы для задачи (выполняется в отдельном потоке) и системы (запускает на выполнение задачи):
class CTask
{
public:
CTask();
virtual ~CTask();
void execute();
t_uint taskID();
t_bool isExecuted();
protected:
virtual void executeEvent() = 0;
private:
t_uint m_task_id;
t_bool m_executed;
};
CTask::CTask()
: m_executed(false)
{
static t_uint task_id = 0;
m_task_id = task_id++;
}
CTask::~CTask()
{
}
void CTask::execute()
{
executeEvent();
m_executed = true;
}
t_uint CTask::taskID()
{
return m_task_id;
}
t_bool CTask::isExecuted()
{
return m_executed;
}
class CSystem
{
public:
CSystem();
virtual ~CSystem() = 0;
protected:
void executeTask(CTask *task);
};
CSystem::CSystem()
{
}
CSystem::~CSystem()
{
}
void CSystem::executeTask(CTask *task)
{
CTask& task_ref = *task;
std::thread thread([&]() { task_ref.execute(); });
thread.detach();
}
В завершении рассмотрения базовых типов нужно упомянуть класс строки, который пришлось написать с нуля, чтобы для некоторых операций (замена подстроки и конкатенация) была возможность работы с передаваемым буфером (о нем чуть ниже) без дополнительных выделений памяти и некоторые вещи (преобразование строки в число и числа в строки) сделать членами класса (приводится только объявление класса):
class CString
{
public:
CString(const t_char *data = nullptr);
CString(const CString& s);
~CString();
const t_char* ptr() const;
void setPtr(t_char *p);
CString& operator= (const CString& s);
CString operator+ (const t_char *p) const;
CString operator+ (t_char c) const;
CString operator+ (const CString& s) const;
friend CString operator+ (const t_char *p, const CString& s);
CString& operator+= (const t_char *p);
CString& operator+= (t_char c);
CString& operator+= (const CString& s);
t_bool operator== (const CString& s) const;
t_bool operator!= (const CString& s) const;
t_bool operator< (const CString& s) const;
t_bool operator> (const CString& s) const;
t_bool operator<= (const CString& s) const;
t_bool operator>= (const CString& s) const;
t_char& at(t_uint index);
t_char at(t_uint index) const;
t_uint length() const;
t_bool isEmpty() const;
void clear();
t_int search(const CString& s, t_uint from = 0) const;
CString substr(t_uint from, t_int count = -1) const;
CString replace(const CString& before, const CString& after) const;
static CString fromNumber(t_uint value);
static t_uint toUnsignedInt(const CString& s, t_bool *good = nullptr);
CVector<CString> split(const CString& splitter) const;
t_bool match(const CString& pattern) const;
static t_uint replacePtr(const t_char *src, const t_char *before, const t_char *after, char *buffer);
static t_uint lengthPtr(const t_char *src);
static t_uint concatenatePtr(const t_char *src, char *buffer);
private:
t_char *m_data;
t_uint length(const t_char *src) const;
t_char* copy(const t_char *src) const;
t_char* concatenate(const t_char *src0, t_char c) const;
t_char* concatenate(const t_char *src0, const t_char *src1) const;
t_int compare(const t_char *src0, const t_char *src1) const;
};
CString operator+ (const t_char *p, const CString& s);
Как неизбежность, для приложения, чуть больше чем «Hello,world», это лог и конфигурационный файл. В методе записи сообщения в лог был задействован мьютекс, так как каждая задача по мере обработки таблицы пишет об этом в лог. Мелкогранулярные блокировки и lockfree-алгоритмы не рассматривались по причине того, что запись в лог — это далеко не узкое место в работе приложения:
class CLog
: public CSingleton<CLog>
{
public:
enum MessageType
{
Information,
Warning,
Error
};
CLog();
virtual ~CLog();
void information(const CString& message);
void warning(const CString& message);
void error(const CString& message);
private:
std::mutex m_mutex;
CFileStream m_stream;
void writeTimestamp();
void writeHeader();
void writeFooter();
void writeMessage(MessageType type, const CString& message);
};
CLog::CLog()
{
m_stream.open("log.txt", std::ios_base::out);
writeHeader();
}
CLog::~CLog()
{
writeFooter();
m_stream.flush();
m_stream.close();
}
void CLog::information(const CString& message)
{
writeMessage(Information, message);
}
void CLog::warning(const CString& message)
{
writeMessage(Warning, message);
}
void CLog::error(const CString& message)
{
writeMessage(Error, message);
}
void CLog::writeTimestamp()
{
time_t rawtime;
tm *timeinfo;
t_char buffer[32];
time(&rawtime);
timeinfo = localtime(&rawtime);
strftime(buffer, 32, "%Y/%m/%d %H:%M:%S", timeinfo);
m_stream << buffer << " ";
}
void CLog::writeHeader()
{
writeMessage(Information, "Log started");
}
void CLog::writeFooter()
{
writeMessage(Information, "Log ended");
}
void CLog::writeMessage(MessageType type, const CString& message)
{
std::lock_guard<std::mutex> guard(m_mutex);
writeTimestamp();
switch (type)
{
case Information:
{
m_stream << "Information " << message.ptr();
break;
}
case Warning:
{
m_stream << "Warning " << message.ptr();
break;
}
case Error:
{
m_stream << "Error " << message.ptr();
break;
}
default:
{
break;
}
}
m_stream << "n";
m_stream.flush();
}
class CConfig
: public CSingleton<CConfig>
{
public:
CConfig();
virtual ~CConfig();
CString value(const CString& name, const CString& defvalue = "") const;
private:
CFileStream m_stream;
CMap<CString, CString> m_values;
};
CConfig::CConfig()
{
m_stream.open("mysql2psql.conf", std::ios_base::in);
if (m_stream.is_open())
{
CString line;
const t_uint buffer_size = 256;
t_char buffer[buffer_size];
while (m_stream.getline(buffer, buffer_size))
{
line = buffer;
if (!line.isEmpty() && line.at(0) != '#')
{
t_int pos = line.search("=");
CString name = line.substr(0, pos);
CString value = line.substr(pos + 1);
m_values.insert(std::pair<CString, CString>(name, value));
}
}
m_stream.close();
CLog::instance()->information("Config loaded");
}
else
{
CLog::instance()->warning("Can't load config");
}
}
CConfig::~CConfig()
{
}
CString CConfig::value(const CString& name, const CString& defvalue) const
{
CMap<CString, CString>::const_iterator iter = m_values.find(name);
if (iter != m_values.end())
{
return iter->second;
}
return defvalue;
}
# MySQL connection
mysql_host=localhost
mysql_port=3306
mysql_database=mysqldb
mysql_username=root
mysql_password=rootpwd
mysql_encoding=UTF8
# PostgreSQL connection
psql_host=localhost
psql_port=5432
psql_database=psqldb
psql_username=postgres
psql_password=postgrespwd
psql_encoding=UTF8
# Migration
# (!) Note: source_schema == mysql_database
source_schema=mysqldb
destination_schema=public
tables=*
use_insert=0
# Other settings
threads=16
Теперь, что касательно добавления данных в PostgreSQL. Есть два варианта — использовать запросы INSERT, которые на большом массиве данных не очень себя показали в плане производительности (особенности транзакционного механизма), или через команду COPY, которая позволяет непрерывно пересылать порции данных, отправляя в конце передачи специальный маркер (символ-терминатор). Еще один нюанс связан с определением типа (поля в таблице) в PostgreSQL. В документации не указано (возможно не было чтения между строк документации), как можно вернуть человекопонятный идентификатор типа, поэтому было составлено соответствие oid (почти уникальный идентификатор каждого объекта в БД) и типа:
case 20: // int8
case 21: // int2
case 23: // int4
case 1005: // int2
case 1007: // int4
case 1016: // int8
case 700: // float4
case 701: // float8
case 1021: // float4
case 1022: // float8
case 1700: // numeric
case 18: // char
case 25: // text
case 1002: // char
case 1009: // text
case 1015: // varchar
case 1082: // date
case 1182: // date
case 1083: // time
case 1114: // timestamp
case 1115: // timestamp
case 1183: // time
case 1185: // timestamptz
case 16: // bool
case 1000: // bool
Подготовка и выполнение задач состоит в следующем:
- создается список таблиц
- создаются подключения (по количеству задач) к БД-источнику и БД-приемнику
- распределяются диапазоны из списка таблиц задачам
- задачи запускаются на выполнение (с переданным диапазоном таблиц и подключениями к БД)
- ожидается выполнение задач (главный поток + созданные потоки)
В каждой задаче выделяется три статических буфера по 50 МБ, в которых происходит подготовка данных для команды COPY (экранирование специальных символов и конкатенация значений полей):
// create connection pool
t_uint threads = CString::toUnsignedInt(CConfig::instance()->value("threads", "1"));
CLog::instance()->information("Count of working threads: " + CString::fromNumber(threads));
if (!createConnectionPool(threads - 1))
{
return false;
}
// create tasks
CString destination_schema = CConfig::instance()->value("destination_schema");
t_uint range_begin = 0;
t_uint range_end = 0;
t_uint range = m_tables.size() / threads;
for (t_uint i = 0, j = 0; i < m_tables.size() - range; i += range + 1, ++j)
{
range_begin = i;
range_end = i + range;
std::unique_ptr<CTask> task = std::unique_ptr<CTask>(new CMigrationTask(m_source_pool.at(j), m_destination_pool.at(j), destination_schema, m_tables, range_begin, range_end));
m_migration_tasks.push_back(std::move(task));
}
range_begin = range_end + 1;
range_end = m_tables.size() - 1;
std::unique_ptr<CTask> task = std::unique_ptr<CTask>(new CMigrationTask(std::move(m_source), std::move(m_destination), destination_schema, m_tables, range_begin, range_end));
// executing tasks
for (t_uint i = 0; i < m_migration_tasks.size(); ++i)
{
executeTask(m_migration_tasks.at(i).get());
}
task->execute();
// wait for completion
for (t_uint i = 0; i < m_migration_tasks.size(); ++i)
{
while (!m_migration_tasks.at(i)->isExecuted())
{
}
}
t_uint count = 0;
t_char *value;
CString copy_query = "COPY " + m_destination_schema + "." + table + " ( ";
m_buffer[0] = '';
m_buffer_temp0[0] = '';
m_buffer_temp1[0] = '';
if (result->nextRecord())
{
for (t_uint i = 0; i < result->columnCount(); ++i)
{
if (i != 0)
{
copy_query += ", ";
CString::concatenatePtr("t", m_buffer);
}
copy_query += result->columnName(i);
if (!result->isColumnNull(i))
{
value = result->columnValuePtr(i);
CString::replacePtr(value, "\", "\\", m_buffer_temp0);
CString::replacePtr(m_buffer_temp0, "b", "\b", m_buffer_temp1);
CString::replacePtr(m_buffer_temp1, "f", "\f", m_buffer_temp0);
CString::replacePtr(m_buffer_temp0, "n", "\n", m_buffer_temp1);
CString::replacePtr(m_buffer_temp1, "r", "\r", m_buffer_temp0);
CString::replacePtr(m_buffer_temp0, "t", "\t", m_buffer_temp1);
CString::replacePtr(m_buffer_temp1, "v", "\v", m_buffer_temp0);
CString::concatenatePtr(m_buffer_temp0, m_buffer);
}
else
{
CString::concatenatePtr("\N", m_buffer);
}
}
copy_query += " ) FROM STDIN";
if (!m_destination_connection->copyOpen(copy_query))
{
CLog::instance()->error("Can't execute query '" + copy_query + "', error: " + m_destination_connection->lastError());
return false;
}
CString::concatenatePtr("n", m_buffer);
if (!m_destination_connection->copyDataPtr(m_buffer))
{
CLog::instance()->error("Can't copy data, error: " + m_destination_connection->lastError());
return false;
}
++count;
while (result->nextRecord())
{
m_buffer[0] = '';
for (t_uint i = 0; i < result->columnCount(); ++i)
{
if (i != 0)
{
CString::concatenatePtr("t", m_buffer);
}
if (!result->isColumnNull(i))
{
value = result->columnValuePtr(i);
CString::replacePtr(value, "\", "\\", m_buffer_temp0);
CString::replacePtr(m_buffer_temp0, "b", "\b", m_buffer_temp1);
CString::replacePtr(m_buffer_temp1, "f", "\f", m_buffer_temp0);
CString::replacePtr(m_buffer_temp0, "n", "\n", m_buffer_temp1);
CString::replacePtr(m_buffer_temp1, "r", "\r", m_buffer_temp0);
CString::replacePtr(m_buffer_temp0, "t", "\t", m_buffer_temp1);
CString::replacePtr(m_buffer_temp1, "v", "\v", m_buffer_temp0);
CString::concatenatePtr(m_buffer_temp0, m_buffer);
}
else
{
CString::concatenatePtr("\N", m_buffer);
}
}
CString::concatenatePtr("n", m_buffer);
if (!m_destination_connection->copyDataPtr(m_buffer))
{
CLog::instance()->error("Can't copy data, error: " + m_destination_connection->lastError());
return false;
}
++count;
if (count % 250000 == 0)
{
CLog::instance()->information("Working task #" + CString::fromNumber(taskID()) + ":tttable " + table + " processing, record count: " + CString::fromNumber(count));
}
}
}
Результаты
Для переноса 2 Гб данных в PostgreSQL, c включенным WAL-архивированием, потребовалось порядка 10 минут (создано 16 потоков).
Над чем стоит подумать
- Определение на этапе выполнения количества задача/потоков — на основании количества данных и доступных аппаратных возможностей
- Определение количества необходимой памяти под буфер, в котором готовятся данные для COPY
- Распределение таблиц между задачами не по диапазону, а по необходимости — задачи берут таблицу из threadsafe-стека
Исходный код
Исходный код доступен на github.
Автор: blackmaster