Как мы переехали с Oracle на PostgreSQL в нагруженном сервисе без даунтайма

в 7:00, , рубрики: Без рубрики
График 99-го перцентиля времени ответа основного user-facing-сервиса: жизнь до, в процессе и после переезда. Смотрите, насколько стало лучше!

График 99-го перцентиля времени ответа основного user-facing-сервиса: жизнь до, в процессе и после переезда. Смотрите, насколько стало лучше!

Всем привет! Я Сергей, работаю в B2B-команде Яндекс Маркета последние 3,5 года. Как уже понятно из заголовка, сейчас я вам расскажу про yet-another-миграцию с базы на базу, которая началась в середине 2021 года и заняла почти год. Получается, мемуары.

Вас ждёт рассказ о том, как мы:

  • несколько месяцев чинили тесты и делали трансформер;

  • десятки раз переливали данные;

  • чинили баги незаметно для пользователей;

  • заставили сервис работать на PostgreSQL быстрее, чем он работал на Oracle.


Что было на старте

Есть в Яндекс Маркете один сервис, который отвечает за взаимодействие с партнёрами — например, с продавцами или поставщиками. Регистрация, работа с ассортиментом, аналитика, обработка заказов — в общем, в нём собрано всё, с чем приходится сталкиваться партнёрам каждый день. 

По сути этот сервис представляет из себя несколько больших API (бэк для партнёрского UI, API для партнёров, API для смежных систем), несколько сервисов для фоновых задач (всякие финансовые расчёты, генерация отчётов, интеграция со смежными системами) и админка. Фактически это монолит, который живёт на общей для всех своих компонентов базе данных.

Так исторически сложилось, что базой данных изначально был Oracle. Почему? Просто потому, что на момент зарождения сервиса он уже был куплен, а мы взяли то, что было. Сам по себе Oracle — это мощная база. В этом плане вопросов к ней не было, и можно было бы жить на ней и дальше, если бы не одна проблема — стоимость поддержки.

Во-первых, Oracle жил на дорогих железных тачках, без виртуализации. Во-вторых, Oracle DBA — достаточно дорогие ребята и со временем на всю компанию их стало не хватать. Ну и в-третьих, стоимость платной поддержки Oracle, которая является частью приобретённой лицензии, со временем стала огромна.

Вопрос переезда с Oracle встал ребром.

А тем временем популярность PostgreSQL росла как на дрожжах, в том числе и в Яндексе. Поскольку во главу угла была поставлена стоимость поддержки, то решили переезжать на Managed Service for PostgreSQL в Yandex Cloud.

Итак, обрисую… 

Стартовые условия и ограничения

  • Около 1,5 TiB данных в Oracle, один шард, более 700 таблиц, более 250 view, немножко stored procedure.

  • На продакшене в будни — до 3,5k RPS в API, большая часть которых так или иначе обращаются к базе.

  • Пара сотен регулярных фоновых задач.

  • Монолитный сервис-дедушка на Spring 5.1.

  • Для работы с базой используется Spring JDBC, никакого Hibernate.

  • Полноценная тестовая инсталляция сервиса со своими данными и очень неплохое покрытие интеграционными, функциональными и юнит-тестами (это оказалось крайне важным подспорьем).

  • Команда более чем из 20 человек, которая продолжает наваливать новые фичи каждый день.

  • Я и один выделенный в помощь стажёр на старте.

Верхнеуровнево схема сервиса выглядела так, ничего интересного:

Как мы переехали с Oracle на PostgreSQL в нагруженном сервисе без даунтайма - 2

Когда мне вручили эту задачку переезда, была надежда, что можно разбить данные на кластеры и переехать по частям за много маленьких миграций. Провели анализ связей таблиц друг с другом, построив граф таблиц по join из логов запросов базы и по foreign key из схем таблиц. И оказалось, что перед нами большой запутанный клубок, в котором всё завязано транзитивно на всё (за небольшими исключениями). Напомню, исходно было более 700 таблиц, а в основном клубке оказалось более 600. Когда взглянули на это всё (не без слёз), стало понятно, что есть такого слона по частям мы будем много лет.

Вопрос с миграцией по кускам положили на полку и попробовали зайти с другого конца. Стали думать, как бы мы стали переливать данные из старой базы в новую — эту проблему в любом случае пришлось бы решать. Появились такие варианты:

  • Какой-то самописный код дёргает данные из одной базы и вставляет в другую. Таким образом однажды уже была проделана миграция в Маркете, в одном из других сервисов, однако исходная ситуация там была несколько проще (не было user-facing-сервисов).

  • oracle_fdw. На первый взгляд, выглядело неплохо: live-данные из Oracle, но в PostgreSQL. Однако при наличии огромного числа различных join в запросах этот вариант становится слишком неудобным. Ну и всё же это не миграция как таковая. 

  • Ora2Pg. В своём названии содержит буквально то, что нам надо. Однако это one-shot-тулза, удобная в том случае, когда можно сделать stop the world. А в нашем случае это неприемлемо.

  • CDC (например, с помощью Oracle GoldenGate). Тут уже стало интересней. Если добиться низкого лага поставки изменений, то можем получить in-sync-базы, пригодные для продакшн-нагрузки.

 Смотрим на эти варианты, и возникает...

Идея

У нас ведь полно тестов, которые гоняются на базе H2 в режиме совместимости с Oracle. В дополнение к H2 у нас есть самопальный трансформер запросов, который еще немного адаптирует исходные запросы к синтаксису H2. А что, если взять CDC, похожую идею с трансформером, и адаптировать запросы к PostgreSQL на лету? Тесты в любом случае придётся править — почему бы не попробовать?

Составляем план:

  1. Находим и убираем все старые мусорные процессы и данные.

  2. Большие таблицы (в нашем случае выбрали отсечку более 10 GiB — просто потому что) пытаемся увозить в другие хранилища — в основном ими оказались динамические таблицы YTsaurus.

  3. По исходнику из Oracle генерируем целевую схему в PostgreSQL.

  4. Делаем аналогичный трансформер запросов из синтаксиса Oracle в PostgreSQL и переводим тесты c H2 на embedded PostgreSQL.

  5. Разворачиваем инфраструктуру для CDC, льём данные.

  6. Переключаем запросы на чтение в тестовой среде, чиним ошибки.

  7. Аналогично поступаем с запросами на запись.

  8. Повторяем все пункты для продакшена.

  9. Profit?

План вызывал достаточное количество скепсиса и сомнений со стороны руководства. Но, как вы понимаете, раз вы читаете эту статью, он как-то сработал.

Воплощаем план в жизнь

По плану видно, что многие штуки вначале прекрасно параллелятся и не мешают друг другу. Однако нас по-прежнему было полтора землекопа. Поэтому начать решили с чего-то осязаемого — с тестов.

Тесты и трансформер

Поскольку базовая идея состояла в использовании трансформера запросов, то, очевидно, он же должен использоваться и в тестах, которые стоит перевести на PostgreSQL вместо H2. На начало проекта у нас было около 15 тысяч тестов, которыми была покрыта бóльшая часть основной логики. Для переноса тестов в таком объёме потребовалось много рук, которые вскоре нашлись в виде ещё нескольких стажёров.

