Всем привет! Мы на проекте ГИС ЖКХ используем PostgreSQL и недавно столкнулись с проблемой долгого выполнения SQL скриптов из-за быстрого увеличения объема данных в БД. В феврале 2018 года на PGConf я рассказал, как мы решали эту проблему. Слайды презентации доступны на сайте конференции. Предлагаю вашему вниманию текст моего выступления.
Дано
Про ГИС ЖКХ уже была подробная статья в блоге группы ЛАНИТ на Хабре. Если в двух словах ГИС ЖКХ – это первый в России федеральный портал о всей информации в ЖКХ, который запущен почти во всех регионах (в 2019 году присоединятся Москва, Питер и Севастополь). За последние три месяца в систему было загружено более 12 ТБ данных о домах, лицевых счетах, фактах оплаты и много-много еще чего, а всего в PostgreSQL сейчас лежит уже более 24 ТБ.
Проект архитектурно разделен на подсистемы. Каждой подсистеме выделена отдельная база данных. Всего таких баз сейчас около 60, они размещены на 11 виртуальных серверах. Некоторые подсистемы нагружены сильнее других, и у них базы по объему могут занимать 3-6 терабайт.
ЦУП, у нас проблема
Теперь немного подробнее расскажу о проблеме. Начну издалека: у нас код приложения и код миграций базы данных (под миграцией я понимаю перевод базы данных из одной ревизии в другую с выполнением всех необходимых SQL скриптов для этого) хранятся вместе в системе контроля версий. Это возможно благодаря использованию Liquibase (подробнее про Liquibase на проекте можно узнать из доклада Миши Балаяна на TechGuruDay в ЛАНИТ).
Теперь давайте представим себе выпуск версии. Когда данных всего пара терабайт или меньше и все таблицы в пределах сотни гигабайт, изменения (миграции) любых данных или изменения структуры в любых таблицах проходят быстро (обычно).
А теперь представим, что у нас данных уже пара десятков терабайт и появилось несколько таблиц по терабайту и больше (возможно, разбитых на партиции). В новой версии нам нужно провести миграцию по одной из этих таблиц или еще хуже сразу по всем. И при этом время регламентных работ увеличивать нельзя. И при этом такую же миграцию нужно провести и на тестовых базах данных, где железо слабее. И при этом нужно понять заранее, сколько в сумме все миграции по времени займут. Здесь и начинается проблема.
Сперва мы попробовали советы из официальной документации PostgreSQL (удаление индексов и FK перед массовой миграцией, пересоздание таблиц с нуля, использование copy, динамическое изменение конфига). Это дало эффект, но нам хотелось еще быстрее и удобнее (тут, конечно, дело субъективное – кому как удобно :–)). В результате мы реализовали параллельное выполнение массовых миграций, что увеличило скорость на многих кейсах в разы (а иногда и на порядок). Хотя на самом деле запускается параллельно несколько процессов, внутри команды у нас прижилось слово “многопоточка”.
«Многопоточка»
Основная идея такого подхода заключается в разделении большой таблицы на непересекающиеся диапазоны (например, функцией ntile) и выполнение SQL скрипта не сразу по всем данным, а параллельно по нескольким диапазонам. Каждый параллельный процесс забирает себе один диапазон, блокирует его и начинает выполнять SQL скрипт только для данных из этого диапазона. Как только скрипт отработал, мы опять ищем незаблокированный и еще не обработанный диапазон и повторяем операцию. Важно выбрать правильный ключ для разделения. Это должно быть проиндексированное поле с уникальными значениями. Если такого поля нет, можно использовать служебное поле ctid.
Первая версия «многопоточки» была реализована с помощью вспомогательной таблицы с диапазонами и функции взятия следующего диапазона. Требуемый SQL скрипт подставлялся в анонимную функцию и запускался в требуемом количестве сессий, обеспечивая параллельное выполнение.
-- Таблица UPDATE_INFO_STEPS используется для реализации обновления/заполнения
-- больших таблиц, выполнения сложных запросов обновления/заполнения
CREATE TABLE UPDATE_INFO_STEPS (
BEGIN_GUID varchar(36),
END_GUID varchar(36) NOT NULL,
STEP_NO int,
STATUS char(1),
BEGIN_UPD timestamp,
END_UPD timestamp,
ROWS_UPDATED int,
ROWS_UPDATED_TEXT varchar(30),
DISCR varchar(10)
);
ALTER TABLE UPDATE_INFO_STEPS ADD PRIMARY KEY(discr, step_no);
-- Функция FUNC_UPDATE_INFO_STEPS реализует ключевой функционал.
-- Возможность "брать" следующий интервал, если текущий занят.
CREATE OR REPLACE FUNCTION func_update_info_steps(
pStep_no int,
pDiscr varchar(10)
) RETURNS text AS
$BODY$
DECLARE
lResult text;
BEGIN
SELECT
'SUCCESS' INTO lResult
FROM
update_info_steps
WHERE
step_no = pStep_no
AND discr = pDiscr
AND status = 'N'
FOR UPDATE NOWAIT;
UPDATE
UPDATE_INFO_STEPS
SET
status = 'A',
begin_upd = now()
WHERE
step_no = pStep_no
AND discr = pDiscr
AND status = 'N';
return lResult;
EXCEPTION WHEN lock_not_available THEN
SELECT
'ERROR' INTO lResult;
return lResult;
END;
$BODY$
LANGUAGE PLPGSQL VOLATILE;
-- Пример использования (1 процесс на 1 сессию)
-- Шаг 1. Заполняем служебную таблицу интервалами для обработки.
DO
LANGUAGE PLPGSQL
$$
DECLARE
-- Указать количество обрабатываемых записей за одну итерацию
l_count int := 10000;
-- Подставить идентификатор
l_discr VARCHAR(10) := '<discr>';
BEGIN
INSERT INTO UPDATE_INFO_STEPS (
BEGIN_GUID, END_GUID, STEP_NO, STATUS,
DISCR
)
SELECT
min(guid) BEGIN_GUID,
max(guid) END_GUID,
RES2.STEP STEP_NO,
'N' :: char(1) STATUS,
l_discr DISCR
FROM
(
SELECT
guid,
floor(
(ROWNUM - 1) / l_count
) + 1 AS STEP
FROM
(
-- Подставить название колонки
SELECT
<column> AS GUID,
-- Подставить название колонки
row_number() over (
ORDER BY
<column>
) AS ROWNUM
FROM
-- Подставить схему и название таблицы
<schema>.<table_name>
ORDER BY
1 --
) RES1
) RES2
GROUP BY
RES2.step;
END;
$$;
-- Шаг 2. Используя служебную таблицу, выполняем скрипт UPDATE.
DO
LANGUAGE PLPGSQL
$$
DECLARE
cur record;
vCount int;
vCount_text varchar(30);
vCurStatus char(1);
vCurUpdDate date;
-- Подставить идентификатор
l_discr varchar(10) := '<discr>';
l_upd_res varchar(100);
BEGIN
FOR cur IN (
SELECT
*
FROM
UPDATE_INFO_STEPS
WHERE
status = 'N'
AND DISCR = l_discr
ORDER BY
step_no
) LOOP
vCount := 0;
-- Внутренняя транзакция обязательна!
SELECT
result INTO l_upd_res
FROM
dblink(
'<parameters>',
'SELECT FUNC_UPDATE_INFO_STEPS(' || cur.step_no
|| ','''
|| l_discr
|| ''')'
) AS T (result text);
IF l_upd_res = 'SUCCESS' THEN
-- Основной скрипт. В данной секции необходимо выполнять
-- требуемые действия по обновлению, вставке и тп.
-- Обязательное требование - использовать интервал
-- cur.begin_guid - cur.end_guid и dblink на "самого себя".
-- Указан примерный скрипт.
SELECT
dblink(
'<parameters>',
'UPDATE FOO set level = 42
WHERE id BETWEEN ''' || cur.begin_guid
|| ''' AND '''
|| cur.end_guid
|| ''''
) INTO vCount_text;
-- Конец основного скрипта.
SELECT
dblink(
'<parameters>',
'update UPDATE_INFO_STEPS
SET status = ''P'', end_upd = now(),
rows_updated_text = ''' || vCount_text || '''
WHERE step_no = ' || cur.step_no || '
AND discr = ''' || l_discr || ''''
) INTO l_upd_res;
END IF;
END LOOP;
END;
$$;
-- Мониторинг выполнения.
SELECT
SUM(CASE status WHEN 'P' THEN 1 ELSE 0 END) done,
SUM(CASE status WHEN 'A' THEN 1 ELSE 0 END) processing,
SUM(CASE status WHEN 'N' THEN 1 ELSE 0 END) LEFT_,
round(
SUM(CASE status WHEN 'P' THEN 1 ELSE 0 END):: numeric / COUNT(*)* 100 :: numeric,
2
) done_proc
FROM
UPDATE_INFO_STEPS
WHERE
discr = '<discr>';
Такой подход хоть и работал быстро, но требовал очень большого числа действий руками. И если деплой проходил в 3 часа ночи, ДБА должен был отловить момент выполнения «многопоточного» скрипта в Liquibase (который его выполнял, по сути, в одном процессе) и запустить руками еще несколько процессов для ускорения.
«МноGOпоточка 2.0»
Предыдущая версия «многопоточки» была неудобной в использовании. Поэтому мы сделали приложение на Go, которое автоматизирует процесс (можно сделать и на Python, например, да и на многих других языках).
Сперва мы разбиваем данные в изменяемой таблице на диапазоны. После этого во вспомогательную таблицу задач добавляем информацию о скрипте – его имя (уникальный идентификатор, например, имя задачи в Jira) и количество одновременно запускаемых процессов. Затем во вспомогательную таблицу скриптов добавляем текст SQL миграции, разбитый на диапазоны.
-- В целевой БД необходимо создать объекты, в которых будет храниться
-- конфигурация многопоточного обновления (pg_parallel_task)
-- и логи задания (pg_parallel_task_statements).
CREATE TABLE IF NOT EXISTS public.pg_parallel_task (
name text primary key, threads_count int not null DEFAULT 10,
comment text
);
COMMENT ON table public.pg_parallel_task
IS 'Задание параллельного выполнения';
COMMENT ON COLUMN public.pg_parallel_task.name
IS 'Уникальный идентификатор';
COMMENT ON COLUMN public.pg_parallel_task.threads_count
IS 'Количество одновременных потоков обработки. По умолчанию 10';
COMMENT ON COLUMN public.pg_parallel_task.comment
IS 'Комментарий';
CREATE TABLE IF NOT EXISTS public.pg_parallel_task_statements (
statement_id bigserial primary key,
task_name text not null references public.pg_parallel_task (name),
sql_statement text not null,
status text not null check (
status in (
'new', 'in progress', 'ok', 'error'
)
) DEFAULT 'new',
start_time timestamp without time zone,
elapsed_sec float(8),
rows_affected bigint,
err text
);
COMMENT ON table public.pg_parallel_task_statements
IS 'Операторы параллельного выполнения';
COMMENT ON COLUMN public.pg_parallel_task_statements.sql_statement
IS 'Полный текст выполняемого запроса';
COMMENT ON COLUMN public.pg_parallel_task_statements.status
IS 'Статус обработки текущего оператора. Один из new|in progress|ok|error';
COMMENT ON COLUMN public.pg_parallel_task_statements.start_time
IS 'Время начала выполнения текущего оператора';
COMMENT ON COLUMN public.pg_parallel_task_statements.elapsed_sec
IS 'Для выполненных операторов, затраченное время в секундах';
COMMENT ON COLUMN public.pg_parallel_task_statements.rows_affected
IS 'Для выполненных операторов, количество затронутных строк';
COMMENT ON COLUMN public.pg_parallel_task_statements.err
IS 'Для выполненных операторов, текст ошибки. NULL, если выполнение успешно.';
-- Основной скрипт
INSERT INTO PUBLIC.pg_parallel_task (NAME, threads_count)
VALUES ('JIRA-001', 10);
INSERT INTO PUBLIC.pg_parallel_task_statements (task_name, sql_statement)
SELECT
'JIRA-001' task_name,
FORMAT(
'UPDATE FOO SET level = 42 where id >= ''%s'' and id <= ''%s''',
MIN(d.id),
MAX(d.id)
) sql_statement
FROM
(
SELECT
id,
NTILE(10) OVER (
ORDER BY
id
) part
FROM
foo
) d
GROUP BY
d.part;
-- Конец основного скрипта
При деплое происходит вызов приложения на Go, которое считывает конфигурацию задачи и скрипты по этой задаче из вспомогательных таблиц и автоматически запускает скрипты с заданным числом параллельных процессов (worker’ов). После выполнения управление передается обратно в Liquibase.
<changeSet id="JIRA-001" author="soldatov">
<executeCommand os="Linux, Mac OS X" executable="./pgpar.sh">
<arg value="testdatabase"/><arg value="JIRA-001"/>
</executeCommand>
</changeSet>
Приложение состоит из трех основных абстракций:
- task – загружает в память параметры миграции, количество процессов и все диапазоны, запускает “многопоточку” и поднимает Web–сервер для отслеживания прогресса выполнения;
- statement – представляет собой один диапазон выполняемой операции, также отвечает за изменение статуса выполнения диапазона, запись времени выполнения диапазона, количество строк в диапазоне и т.д.;
- worker – представляет собой один поток выполнения.
В методе task.do создается канал, в который отправляются все statements операции. На этом канале запускается указанное число worker’ов. Внутри worker’ов бесконечный цикл, он мультиплексирует на двух каналах: по которому получает statements и выполняет их, и пустой канал как сигнализатор? что надо завершиться. Как только пустой канал будет закрыт, worker завершит работу – это случается при ошибке в одном из worker’ов. Т.к. каналы в Go это thread–safe структура, то закрытием одного канала мы можем отменить все worker’ы разом. Когда statement в канале закончится, worker просто выйдет из цикла, и уменьшит общий для всех worker'ов счетчик. Так как task всегда знает, сколько worker’ов по нему работает, он просто ждет, когда этот счетчик обнулится и после этого завершается сам.
Плюшки
За счет такой реализации «многопоточки» появилось несколько интересных фич:
- Интеграция с Liquibase (вызываем с помощью тега executeCommand).
- простой веб–интерфейс, который появляется при запуске “многопоточки” и содержит всю информацию о ходе ее выполнения.
- Прогресс–бар (мы знаем, сколько обрабатывается один диапазон, сколько запущено параллельных процессов и сколько диапазонов еще осталось обработать – значит можем подсчитать время завершения).
- Динамическое изменение параллельных процессов (пока это мы делаем руками, но в дальнейшем хотим автоматизировать).
- Логирование информации по ходу выполнения многопоточных скриптов для возможности дальнейшего анализа.
- Можно выполнять блокирующие операции типа update, почти ничего не блокируя (если разбить табличку на очень маленькие диапазоны, все скрипты будут выполняться почти мгновенно).
- Есть обертка для вызова “многопоточки” прямо из БД.
Не плюшки
Основным недостатком является необходимость один раз пройти фулсканом по табличке для разбиения ее на диапазоны, если в качестве ключа используется текстовое поле, дата или uid. Если ключом для разбиения выбрано поле с последовательно увеличивающимися плотными значениями, то такой проблемы нет (мы заранее можем указать все диапазоны, просто задав требуемый шаг).
Ускоряемся в семь раз (тест на pgbench таблице)
Напоследок приведу пример сравнения по скорости выполнения операции UPDATE 500 000 000 строк без использования «многопоточки» и с ней. Простой UPDATE выполнялся 49 минут, тогда как «многопоточка» завершилась за семь минут.
SELECT count(1) FROM pgbench_accounts;
count
-------
500000000
(1 row)
SELECT pg_size_pretty(pg_total_relation_size('pgbench_accounts'));
pg_size_pretty
----------------
62 Gb
(1 row)
UPDATE pgbench_accounts
SET abalance = 42;
-- Время выполнения 49 минут
vacuum full analyze verbose pgbench_accounts;
INSERT INTO public.pg_parallel_tASk (name, threads_count) values ('JIRA-002', 25);
INSERT INTO public.pg_parallel_tASk_statements (tASk_name, sql_statement)
SELECT 'JIRA-002' tASk_name,
FORMAT('UPDATE pgbench_accounts
SET abalance = 42
WHERE aid >= ''%s'' AND aid <= ''%s'';',
MIN(d.aid), MAX(d.aid)) sql_statement
FROM (SELECT aid, ntile(25) over (order by aid) part
FROM pgbench_accounts) d
GROUP BY d.part;
-- Время выполнения 10 минут
-- Можно дробить по ctid, но получится неравномерно и нужно чтобы эту таблицу никто не изменял в процесе многопоточки
INSERT INTO public.pg_parallel_tASk_statements (tASk_name, sql_statement)
SELECT 'JIRA-002-ctid' tASk_name,
FORMAT('UPDATE pgbench_accounts
SET abalance = 45
WHERE (ctid::text::point)[0]::text > ''%s'' AND (ctid::text::point)[0]::text <= ''%s'';',
(d.min_ctid), (d.max_ctid)) sql_statement
FROM (
WITH max_ctid AS (
SELECT MAX((ctid::text::point)[0]::int) FROM pgbench_accounts)
SELECT generate_series - (SELECT max / 25 FROM max_ctid) AS min_ctid, generate_series AS max_ctid
FROM generate_series((SELECT max / 25 FROM max_ctid), (SELECT max FROM max_ctid), (SELECT max / 25 FROM max_ctid))) d;
-- Время выполнения 9 мин
./pgpar-linux-amd64 jdbc:postgresql://localhost:5432 soldatov password testdatabase JIRA-002
-- Время выполнения 7 минут
P.S. Вам это надо, если:
Все инструменты хороши для определенных задач, и вот несколько таких для «многопоточки».
- UPDATE таблиц > 100 000 строк.
- UPDATE со сложной логикой, которую можно распараллелить (например, вызов функций для вычисления чего-либо).
- UPDATE без локов. За счет дробления на очень маленькие диапазоны и запуска небольшого числа процессов можно добиться мгновенной обработки каждого диапазона. Таким образом, блокировка тоже будет почти мгновенной.
- Параллельное выполнение changeSet’ов в Liquibase (например, VACUUM).
- Создание и заполнение данными новых полей в таблице.
- Сложные отчеты.
<changeSet author="soldatov" id="JIRA-002-01">
<sql>
<![CDATA[
INSERT INTO public.pg_parallel_task (name, threads_count)
VALUES ('JIRA-002', 5);
INSERT INTO public.pg_parallel_task_statements (task_name, sql_statement)
SELECT
'JIRA-002' task_name,
FORMAT(
'UPDATE pgbench_accounts
SET abalance = 42
WHERE filler IS NULL
AND aid >= ''%s'' AND aid <= ''%s'';',
MIN(d.aid),
MAX(d.aid)
) sql_statement
FROM
(
SELECT
aid,
ntile(10000) over (
order by
aid
) part
FROM
pgbench_accounts
WHERE
filler IS NULL
) d
GROUP BY
d.part;
]]>
</sql>
</changeSet>
<changeSet author="soldatov" id="JIRA-002-02">
<executeCommand os="Linux, Mac OS X" executable="./pgpar.sh">
<arg value="pgconfdb"/><arg value="JIRA-002"/>
</executeCommand>
</changeSet>
<changeSet author="soldatov" id="JIRA-003-01">
<sql>
<![CDATA[
INSERT INTO pg_parallel_task (name, threads_count)
VALUES ('JIRA-003', 2);
INSERT INTO pg_parallel_task_statements (task_name, sql_statement)
SELECT
'JIRA-003' task_name,
'VACUUM FULL ANALYZE pgbench_accounts;' sql_statement;
INSERT INTO pg_parallel_task_statements (task_name, sql_statement)
SELECT
'JIRA-003' task_name,
'VACUUM FULL ANALYZE pgbench_branches;' sql_statement;
]]>
</sql>
</changeSet>
<changeSet author="soldatov" id="JIRA-003-02">
<executeCommand os="Linux, Mac OS X" executable="./pgpar.sh">
<arg value="testdatabase"/><arg value="JIRA-003"/>
</executeCommand>
</changeSet>
-- SQL part
ALTER TABLE pgbench_accounts ADD COLUMN account_number text;
INSERT INTO public.pg_parallel_task (name, threads_count) VALUES ('JIRA-004', 5);
INSERT INTO public.pg_parallel_task_statements (task_name, sql_statement)
SELECT 'JIRA-004' task_name,
FORMAT('UPDATE pgbench_accounts
SET account_number = aid::text || filler
WHERE aid >= ''%s'' AND aid <= ''%s'';',
MIN(d.aid), MAX(d.aid)) sql_statement
FROM (SELECT aid,
ntile(50000) over (order by device_version_guid) part
FROM pgbench_accounts) d
GROUP BY d.part;
SELECT * FROM func_run_parallel_task('testdatabase','JIRA-004');
Автор: EasyGrow