Продолжая тему использования динамического SQL, я хочу рассказать об одном полезном инструменте, реализованном мной в рамках одного из текущих проектов. Речь пойдет о дубликатах в справочниках. Под дубликатами, в этой статье, я понимаю записи, внесенные в справочники повторно, например в результате орфографической ошибки при вводе наименования.
Суть предлагаемого мной подхода в том, чтобы дать возможность объявить любую запись справочника дубликатом уже существующей. В результате, дублирующая запись будет удалена, а все ссылки на нее будут исправлены таким образом, чтобы они стали ссылаться на правильную запись. Также, очень важно, предоставить возможность отката таких изменений, на случай, если они сделаны по ошибке.
Начнем с таблиц для хранения служебных данных:
create table mg_table (
table_name varchar(100) not null,
pk_name varchar(100) not null,
primary key(name)
);
create sequence mg_action_seq;
create table mg_action (
id bigint default nextval('mg_action_seq') not null,
table_name varchar(100) not null references mg_table(name),
old_id varchar(50) not null,
new_id varchar(50) not null,
action_time timestamp default now() not null,
primary key(id)
);
create sequence mg_action_detail_seq;
create table mg_action_detail (
id bigint default nextval('mg_action_detail_seq') not null,
action_id bigint not null references mg_action(id),
table_name varchar(100) not null,
pk_name varchar(100) not null,
column_name varchar(100) not null,
obj_id varchar(50) not null,
primary key(id)
);
Здесь, таблица mg_table содержит данные о таблицах, для которых поддерживается слияние дубликатов. Требование к таким таблицам единственное — первичный ключ должен состоять из одного числового или строкового столбца. Нам не придется беспокоиться об этой таблице, поскольку она будет заполняться автоматически. Таблицы mg_action и mg_action_detail будут содержать данные, необходимые для отката изменений.
Определим пару вспомогательных функций:
create or replace function mg_get_pk_column(in p_table varchar) returns varchar
as $$
declare
l_pk text;
l_cn int;
begin
select max(f.name), count(*) as name into l_pk, l_cn
from ( select ps_array_to_set(a.conkey) as nn
from pg_constraint a, pg_class b
where b.oid = a.conrelid
and a.contype = 'p'
and b.relname = lower(p_table) ) c,
( select d.attname as name, d.attnum as nn
from pg_attribute d, pg_class e
where e.oid = d.attrelid
and e.relname = lower(p_table) ) f
where f.nn = c.nn;
if l_cn <> 1 then
raise EXCEPTION 'Can''t support composite PK';
end if;
return l_pk;
end;
$$ language plpgsql;
create or replace function mg_add_dict(in p_table varchar) returns void
as $$
declare
l_pk text;
l_sql text;
begin
l_pk := mg_get_pk_column(p_table);
perform 1
from mg_table where table_name = lower(p_table);
if not FOUND then
l_sql :=
'create table mg_' || lower(p_table) || ' ' ||
'as select * from ' || lower(p_table) || ' limit 0';
execute l_sql;
l_sql :=
'alter table mg_' || lower(p_table) || ' ' ||
'add primary key(' || l_pk || ')';
execute l_sql;
insert into mg_table(table_name, pk_name) values (lower(p_table), l_pk);
end if;
end;
$$ language plpgsql;
Функция mg_get_pk_column выполняет известный нам по предыдущей статье запрос, возвращающий имя столбца первичного ключа, а также осуществляет проверку на то, что первичный ключ состоит из одного столбца.
Функция mg_add_dict, помимо заполнения mg_table, создает таблицу с префиксом 'mg_', в которой будут сохраняться удаленные дубликаты, на тот случай, если изменение понадобиться откатить. По своей структуре, эта таблица полностью аналогична исходной.
Переходим к самому интересному:
create or replace function mg_merge(in p_table varchar, in p_old varchar, in p_new varchar) returns void
as $$
declare
l_action int;
l_pk text;
l_sql text;
tabs record;
begin
perform mg_add_dict(p_table);
select pk_name into l_pk
from mg_table where table_name = lower(p_table);
l_action := nextval('mg_action_seq');
insert into mg_action(id, table_name, old_id, new_id)
values (l_action, p_table, p_old, p_new);
l_sql :=
'insert into mg_' || lower(p_table) || ' ' ||
'select * from ' || lower(p_table) || ' ' ||
'where ' || l_pk || ' = ''' || p_old || '''';
execute l_sql;
for tabs in
select b.relname as table_name,
d.attname as column_name
from pg_constraint a, pg_class b, pg_class c,
pg_attribute d
where a.contype = 'f'
and b.oid = a.conrelid
and c.oid = a.confrelid
and c.relname = lower(p_table)
and d.attrelid = b.oid
and a.conkey[1] = d.attnum
loop
l_sql :=
'insert into mg_action_detail(action_id, table_name, column_name, obj_id, pk_name) ' ||
'select ' || l_action || ', ''' || tabs.table_name || ''', ''' ||
tabs.column_name || ''', id, ' ||
'''' || mg_get_pk_column(tabs.table_name::varchar) || ''' ' ||
'from ' || lower(tabs.table_name) || ' ' ||
'where ' || lower(tabs.column_name) || ' = ''' || p_old || '''';
execute l_sql;
l_sql :=
'update ' || lower(tabs.table_name) || ' ' ||
'set ' || lower(tabs.column_name) || ' = ''' || p_new || ''' ' ||
'where ' || lower(tabs.column_name) || ' = ''' || p_old || '''';
execute l_sql;
end loop;
l_sql :=
'delete from ' || lower(p_table) || ' where ' || l_pk || ' = ''' || p_old || '''';
execute l_sql;
end;
$$ language plpgsql;
create or replace function mg_merge(in p_table varchar, in p_old bigint, in p_new bigint)
returns void
as $$
declare
begin
perform mg_merge(p_table, p_old::varchar, p_new::varchar);
end;
$$ language plpgsql;
Эта функция выполняет выполняет поиск всех таблиц, ссылающихся на p_table при помощи внешнего ключа и заменяет в них p_old на p_new, сохраняя данные, необходимые для отката измений. Поскольку, чаще всего, столбец первичного ключа будет числовым, для удобства, перегружена функция mg_merge(varchar, bigint, bigint).
Осталось разработать функцию отката изменений:
create or replace function mg_undo() returns void
as $$
declare
l_action int;
l_old varchar(50);
l_table text;
l_sql text;
tabs record;
begin
select max(id) into l_action
from mg_action;
if l_action is null then
raise EXCEPTION 'Can''t UNDO';
end if;
select table_name, old_id into l_table, l_old
from mg_action
where id = l_action;
l_sql :=
'insert into ' || l_table || ' ' ||
'select * from mg_' || l_table || ' ' ||
'where id = ''' || l_old || '''';
execute l_sql;
for tabs in
select table_name,
pk_name,
column_name
from mg_action_detail
where action_id = l_action
group by table_name, pk_name, column_name
loop
l_sql :=
'update ' || tabs.table_name || ' ' ||
'set ' || tabs.column_name || ' = ''' || l_old || ''' ' ||
'where ''*'' || ' || tabs.pk_name || ' in (' ||
'select ''*'' || obj_id from mg_action_detail '||
'where table_name = ''' || tabs.table_name || ''' ' ||
'and action_id = ' || l_action || ') ';
execute l_sql;
end loop;
l_sql :=
'delete from mg_' || l_table || ' where id = ''' || l_old || '''';
execute l_sql;
delete from mg_action_detail where action_id = l_action;
delete from mg_action where id = l_action;
end;
$$ language plpgsql;
Изменения будут откатываться в порядке строго противополложном их созданию. По этой причине, никакие аргументы для mg_undo передавать не требуется.
Посмотрим, как все это работает. Создадим справочные таблицы:
create sequence city_seq;
create table city (
id bigint default nextval('city_seq') not null,
name varchar(100) not null,
primary key(id)
);
create sequence street_seq;
create table street (
id bigint default nextval('street_seq') not null,
city_id bigint not null references city(id),
name varchar(100) not null,
primary key(id)
);
create sequence address_seq;
create table address (
id bigint default nextval('address_seq') not null,
street_id bigint not null references street(id),
house varchar(10) not null,
apartment varchar(10) not null,
primary key(id)
);
… и наполним их тестовыми данными:
insert into city(id, name) values (1, 'Казань');
insert into street(id, city_id, name) values (1, 1, 'Победы');
insert into street(id, city_id, name) values (2, 1, 'Победы проспект');
insert into address(id, street_id, house, apartment) values (1, 1, '10', '1');
insert into address(id, street_id, house, apartment) values (2, 2, '10', '2');
Теперь, для того чтобы «слить» улицу 'Победы проспект' с улицей 'Победы', достаточно выполнить следующую команду:
select mg_merge('street', 2, 1);
Функция mg_undo(), как и говорилось выше, откатит изменения.
Автор: GlukKazan