Для генерации целевой схемы в PostgreSQL мы сперва сдампили схемы всех объектов из Oracle. Использовали всё, что было под рукой, в частности Ora2Pg, DataGrip, Oracle SQL Developer. После чего самописными скриптами на Python привели их всех к состоянию «liquibase в PostgreSQL должен прожевать». Кстати, при конвертации типов нам помогла статья про сопоставление типов данных и гайд по миграции

Теперь про трансформер. По факту это просто преобразование исходного SQL-запроса на лету. Базовая обвязка выглядит примерно так (код упрощён, оставлена суть):

Код обвязки трансформера
@FunctionalInterface
public interface Transformer {
    String transform(String sql);
}

/** Этим обернуть датасорс приложения */
public static DataSource wrap(DataSource dataSource, Transformer transformer) {
    return new DataSource() {
        @Override
        public Connection getConnection() throws SQLException {
            return wrap(dataSource.getConnection(), transformer);
        }

        @Override
        public Connection getConnection(String username, String password) throws SQLException {
            return wrap(dataSource.getConnection(username, password), transformer);
        }

        // остальные методы проксированы as is...
        // конечно, был использован существующий базовый прокси-класс, но для понимания и так сойдёт
    };
}

public static Connection wrap(Connection connection, Transformer transformer) {
    var handler = new ConnectionProxy(connection, transformer);
    return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), new Class<?>[]{Connection.class}, handler);
}

private record ConnectionProxy(Connection proxee, Transformer transformer) implements InvocationHandler {
    private static final Set<String> METHODS_FOR_PREPARED_STATEMENT = Set.of(
        "prepareCall",
        "prepareStatement"
    );
    private static final String METHODS_FOR_STATEMENT = "createStatement";

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (METHODS_FOR_PREPARED_STATEMENT.contains(method.getName())) {
            var sql = args[0].toString();
            args[0] = transformer.transform(sql);
            return method.invoke(proxee, args);
        }
        var result = method.invoke(proxee, args);
        if (result instanceof Statement statement && METHODS_FOR_STATEMENT.equals(method.getName())) {
            var handler = new StatementProxy(statement, transformer);
            return Proxy.newProxyInstance(Statement.class.getClassLoader(), new Class<?>[]{Statement.class}, handler);
        }
        return result;
    }
}

private record StatementProxy(Statement proxee, Transformer transformer) implements InvocationHandler {
    private static final Set<String> METHODS_WITH_QUERY = Set.of(
        "addBatch",
        "execute",
        "executeLargeUpdate",
        "executeQuery",
        "executeUpdate"
    );

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (METHODS_WITH_QUERY.contains(method.getName())) {
            var sql = args[0].toString();
            args[0] = transformer.transform(sql);
        }
        return method.invoke(proxee, args);
    }
}

Новый трансформер для Oracle спустя какое-то время выглядел примерно так:

public enum OracleToPostgresTransformer implements Transformer {
    INSTANCE;

    // Map.entry использован просто как пара
    private static final List<Map.Entry<Pattern, String>> PATTERNS = Stream.of(
        Map.entry("\browid\b", "ctid"),
        Map.entry("\bsys(?:date|timestamp)\b", "current_timestamp"),
        Map.entry("\bas clob\b", "as text"),
        Map.entry("\bas number\b", "as numeric"),
        Map.entry("\bminus\b", "except"), // select minus select
        Map.entry("n?varchar2\((?<v1>\d+)(?:\s+char)?\)", "varchar(${v1})"),
        Map.entry("\blistagg\b", "string_agg"), // + below
        Map.entry(
            "\) within group\s*\(\s*order by (?<v1>[^)]+)\)", // listagg(...) within group (order by ...)
            " order by ${v1})" // string_agg(... order by ...)
        ),
        Map.entry("(?<seq>[\w.]+)\.nextval", "nextval('${seq}')"),
        Map.entry("rownum(?: as)? (?<v1>\w+)", "row_number() over() as ${v1}"),
        // ... и ещё немного специфичных для нас преобразований
    ).map(e -> Map.entry(makeRegexp(e.getKey()), e.getValue())).toList();

    private static Pattern makeRegexp(String regex) {
        return Pattern.compile(
                regex.replaceAll("\s+", "\\s+"), // to simplify writing spaces in patterns
                Pattern.MULTILINE | Pattern.CASE_INSENSITIVE | Pattern.UNICODE_CASE | Pattern.DOTALL
        );
    }

    @Override
    public String transform(String sql) {
        for (var pattern : PATTERNS) {
            sql = pattern.getKey().matcher(sql).replaceAll(pattern.getValue());
        }
        return sql;
    }
}

Поверх также было добавлено кэширование (просто Guava-кэш), метрики (размер кэша, число успешных запросов, число ошибок), игнорирование отдельных запросов. 

Получив схему и трансформер с очевидными преобразованиями, мы запускали тесты на PostgreSQL и пытались их озеленить. Первой в чат влетела оракловая dual — её сделали так: create view dual as select 'x'::varchar as dummy (подсмотрели в Orafce). Но в большинстве случаев тесты удавалось озеленить достаточно просто: мы правили исходный запрос, приводя его к стандартному виду вроде fetch first :limit rows вместо where rownum <= :limit. Иногда мы просто добавляли новые преобразования в трансформер. Для переписывания части запросов помогла хорошая статья

С каждым изменением могли зеленеть сразу десятки, а то и сотни тестов — это внушало оптимизм. Часть запросов не получалось ни переписать, ни обойти трансформером, поэтому пришлось местами сделать две версии запроса, но об этом расскажу чуть позже.

Спустя несколько месяцев (а вы как думали?) рутинной работы мы озеленили последний тест. Теперь у нас был монстр Франкенштейна, работающий на Oracle в продакшене и на PostgreSQL в тестах. Выглядел он примерно так:

Как мы переехали с Oracle на PostgreSQL в нагруженном сервисе без даунтайма - 3

Ок, тесты позеленели, что дальше?

Про выбрасывание неактуального кода писать особо нечего: с помощью знатоков кодобазы мы просто разметили, что можно выкинуть — и выкинули.

Про перенос больших таблиц, честно говоря, тоже. Их было очень мало, и они все были достаточно независимые. Разве что ещё раз упомяну динамические таблицы YTsaurus: в существующих сценариях для тех таблиц они отлично подошли.

Пришла пора заняться перевозкой данных.

Льём данные

Напомню, для переливки данных мы остановились на варианте с CDC. В качестве инструмента взяли Oracle GoldenGate (в дальнейшем — GG). Инструмент платный, но у него был деморежим, и нам его хватило.

С настройкой GG нам помогли наши DBA. В целом у GG неплохая документация по всем аспектам настройки, в том числе по вопросам базы-источника, базы-приёмника, ограничений на данные и других моментов. Одним из нюансов подготовки данных оказались требования GG к уникальности строк. Пришлось местами добавлять в таблицы псевдоключи.

Спустя несколько итераций была выбрана самая простая схема работы GG в плане обслуживания:

  • Одна виртуалка.

  • Два пользователя в системе — Oracle и PostgreSQL, а между ними общая директория dirdat для trail-файлов с правами на чтение и запись для обоих пользователей.

  • Под пользователем Oracle установлен и настроен GG в режиме extract.

  • Под пользователем PostgreSQL установлен и настроен GG в режиме replicat.

  • Oracle GG читал изменения из Oracle и складывал в dirdat свои trail-файлы.

  • PostgreSQL GG читал из dirdat trail-файлы и накатывал изменения на целевую PostgreSQL.

Важный нюанс подключения к PostgreSQL со стороны репликации: чтобы PostgreSQL не валидировал и не ругался на нарушения foreign key, достаточно выставить в параметрах соединения параметр session_replication_role=replica.

Переливка данных с нуля представляет из себя первоначальный снапшот данных, заливку в целевую базу, после чего — репликацию изменений, следующих за снапшотом. GG умеет делать первоначальную заливку, но этот процесс для большого объёма данных супермедленный и неудобный. Поэтому мы решили сделать свой инструмент для копирования, назовём его Copier.

Для его реализации мы воспользовались oracle_fdw. Copier готовил foreign-таблицы в целевой базе и, по сути, делал insert into t_dst (cols) select cols from t_src. Для больших таблиц процесс был чуть посложнее, с параллельным копированием блоков, но идейно — то же самое. В такой схеме данные заливались сразу в целевую базу без всяких посредников.

Для управления связкой Copier и GG была написана пачка bash-скриптов, и началась работа по подбору оптимального сочетания extract и replicat. Суть в том, что при конфигурации в лоб репликация очень быстро начинала захлёбываться, запинаясь о большие транзакции. Поэтому пришлось сперва подбирать количество extract, чтобы они всегда успевали вычитывать изменения из Oracle в trail-файлы. А потом подбирать разбивку каждого extract на replicat, чтобы они успевали накатывать все изменения из trail-файлов. В процессе подбора итоговой конфигурации данные для некоторых таблиц переливались десятки раз. Без скриптов этот процесс занимал поначалу часы, а со скриптами — минуты.

Попутно пришлось переделать несколько фоновых задач, чтобы избавиться от больших объёмов изменений. Например, одним из простых способов оказалась замена update t set c=v where ... на update t set c=v where ... and c!=v. И ещё, где было возможно, переделывали большие транзакции на обработку по батчам.

Однако некоторые фоновые задачи генерировали такой объём изменений, что никакой extract уже не поспевал. Обычно это был регулярный импорт каких-нибудь больших данных. Для таких случаев мы делали копии задач, которые писали сразу в PostgreSQL. И да, задачи мы не переписывали, а просто делали копию бина и на вход инжектили DataSource с пресловутым трансформером.

В итоге лаг данных в PostgreSQL относительно Oracle на такой схеме гулял в пределах от 0 до 10 секунд максимум. После всех этих изменений картина стала примерно такой:

Как мы переехали с Oracle на PostgreSQL в нагруженном сервисе без даунтайма - 4

Как видно, стало ещё лучше, чем было — правда же? Настала пора зафиналить эту историю и переключаться на PostgreSQL.

Подготовка к переключению

Итак, теперь у нас есть тесты на PostgreSQL и относительная уверенность, что бóльшая часть сервиса не поляжет в ошибках. Есть данные, которые отставали от Oracle не более 10 секунд. Теперь надо как-то обезопасить себя от потенциальных ошибок, которые не покрыты тестами.

Поскольку у нас Spring, я решил сделать аналог AbstractRoutingDataSource на стероидах. Хотелось не просто «либо первый DataSource, либо второй», а разные режимы роутинга запросов. В итоге список желаемых режимов получился таким:

/** уровни роутинга запросов по возрастанию — от самых безобидных до более инвазивных */
public enum QueryRouting {
    /** ходим только в основной датасорс */
    DISABLED,

    /**
     * сперва повторяем все читающие запросы во второй датасорс, 
     * но полностью игнорируем результат,
     * далее вызываем основной датасорс и возвращаем его результат как обычно
     */
    NOOP_REPEAT,

    /**
     * повторяем все читающие запросы, не требующие транзакции во второй датасорс, 
     * и возвращаем его результат
     * в случае любых ошибок делаем fallback на основной датасорс
     */
    WITH_FALLBACK,

    /**
     * ходим только во второй датасорс
     */
    ENABLED
}

Идея была в том, чтобы включить в рантайме сперва режим NOOP_REPEAT, после чего найти и починить все ошибки. Потом включить WITH_FALLBACK и починить то, что не всплыло раньше. И наконец включить ENABLED (билет в один конец). Чтобы это заработало, пришлось написать пачку кастомных классов:

  • OracleToPostgresJdbcTemplate, который знает про второй датасорс и поддерживает все режимы роутинга;

    OracleToPostgresJdbcTemplate
    /**
     * JdbcTemplate, который может переключаться на второй датасорс в рамках selectов вне транзакций.
     *
     * @implNote использует ThreadLocal-копию флага с роутингом, чтобы на один логический запрос в темплейт
     * был один запрос в Supplier. Это нужно, так как JdbcTemplate в рамках запроса вызывает obtainDataSource,
     * а потом ещё раз вызывает getDataSource, когда закрывает соединение.
     * <p>
     * на данный момент не поддерживает Spring 5.3+, см. {@link #checkSpringVersion()}
     */
    public class OracleToPostgresJdbcTemplate extends JdbcTemplate {
        private static final Logger log = LoggerFactory.getLogger(OracleToPostgresJdbcTemplate.class);
        static final ThreadLocal<QueryRouting> ROUTING = new NamedThreadLocal<>("routing");
        private static final ThreadLocal<Connection> MANUAL_COMMIT = new NamedThreadLocal<>("manualCommit");
        private static final Pattern ROUTING_DESIRED_WORDS = Pattern.compile(
                "\bselect\b",
                Pattern.CASE_INSENSITIVE | Pattern.DOTALL
        );
        private static final Pattern ROUTING_FORBIDDEN_WORDS = Pattern.compile(
                Stream.of(
                        "nextval", // select from seq.nextval, etc
                        "create",
                        "insert",
                        "update",
                        "delete",
                        "drop",
                        "truncate",
                        "begin",
                        "commit",
                        "function",
                        "procedure",
                        "call",
                        "execute",
                        "invoke",
                        "do"
                ).collect(Collectors.joining("|", "\b(?:", ")\b")),
                Pattern.CASE_INSENSITIVE
        );
    
        private final DataSource newDataSource;
        private final Supplier<QueryRouting> dataSourceRouting;
        private final AtomicBoolean routingDataSourceExceptionTranslatorRequired = new AtomicBoolean(true);
    
        /**
         * @param oldDataSource     основной, обычный датасорс, например Oracle
         * @param newDataSource     датасорс, на который мы хотим переключать часть запросов, например PostgreSQL
         * @param dataSourceRouting будет вызван перед каждым подходящим под критерии
         *                          роутинга запросом с sql в параметре, поэтому должен работать
         *                          быстро, кеширование и прочее — на вашей совести,
         *                          все ошибки будут просто залогированы, поэтому о них можно не заботиться
         */
        public OracleToPostgresJdbcTemplate(
                DataSource oldDataSource,
                DataSource newDataSource,
                Supplier<QueryRouting> dataSourceRouting
        ) {
            checkSpringVersion();
    
            super.setDataSource(Objects.requireNonNull(oldDataSource, "regular DataSource"));
            this.newDataSource = Objects.requireNonNull(newDataSource, "new DataSource");
            Objects.requireNonNull(dataSourceRouting);
            this.dataSourceRouting = () -> {
                try {
                    return dataSourceRouting.get();
                } catch (RuntimeException e) {
                    log.error("Failed to get query routing flag, got {}", e, e);
                    return null;
                }
            };
    
            // нам важно инициировать транслятор ошибок сперва к основному датасорсу,
            // чтобы потом корректно встроить транслятор для нового датасорса
            super.setLazyInit(false);
        }
    
        /**
         * previously all requests were routed to one of public execute overloads,
         * it was changed by addition of queryForStream method
         */
        static void checkSpringVersion() {
            Stream.of(JdbcTemplate.class.getDeclaredMethods())
                    .filter(m -> "execute".equals(m.getName()) && Modifier.isPrivate(m.getModifiers()))
                    .findFirst()
                    .ifPresent(m -> {
                        throw new UnsupportedOperationException("Spring 5.3+ is not supported yet," +
                                " JdbcTemplate impl has diverged from expected by this class");
                    });
        }
    
        @Override
        public final void setLazyInit(boolean lazyInit) {
            log.error("Setting lazy init is not supported");
            // do nothing to enforce correct getExceptionTranslator behavior
        }
    
        @Override
        public final void setExceptionTranslator(SQLExceptionTranslator exceptionTranslator) {
            log.error("Setting shared translator is usually not what you really want");
            // do nothing to enforce default SQLErrorCodeSQLExceptionTranslator that is bound to regularDataSource
        }
    
        /**
         * @implNote {@link org.springframework.jdbc.support.JdbcAccessor#setExceptionTranslator(SQLExceptionTranslator)} и
         * {@link org.springframework.jdbc.support.JdbcAccessor#getExceptionTranslator()} потокобезопасны,
         * но {@link AbstractFallbackSQLExceptionTranslator#setFallbackTranslator(SQLExceptionTranslator)} — нет,
         * поэтому единожды лочимся ради него, а также чтобы много потоков не полезли одновременно в базу,
         * как это происходит в {@link SQLErrorCodeSQLExceptionTranslator#setDataSource(DataSource)}
         * @see #setLazyInit(boolean)
         */
        @Nonnull
        @Override
        public final SQLExceptionTranslator getExceptionTranslator() {
            @Nonnull var et = super.getExceptionTranslator();
            if (routingDataSourceExceptionTranslatorRequired.get()) {
                // синкаемся на this аналогично super.getExceptionTranslator
                synchronized (this) {
                    // пытаемся инициализировать транслятор ошибок единожды,
                    // чтобы не пытаться делать это, например, при ошибках доступа к experimentDataSource
                    if (routingDataSourceExceptionTranslatorRequired.getAndSet(false)) {
                        var etExp = appendExceptionTranslator(et, newDataSource);
                        if (etExp != null) {
                            super.setExceptionTranslator(etExp);
                            et = etExp;
                        }
                    }
                }
            }
            return et;
        }
    
        /**
         * @return {@code null} if current was modified, new translator with fallback to current if not
         * @apiNote модифицирует переданный транслятор!
         */
        @Nullable
        public static SQLExceptionTranslator appendExceptionTranslator(
                @Nonnull SQLExceptionTranslator current,
                DataSource additionalDataSource
        ) {
            var additional = new SQLErrorCodeSQLExceptionTranslator(additionalDataSource);
            if (current instanceof AbstractFallbackSQLExceptionTranslator) {
                // встраиваемся в середину цепочки, чтобы сперва транслировать ошибки переданным транслятором
                var currentFt = (AbstractFallbackSQLExceptionTranslator) current;
                var currentFet = currentFt.getFallbackTranslator();
                if (currentFet != null) {
                    additional.setFallbackTranslator(currentFet);
                }
                currentFt.setFallbackTranslator(additional);
                return null;
            } else {
                additional.setFallbackTranslator(current);
                return additional;
            }
        }
    
        @Override
        public final void setDataSource(DataSource dataSource) {
            if (dataSource != super.getDataSource()) {
                log.error("Setting another datasource (implicitly) is not what you want, only constructor is supported");
            }
            // do nothing to avoid implicit resetting
        }
    
        @Override
        public final DataSource getDataSource() {
            return obtainDataSource();
        }
    
        @Nonnull
        @Override
        protected final DataSource obtainDataSource() {
            return isAnyRoutingEnabled()
                    ? newDataSource
                    : Objects.requireNonNull(super.getDataSource(), "should be guaranteed by ctor and setter");
        }
    
        /**
         * @see #obtainDataSource()
         */
        private <T> T tryWithRouting(Object sqlProvider, Supplier<T> action) {
            var r = ROUTING.get();
            UnaryOperator<QueryRouting> rSetter = OracleToPostgresJdbcTemplate::setRouting;
            Runnable rRemover = ROUTING::remove;
            if (r == null) {
                // если есть открытая транзакция, то по возможности продолжаем запросы
                // с тем же базовым значением роутинга до конца транзакции
                var rTx = OracleToPostgresDataSourceTransactionManager.ROUTING.get();
                // на один запрос всего один поход за флагом
                var rFlag = dataSourceRouting.get();
                // при полных противоположностях выбираем флаг в ущерб транзакции, см DatasourceRouting.FORCE_***
                if (rTx == null
                        || (rTx == QueryRouting.ENABLED && rFlag == QueryRouting.DISABLED)
                        || (rTx == QueryRouting.DISABLED && rFlag == QueryRouting.ENABLED)) {
                    r = rFlag;
                } else {
                    r = rTx;
                }
            } else {
                // если у нас вложенный запрос, просто поддерживаем текущий роутинг
                rSetter = UnaryOperator.identity();
                rRemover = () -> {
                };
            }
            if (r == null) {
                r = QueryRouting.DISABLED;
            }
    
            // тюним роутинг под конкретные виды запросов
            @Nullable var sql = (sqlProvider instanceof SqlProvider)
                    ? ((SqlProvider) sqlProvider).getSql()
                    : null;
            if (r == QueryRouting.WITH_FALLBACK && isDataConsistencyRequired()) {
                // все запросы в rw-датасорс, под транзакцией или при кеширующих запросах
                // идут только копией во избежание неконсистентности и гонок
                r = QueryRouting.NOOP_REPEAT;
            }
            if ((r == QueryRouting.NOOP_REPEAT || r == QueryRouting.WITH_FALLBACK) && isUnsafeToRepeat(sql)) {
                // все неподходящие запросы мимо, проверка на тип запроса самая тяжелая, поэтому стоит в самом конце
                r = QueryRouting.DISABLED;
            }
    
            switch (rSetter.apply(r)) {
                case NOOP_REPEAT:
                    // в этом случае просто дублируем запрос, но результат возвращаем только из оригинального датасорса
                    try {
                        action.get();
                    } catch (RuntimeException e) {
                        // просто логируем ошибки
                        log.error("Failed to execute query [{}], got {}", sql, e, e);
                    } finally {
                        rRemover.run();
                    }
                    return action.get();
                case WITH_FALLBACK:
                    // полноценно пробуем новый датасорс, но в случае любых ошибок — сразу фолбек на обычный
                    try {
                        return action.get();
                    } catch (RuntimeException e) {
                        log.error("Failed to execute query [{}], got {}", sql, e, e);
                        rRemover.run();
                        return action.get();
                    } finally {
                        rRemover.run();
                    }
                case ENABLED:
                case DISABLED:
                default:
                    try {
                        return action.get();
                    } finally {
                        rRemover.run();
                    }
            }
        }
    
        private boolean isDataConsistencyRequired() {
            // тут было ещё несколько специфичных нам проверок
            return TransactionSynchronizationManager.isActualTransactionActive();
        }
    
        private static boolean isUnsafeToRepeat(@Nullable String sql) {
            return sql == null // неизвестно что, скорее всего callback, повторять такое опасно
                    || !ROUTING_DESIRED_WORDS.matcher(sql).find()
                    || ROUTING_FORBIDDEN_WORDS.matcher(sql).find();
        }
    
        @Override
        public <T> T execute(CallableStatementCreator csc, CallableStatementCallback<T> action) {
            return tryWithRouting(csc, () -> super.execute(csc, cs -> handleCustomFetchSize(
                    CallableStatement.class,
                    cs,
                    cs.getConnection(),
                    cs.getFetchSize(),
                    action::doInCallableStatement
            )));
        }
    
        @Override
        public <T> T execute(ConnectionCallback<T> action) {
            return tryWithRouting(action, () -> super.execute((Connection con) -> handleCustomFetchSize(
                    null,
                    null,
                    con,
                    getFetchSize(),
                    nullStmt -> action.doInConnection(con)
            )));
        }
    
        @Override
        public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action) {
            return tryWithRouting(psc, () -> super.execute(psc, ps -> handleCustomFetchSize(
                    PreparedStatement.class,
                    ps,
                    ps.getConnection(),
                    ps.getFetchSize(),
                    action::doInPreparedStatement
            )));
        }
    
        @Override
        public <T> T execute(StatementCallback<T> action) {
            return tryWithRouting(action, () -> super.execute((Statement stmt) -> handleCustomFetchSize(
                    Statement.class,
                    stmt,
                    stmt.getConnection(),
                    stmt.getFetchSize(),
                    action::doInStatement
            )));
        }
    
        @Override
        @Nullable
        public <T> T query(String sql, ResultSetExtractor<T> rse) {
            return super.query(sql, new RoutingAwareResultSetExtractor<>(rse));
        }
    
        @Override
        @Nullable
        public <T> T query(PreparedStatementCreator psc, @Nullable PreparedStatementSetter pss, ResultSetExtractor<T> rse) {
            return super.query(psc, pss, new RoutingAwareResultSetExtractor<>(rse));
        }
    
        /**
         * все читающие запросы в новом rw-датасорсе будут выбрасывать результат,
         * таким образом мы будем проверять корректность, ловить ошибки,
         * но не будем портить переданный {@link ResultSetExtractor}, который может оказаться stateful
         */
        private static final class RoutingAwareResultSetExtractor<T> implements ResultSetExtractor<T> {
            private final ResultSetExtractor<T> proxee;
    
            private RoutingAwareResultSetExtractor(ResultSetExtractor<T> proxee) {
                this.proxee = Objects.requireNonNull(proxee, "result set extractor");
            }
    
            @Override
            public T extractData(ResultSet rs) throws SQLException {
                if (ROUTING.get() == QueryRouting.NOOP_REPEAT) {
                    rs.next();
                    return null;
                } else {
                    return proxee.extractData(rs);
                }
            }
        }
    
        /**
         * handles connection autocommit if custom fetch size is specified as described here
         * https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
         *
         * @apiNote visible for {@link ru.yandex.market.mbi.util.db.DbUtil}
         * @see org.postgresql.core.QueryExecutor#QUERY_FORWARD_CURSOR
         * @see java.sql.ResultSet#setFetchSize(int)
         * @see java.sql.Statement#setFetchSize(int)
         * @see JdbcTemplate#setFetchSize(int)
         */
        public static <T, S extends Statement> T handleCustomFetchSize(
                @Nullable Class<S> stmtClass,
                @Nullable S stmt,
                Connection connection,
                int fetchSize,
                ThrowingJdbcFunction<S, T> action
        ) throws SQLException {
            try {
                if (fetchSize > 0) {
                    // fetch size may be set by eg JdbcTemplate#applyStatementSettings(Statement)
                    setManualCommitIfRequired(connection, fetchSize);
                } else if (stmt != null) {
                    // fetchSize could be set within action callback,
                    // in this case CustomFetchSizeHandler should handle autoCommit.
                    // additional check is just to avoid creating unnecessary objects
                    if (shouldHandleManualCommit(connection)) {
                        assert stmtClass != null : "should be not null when stmt is not null";
                        stmt = stmtClass.cast(Proxy.newProxyInstance(stmtClass.getClassLoader(),
                                new Class<?>[]{stmtClass}, new CustomFetchSizeHandler(stmt)));
                    }
                }
                return action.apply(stmt);
            } catch (SQLException | RuntimeException e) {
                var manualCommit = MANUAL_COMMIT.get();
                if (manualCommit == connection) {
                    manualCommit.rollback();
                }
                throw e;
            } finally {
                var manualCommit = MANUAL_COMMIT.get();
                MANUAL_COMMIT.remove();
                if (manualCommit == connection) {
                    // this will also implicitly call commit if not yet
                    manualCommit.setAutoCommit(true);
                }
            }
        }
    
        /**
         * helps {@link #handleCustomFetchSize} to handle cases when callback sets custom fetch size
         */
        private static class CustomFetchSizeHandler implements InvocationHandler {
            private static final String METHOD = "setFetchSize";
            private final Statement proxee;
    
            CustomFetchSizeHandler(Statement proxee) {
                this.proxee = proxee;
            }
    
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if (METHOD.equals(method.getName())) {
                    setManualCommitIfRequired(proxee.getConnection(), (Integer) args[0]);
                }
                try {
                    return method.invoke(proxee, args);
                } catch (InvocationTargetException ex) {
                    throw ex.getCause();
                }
            }
        }
    
        /**
         * @apiNote visible for {@link ru.yandex.market.mbi.util.db.DbUtil}
         */
        @FunctionalInterface
        public interface ThrowingJdbcFunction<S, T> {
            @Nullable
            T apply(@Nullable S s) throws SQLException;
        }
    
        private static void setManualCommitIfRequired(Connection connection, int fetchSize) throws SQLException {
            if (fetchSize > 0 && shouldHandleManualCommit(connection)) {
                connection.setAutoCommit(false); // may fail so call before storing
                MANUAL_COMMIT.set(connection);
                // we store connection to avoid nested connection issues
                // eg if somebody opens another connecton within running callback
                // then we won't apply manual commit/rollback to it
            }
        }
    
        private static boolean shouldHandleManualCommit(Connection connection) throws SQLException {
            // don't check for isAnyRoutingEnabled, all this stuff is only for pg
            if (MANUAL_COMMIT.get() != null || !connection.getAutoCommit()) {
                return false;
            }
            try {
                return "PostgreSQL".equals(connection.getMetaData().getDatabaseProductName());
            } catch (SQLException e) {
                throw new UncategorizedSQLException("DatabaseType::of", null, e);
            }
        }
    
        private static boolean isAnyRoutingEnabled() {
            var r = ROUTING.get();
            return r != null && r != QueryRouting.DISABLED;
        }
    
        @Nonnull
        private static QueryRouting setRouting(QueryRouting r) {
            ROUTING.set(Objects.requireNonNull(r, "routing flag should always be defined before request"));
            return r;
        }
    }

  • OracleToPostgresDataSourceTransactionManager, который работает в паре с предыдущим классом и в рамках одной транзакции поддерживает единый уровень роутинга;

    OracleToPostgresDataSourceTransactionManager
    /**
     * TransactionManager, который может переключаться на второй датасорс в рамках selectов вне транзакций.
     *
     * @implNote использует ThreadLocal-копию флага с роутингом, чтобы на один логический запрос в темплейт
     * был один запрос в Supplier. Это нужно, так как JdbcTemplate в рамках запроса вызывает obtainDataSource,
     * а потом ещё раз getDataSource, когда закрывает соединение.
     * <p>
     * на данный момент не поддерживает Spring 5.3+, см {@link OracleToPostgresJdbcTemplate},
     * так как этот класс должен работать с ним в связке.
     * <p>
     * наследует {@link CallbackPreferringPlatformTransactionManager}, чтобы уметь работать с любым наследником {@link TransactionTemplate}
     */
    public class OracleToPostgresDataSourceTransactionManager
            extends DataSourceTransactionManager
            implements CallbackPreferringPlatformTransactionManager {
        private static final Logger log = LoggerFactory.getLogger(OracleToPostgresDataSourceTransactionManager.class);
        static final ThreadLocal<QueryRouting> ROUTING = new NamedThreadLocal<>("routing");
        private final DataSource newDataSource;
        private final Supplier<QueryRouting> dataSourceRouting;
    
        /**
         * @param oldDataSource     основной, обычный датасорс, например Oracle
         * @param newDataSource     датасорс, на который мы хотим переключать часть запросов, например PostgreSQL
         * @param dataSourceRouting будет вызван перед каждым подходящим под критерии
         *                          роутинга запросом с sql в параметре, поэтому должен работать
         *                          быстро, кеширование и прочее — на вашей совести,
         *                          все ошибки будут просто залогированы, поэтому о них можно не заботиться
         */
        public OracleToPostgresDataSourceTransactionManager(
                DataSource oldDataSource,
                DataSource newDataSource,
                Supplier<QueryRouting> dataSourceRouting
        ) {
            OracleToPostgresJdbcTemplate.checkSpringVersion();
    
            super.setDataSource(Objects.requireNonNull(oldDataSource, "regular DataSource"));
            this.newDataSource = Objects.requireNonNull(newDataSource, "new DataSource");
            Objects.requireNonNull(dataSourceRouting);
            this.dataSourceRouting = () -> {
                try {
                    return dataSourceRouting.get();
                } catch (RuntimeException e) {
                    log.error("Failed to get query routing flag, got {}", e, e);
                    return null;
                }
            };
        }
    
        @Override
        public final void setDataSource(DataSource dataSource) {
            if (dataSource != super.getDataSource()) {
                log.error("Setting another datasource (implicitly) is not what you wanted, only constructor is supported");
            }
            // do nothing to avoid implicit resetting
        }
    
        @Override
        public final DataSource getDataSource() {
            return obtainDataSource();
        }
    
        @Nonnull
        @Override
        protected final DataSource obtainDataSource() {
            return ROUTING.get() == QueryRouting.ENABLED
                    ? newDataSource
                    : Objects.requireNonNull(super.getDataSource(), "should be guaranteed by ctor and setter");
        }
    
        @Override
        public <T> T execute(
                TransactionDefinition definition,
                TransactionCallback<T> callback
        ) throws TransactionException {
            var r = ROUTING.get();
            if (r == null) {
                // если есть активный запрос, то открываем транзакцию
                // с тем же базовым значением роутинга до конца транзакции
                // это немного странная ситуация и такого по-хорошему быть не должно,
                // но шаловливые ручки девелоперов могут и не такое, поэтому тут защита от дурака
                r = OracleToPostgresJdbcTemplate.ROUTING.get();
            } else {
                // если у нас вложенная транзакция, просто поддерживаем текущий роутинг
                return doExecute(definition, callback);
            }
            if (r == null) {
                // на одну транзакцию всего один поход за флаг
                r = dataSourceRouting.get();
            }
            if (r == null) {
                // выключаем роутинг до конца транзакции, чтобы запросы не начали ходить куда попало
                r = QueryRouting.DISABLED;
            }
    
            ROUTING.set(Objects.requireNonNull(r, "routing flag should always be defined before request"));
            try {
                return doExecute(definition, callback);
            } finally {
                ROUTING.remove();
            }
        }
    
        /**
         * @see TransactionTemplate#execute
         */
        private <T> T doExecute(
                TransactionDefinition definition,
                TransactionCallback<T> callback
        ) throws TransactionException {
            TransactionStatus status = getTransaction(definition);
            T result;
            try {
                result = callback.doInTransaction(status);
            } catch (RuntimeException | Error ex) {
                doRollbackOnException(status, ex);
                throw ex;
            } catch (Throwable ex) {
                doRollbackOnException(status, ex);
                throw new UndeclaredThrowableException(ex, "TransactionCallback threw undeclared checked exception");
            }
            commit(status);
            return result;
        }
    
        /**
         * @see TransactionTemplate#rollbackOnException
         */
        private void doRollbackOnException(
                TransactionStatus status,
                Throwable ex
        ) throws TransactionException {
            try {
                rollback(status);
            } catch (TransactionSystemException ex2) {
                log.error("Application exception overridden by rollback exception", ex);
                ex2.initApplicationException(ex);
                throw ex2;
            } catch (RuntimeException | Error ex2) {
                log.error("Application exception overridden by rollback exception", ex);
                throw ex2;
            }
        }
    }

  • OracleToPostgresLobHandler для вызова PreparedStatement.setString для PostgreSQL вместо setClobAsString, так как в PostgreSQL вместо CLOB-колонок мы создавали text-колонки;

    OracleToPostgresLobHandler
    /**
     * Предназначен для использования {@link #getClobAsString} и {@link LobCreator#setClobAsString}
     * и их единообразной работы с драйверами для oracle и pg.
     */
    @Immutable
    public final class OracleToPostgresLobHandler extends DefaultLobHandler {
        /**
         * максимальный размер типа varchar2
         * https://docs.oracle.com/en/database/oracle/oracle-database/19/refrn/datatype-limits.html
         */
        private static final int ORACLE_MAX_VARCHAR_SIZE_BYTES = 4000;
    
        public OracleToPostgresLobHandler() {
            // сейчас мы используем современный оракловый драйвер,
            // поэтому явно указываем дефолтные настройки и используем НЕ TemporaryLobCreator,
            // очисткой ресурсов занимается сам оракловый драйвер при закрытии PreparedStatement
            setCreateTemporaryLob(false);
            setStreamAsLob(true);
        }
    
        @Override
        public LobCreator getLobCreator() {
            return new DefaultLobCreator() {
                @Override
                public void setClobAsString(
                        @Nonnull PreparedStatement ps,
                        int paramIndex,
                        @Nullable String content
                ) throws SQLException {
                    if (content == null
                            // берём по максимуму (по 4 байта) на символ для быстрой проверки
                            || content.length() <= ORACLE_MAX_VARCHAR_SIZE_BYTES >> 2
                            // на данный момент мы мапим оракловые clob-колонки на text, не на oid
                            || "PostgreSQL".equals(ps.getConnection().getMetaData().getDatabaseProductName())
                            // медленная проверка по границе размеров
                            || content.getBytes(StandardCharsets.UTF_8).length <= ORACLE_MAX_VARCHAR_SIZE_BYTES
                    ) {
                        // оракловый драйвер для небольших строк спокойно обрабатывает clob-колонку как обычную строковую
                        ps.setString(paramIndex, content);
                    } else {
                        super.setClobAsString(ps, paramIndex, content);
                    }
                }
            };
        }
    }

  • OracleToPostgresSql для случая разных запросов в зависимости от типа базы (если запрос не удавалось привести к единому виду) — именно этот класс помогал обходить случаи, когда трансформера не хватало;

    OracleToPostgresSql
    @ThreadSafe
    public final class OracleToPostgresSql implements PreparedStatementCreator, SqlProvider {
        private static final ThreadLocal<String> LAST_DB_TYPE = new NamedThreadLocal<>("db type");
        private final String sqlOracle;
        private final String sqlPg;
    
        public OracleToPostgresSql(
                String sqlOracle,
                String sqlPg
        ) {
            this.sqlOracle = Objects.requireNonNull(sqlOracle);
            this.sqlPg = Objects.requireNonNull(sqlPg);
        }
    
        @Override
        public PreparedStatement createPreparedStatement(Connection con) throws SQLException {
            LAST_DB_TYPE.set(con.getMetaData().getDatabaseProductName());
            return con.prepareStatement(getSql());
        }
    
        @Override
        public String getSql() {
            return "PostgreSQL".equals(LAST_DB_TYPE.get())
                    ? sqlPg
                    : sqlOracle;
        }
    
        /**
         * uses {@link PreparedStatementCreator} instead of sql string
         *
         * @see JdbcOperations#batchUpdate(String, List)
         */
        public int[] batchUpdate(
                JdbcOperations jdbcTemplate,
                List<Object[]> batchArgs
        ) throws DataAccessException {
            return batchUpdate(jdbcTemplate, batchArgs, new int[0]);
        }
    
        /**
         * uses {@link PreparedStatementCreator} instead of sql string
         *
         * @see JdbcOperations#batchUpdate(String, List, int[])
         */
        public int[] batchUpdate(
                JdbcOperations jdbcTemplate,
                List<Object[]> batchArgs,
                int[] argTypes
        ) throws DataAccessException {
            if (batchArgs.isEmpty()) {
                return new int[0];
            }
            return batchUpdate(jdbcTemplate, new BatchPreparedStatementSetter() {
                @Override
                public void setValues(PreparedStatement ps, int i) throws SQLException {
                    Object[] values = batchArgs.get(i);
                    int colIndex = 0;
                    for (Object value : values) {
                        colIndex++;
                        if (value instanceof SqlParameterValue) {
                            SqlParameterValue paramValue = (SqlParameterValue) value;
                            StatementCreatorUtils.setParameterValue(ps, colIndex, paramValue, paramValue.getValue());
                        } else {
                            int colType;
                            if (argTypes.length < colIndex) {
                                colType = SqlTypeValue.TYPE_UNKNOWN;
                            } else {
                                colType = argTypes[colIndex - 1];
                            }
                            StatementCreatorUtils.setParameterValue(ps, colIndex, colType, value);
                        }
                    }
                }
    
                @Override
                public int getBatchSize() {
                    return batchArgs.size();
                }
            });
        }
    
        /**
         * uses {@link PreparedStatementCreator} instead of sql string
         *
         * @see JdbcOperations#batchUpdate(String, BatchPreparedStatementSetter)
         */
        public int[] batchUpdate(
                JdbcOperations jdbcTemplate,
                BatchPreparedStatementSetter pss
        ) throws DataAccessException {
            return Objects.requireNonNull(jdbcTemplate.execute(this, ps -> {
                try {
                    int batchSize = pss.getBatchSize();
                    var ipss = pss instanceof InterruptibleBatchPreparedStatementSetter
                            ? (InterruptibleBatchPreparedStatementSetter) pss
                            : null;
                    if (JdbcUtils.supportsBatchUpdates(ps.getConnection())) {
                        for (int i = 0; i < batchSize; i++) {
                            pss.setValues(ps, i);
                            if (ipss != null && ipss.isBatchExhausted(i)) {
                                break;
                            }
                            ps.addBatch();
                        }
                        return ps.executeBatch();
                    } else {
                        List<Integer> rowsAffected = new ArrayList<>(batchSize);
                        for (int i = 0; i < batchSize; i++) {
                            pss.setValues(ps, i);
                            if (ipss != null && ipss.isBatchExhausted(i)) {
                                break;
                            }
                            rowsAffected.add(ps.executeUpdate());
                        }
                        int[] rowsAffectedArray = new int[rowsAffected.size()];
                        for (int i = 0; i < rowsAffectedArray.length; i++) {
                            rowsAffectedArray[i] = rowsAffected.get(i);
                        }
                        return rowsAffectedArray;
                    }
                } finally {
                    if (pss instanceof ParameterDisposer) {
                        ((ParameterDisposer) pss).cleanupParameters();
                    }
                }
            }));
        }
    
        /**
         * uses {@link PreparedStatementCreator} instead of sql string
         *
         * @see JdbcOperations#batchUpdate(String, Collection, int, ParameterizedPreparedStatementSetter)
         */
        public <T> int[][] batchUpdate(
                JdbcOperations jdbcTemplate,
                Collection<T> batchArgs,
                int batchSize,
                ParameterizedPreparedStatementSetter<T> pss
        ) throws DataAccessException {
            return Objects.requireNonNull(jdbcTemplate.execute(this, ps -> {
                try {
                    boolean batchSupported = JdbcUtils.supportsBatchUpdates(ps.getConnection());
                    List<int[]> rowsAffected = new ArrayList<>(batchSupported
                            ? Math.min(batchArgs.size(), (batchArgs.size() / batchSize) + 1)
                            : batchArgs.size());
                    int n = 0;
                    for (T obj : batchArgs) {
                        pss.setValues(ps, obj);
                        n++;
                        if (batchSupported) {
                            ps.addBatch();
                            if (n % batchSize == 0 || n == batchArgs.size()) {
                                rowsAffected.add(ps.executeBatch());
                            }
                        } else {
                            int i = ps.executeUpdate();
                            rowsAffected.add(new int[]{i});
                        }
                    }
                    int[][] result1 = new int[rowsAffected.size()][];
                    for (int i = 0; i < result1.length; i++) {
                        result1[i] = rowsAffected.get(i);
                    }
                    return result1;
                } finally {
                    if (pss instanceof ParameterDisposer) {
                        ((ParameterDisposer) pss).cleanupParameters();
                    }
                }
            }));
        }
    }

  • рантайм-флажок для QueryRouting с фоновым обновлением (чтобы не блокировать никакие запросы), чтобы значение роутинга фиксировалось на момент создания транзакции (или просто запроса, если явной транзакции не было).

По итогу самой сложной частью оказалось корректно подружить кастомные jdbc template и tx manager. Скажу так: если вам кажется, что у вас маловато тестов, — вам не кажется, пишите ещё. Лишь когда мы покрыли тестами все возможные сценарии работы с транзакциями (особенно вложенность), появилось достаточно уверенности для перехода к самому переключению.

К этому моменту схема стала такой:

Как мы переехали с Oracle на PostgreSQL в нагруженном сервисе без даунтайма - 5

Переключение. Тестовое окружение

Наконец в начале марта 2022 года настал момент, когда можно было что-нибудь включить. Разумеется, на тестовом окружении. Хотя оно и далековато от полноценной нагрузки, но все фоновые процессы там такие же, как на продакшене, а большинство основных сценариев покрыто автотестами, а ещё ручными и интеграционными тестами. В общем, недовольными могли остаться лишь тестировщики, но им не привыкать.

Короче говоря, мы переключились на NOOP_REPEAT и стали ждать ошибок. Разумеется, они не заставили себя ждать. Ошибки отлавливали в Sentry (просто потому что он уже был, а так вместо Sentry мог быть ELK и любой другой вариант сбора и просмотра логов, tail -f тоже сойдёт): вбили фильтр по классу исключений и в спокойном режиме чинили всё, что нашлось. 

Ошибки ошибками, но немного неожиданным стало появление тормозов. Поскольку при переносе схемы в PostgreSQL мы создавали ровно те же индексы, как и в Oracle, мы ожидали, что performance будет сопоставимым. Например, мы думали, что тайминги при shadow-запросах будут на уровне x2. Но у PostgreSQL своё представление о прекрасном, поэтому пришлось в срочном порядке погружаться в мир тюнинга запросов под PostgreSQL.

Учитывая, что на тот момент опыта оптимизации PostgreSQL у меня было примерно ноль, пришлось спешно искать информацию, чтение которой не заняло бы несколько недель. И я её нашёл. Под спойлером ссылки, которые остались в закромах и которые помогли лично мне.

Полезные ссылки по оптимизации запросов

Спустя какое-то время выработался такой шорт-лист оптимизаций:

  1. Выкинуть из запроса всё лишнее (join, поля и остальные мелочи — Oracle это прощал).

  2. Создать подходящий индекс (несколько раз пригодился даже brin).

  3. Поменять порядок join. Это особенно актуально, когда их много (на это также, например, влияет параметр join_collapse_limit, но мы его не меняли).

  4. Заменить join на where [not] exists(...).

  5. Заменить row_number over as rn ... where rn = 1 на select distinct on (columns) order by — это крутая PostgreSQL-specific-фича, которая воспринимается легче и работает быстрее.

  6. Если хочется nested loop для join, попробовать PostgreSQL-specific join lateral. Правда, нам он пригодился всего один раз. 

  7. Поднять work_mem на кластере PostgreSQL. При 16 MiB мы упирались в IO, поэтому подняли до 32 MiB (есть очень большие запросы).

  8. Поднять lock_timeout. Дефолтное значение в секунду может быть слишком маленьким.

Но к этому списку мы пришли не сразу. Вот, что мы ещё успели попробовать на старте в попытке заставить работать планировщик, «как в Oracle»:

  • Поигрались с cost-настройками. В итоге сейчас у нас cpu_tuple_cost=0.01, cpu_index_tuple_cost=0.001, cpu_operator_cost = 0.0005, но эти чиселки были взяты где-то по совету агентства ОБС, в реальности влияние настройки неизвестно 🙂

  • Пробовали менять join_collapse_limit — не помогло, отказались.

  • Пробовали выключать enable_seqscan — не помогло, отказались.

  • Пробовали расширение pg_hint_plan — слишком неудобно по сравнению с хинтами Oracle и смахивает на костыли, отказались. В исходных запросах на Oracle была куча хинтов, а для PostgreSQL в итоге хватило остальных оптимизаций.

Итого в рамках тестового окружения было исправлено 94 проблемы, связанные с запросами или логикой, а ещё 27 проблем, связанных с performance.

План переключения составили такой:

  1. В час Ч отбираем у Oracle права на запись.

  2. Ждём примерно 10 секунд, прибиваем все зависшие долгие транзакции Oracle.

  3. Дожидаемся нулевого лага репликации по нашим репликатам.

  4. Останавливаем репликацию.

  5. Накатываем на PostgreSQL гранты, даём права на запись сервису и отбираем их у GG.

  6. Переключаем на запись в PostgreSQL.

  7. Идём чинить то, что вылезет.

План был приведён в действие в конце марта 2022 года. Отработал на ура (правда, сразу всплыло, что мы забыли сдвинуть sequences в PostgreSQL 🙂). В этот день несколько ребят вызвались помочь быстренько разгрести всплывшие проблемы, которые пропустили все предыдущие приготовления. 

К вечеру этого же дня для тестинга всё закончилось, впереди маячил продакшн.

И наконец — продакшн

Для продакшена мы повторили все те же действия. Разумеется, появились новые проблемы: примерно за две недели нам пришлось исправить 51 ошибку в запросах и логике и 25 ошибок в performance. С учётом прошлого опыта переключений снова был составлен поминутный план переключения — и в этот раз про sequences не забыли 🙂.

И вот наступил день Д, час Ч. Знаете то чувство, когда ладошки становятся потными и пульс подскакивает? Вот его я и испытал. Когда все были готовы, я дал отмашку к началу.

И оно случилось. И оно заработало. Сначала, конечно, была тишина и редкие вопросы: «А мы уже переключились?» Но потом пришло осознание того, что всё хорошо. А самое главное — пользователи сервиса ничего не заметили. Все прошло максимально тихо и гладко — разве что было несколько единичных пятисоток.

Редкие всплывающие ошибки мы починили в пределах пары часов. Уставшие, но довольные разошлись по домам.

Схема стала такой:

Как мы переехали с Oracle на PostgreSQL в нагруженном сервисе без даунтайма - 6

Эпилог

В итоге получился подход в духе «нарисовать сову», но он сработал, хотя в него поначалу не очень верили. Причём результат вышел даже лучше, чем мы ожидали вначале. Мы оптимизировали работу с базой, получив на выходе более низкое время ответа сервиса, что видно на самой первом скриншоте в статье.

Этот же подход, но в сильно меньших масштабах мы применили ещё раз в соседнем небольшом сервисе: те же полтора землекопа, несколько недель работы, всего несколько ошибок по пути — и дело готово.

Ну и напоследок: тесты — наше всё. Именно благодаря тестам мы были уверены, что не уроним всё и сразу. Тестов много не бывает.

Несколько полезных ссылок по миграции данных

Автор: Сергей

Источник

* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js