Часть 1 – habrahabr.ru/post/330618
В этой части изменим логику загрузки справочника Products:
- При помощи компонента «Union All» объединим два входящих потока в один;
- Для новых записей будем делать вставку, а для записей, которые уже были добавлены ранее будем делать обновление. Для разделения записей на добавляемые и обновляемые воспользуемся компонентом Lookup;
- Для обновления записей применим компонент «OLE DB Command».
В завершении этой части рассмотрим компонент Multicast для того чтобы распараллелить выходящий набор.
Итого в этой части мы познакомимся с четырьмя новыми компонентами: Union All, Lookup, OLE DB Command и Multicast.
Дальше так же будет очень много картинок.
Продолжим знакомство с SSIS
Создадим новый пакет:
И переименуем его в «LoadProducts_ver2.dtsx»:
В области «Control Flow» создадим «Data Flow Task»:
Двойным щелчком по элементу «Data Flow Task» зайдем в его область «Data Flow». Создадим два элемента «Source Assistant» для соединений SourceA и SourceB. Переименуем эти элементы в «Source A» и «Source B» соответственно:
«Source A» настроим следующим образом:
Текст запроса:
SELECT
ID SourceProductID,
Title,
Price
FROM Products
В целях демонстрации больших возможностей за раз, здесь я намеренно отпустил SourceID.
«Source B» настроим следующим образом:
Текст запроса:
SELECT
ID SourceProductID,
'B' SourceID,
Title,
Price
FROM Products
В результате набор A у нас будет иметь 3 колонки [SourceProductID, Title, Price], а набор B будет иметь 4 колонки [SourceProductID, SourceID, Title, Price].
Воспользуемся элементом «Union All», чтобы объединить данные из 2-х наборов в один. Направим в него синие стрелки из «Source A» и «Source B»:
Каким образом делается сопоставление колонок двух входящих наборов, можно увидеть дважды щелкнув на элементе «Union All»:
Как мы видим, здесь сделалось автоматическое сопоставление колонок имена которых совпадают. При необходимости мы можем сделать свое сопоставление, для примера добавим колонку SourceID из второго набора:
В данном случае значения SourceID набора «Source A» будут равны NULL.
Объединение двух наборов в данном случае делается на стороне SSIS. Здесь стоит обратить внимание на то, что базы источники и принимающая база могут располагаться на разных серверах/экземплярах SQL Server, по этой причине мы не всегда сможем так просто написать SQL запрос используя в нем таблицы из разных баз с применением SQL-операции UNION или JOIN (который можно было использовать вместо Lookup описанного ниже).
Для того чтобы заменить NULL значения на «A» воспользуемся компонентом «Derived Column» в который направим поток из «Union All»:
Двойным щелчком зайдем в редактор «Derived Column» и настроим его следующим образом:
Проделаем следующее (мышь в помощь):
- Укажем в «Derived Column» значение «Replace 'SourceID'» — это будет означать что мы на выходе заменяем старую колонку SourceID на новую;
- Перетащим в область «Expression» функцию REPLACENULL;
- Перетащим на место первого аргумента функции REPLACENULL колонку SourceID;
- В качестве второго аргумента пропишем константу «A».
Для того чтобы понять, что произошло с данными после прохождения «Union All» сделайте «Enable Data Viewer» для стрелки, идущей от «Union All» к «Derived Column»:
Теперь при запуске пакета на выполнение вы сможете увидеть набор, который получился в результате:
Здесь видно, что на этом этапе (до Derived Column) в колонке SourceID для строк первого набора стоят значения NULL.
Для того чтобы определить была ли добавлена ранее запись в базу DemoSSIS_Target воспользуемся компонентом Lookup:
Дважды щелкнув по нему настроим данный элемент:
Здесь мы скажем, что те строки, для которых не найдено соответствие, мы будем перенаправлять в поток «no match output». В этом случае на выходе мы получим 2 набора «Lookup Match Output» и «Lookup No Match Output».
Например, если выставить значение «Ignore failure», то в строках, для которых не нашлось сопоставления в поле TargetID (см. ниже) будет записано значение NULL и все строки будут возвращены через один набор «Lookup Match Output».
«Full cache» говорит о том, что набор, который будет использоваться в качестве справочника одним SQL запросом (см.на следующей вкладке) будет полностью загружен в память и строки будут сопоставляться уже с кэша без повторных обращений к SQL Server.
Если же выбрать «Partial cache» или «No cache», то на вкладке Advanced можно будет прописать запрос с параметрами, который будет выполняться для сопоставления каждой строки входящего набора. Для интереса можно поиграться с этим свойством и через SQL Server Profiler посмотреть какие будут формироваться запросы при выполнении пакета.
На следующей вкладке нам нужно определить набор, который будет выступать в роли справочника:
Я прописал здесь запрос:
SELECT
SourceID,
SourceProductID,
ID TargetID
FROM Products
На следующей вкладке нужно указать по каким полям делается поиск строки в справочнике и какие колонки из справочника нужно добавить в выходной набор (если это нужно):
Для определение связи нужно при помощи мыши перетащить поле SourceProductID на SourceProductID и поле SourceID на SourceID.
Добавим компонент «Destination Assistant» для вставки записей с потока «Lookup No Match Output»:
Перетащим синюю стрелку с «Lookup» на «OLE DB Destination» и в диалоговом окне выберем поток «Lookup No Match Output»:
В итоге мы получим следующее:
Дважды щелкнув по «OLE DB Destination» настроим его:
Обработку вставки новых записей мы сделали.
Теперь для обновления ранее вставленных записей воспользуемся компонентом «OLE DB Command» и перенесем на него синюю стрелку от Lookup:
В этот компонент автоматически будет направлен поток «Lookup Match Output», т.к. поток «Lookup No Match Output» мы уже выбрали ранее:
Дважды щелкнем на «OLE DB Command» и настроим его:
Пропишем следующий запрос на обновление:
UPDATE Products
SET
Title=?,
Price=?
WHERE ID=?
На следующей вкладке укажем каким образом будут задаваться параметры на основании данных строк входящего набора «Lookup Match Output»:
Через SSMS добавим новых продуктов в базу DemoSSIS_SourceB:
USE DemoSSIS_SourceB
GO
-- добавим новых товаров
SET IDENTITY_INSERT Products ON
INSERT Products(ID,Title,Price)VALUES
(6,N'Точилка',NULL),
(7,N'Ластик',NULL),
(8,N'Карандаш простой',NULL)
SET IDENTITY_INSERT Products OFF
GO
Для того чтобы отследить как менялись данные, вы можете, перед запуском пакета на выполнение, в необходимых местах сделать «Enable Data Viewer»:
Запустим пакет на выполнение:
В итоге мы должны увидеть, что 3 строки было вставлено при помощи компонента «OLE DB Destination» и 10 строк обновлено при помощи компонента «OLE DB Command».
Запрос прописанный в «OLE DB Command» выполнился для каждой строки входящего набора, т.е. в данном примере 10 раз.
В «OLE DB Command» можно прописать более сложную логику на TSQL, например, сделать проверку, были ли изменены Title или Price, и делать обновление соответствующей строки только если какое-то из значений отличается.
Для наглядности добавим новую колонку в таблицу Products в базе DemoSSIS_Target:
USE DemoSSIS_Target
GO
ALTER TABLE Products ADD UpdatedOn datetime
GO
Давайте теперь пропишем следующую команду:
Текст команды:
DECLARE @TargetID int=?
DECLARE @Title nvarchar(50)=?
DECLARE @Price money=?
IF(EXISTS(
SELECT Title,Price
FROM Products
WHERE ID=@TargetID
EXCEPT
SELECT @Title,@Price
)
)
BEGIN
UPDATE Products
SET
Title=@Title,
Price=@Price,
UpdatedOn=GETDATE()
WHERE ID=@TargetID
END
Так же можно было бы все это оформить в виде хранимой процедуры, а здесь прописать ее через вызов «EXEC ProcName ?,?,?». Здесь, думаю, кому как удобнее, мне порой удобнее, чтобы все было прописано в одном месте, т.е. в SSIS-проекте. Но если использовать процедуру, то тоже получаем свои удобства, в этом случае можно, было бы просто изменить процедуру и избежать переделки и повторного развертывания SSIS-проекта.
После чего переопределим привязку параметров согласно их очередности в тексте команды:
Сделаем в базе DemoSSIS_SourceA обновление:
USE DemoSSIS_SourceA
GO
UPDATE Products
SET
Price=30
WHERE ID=2 -- Корректор
И снова запустим проект на выполнение.
В результате после очередного запуска пакета на выполнение, UPDATE должен будет выполниться только 1 раз, только для этой записи.
После выполнения пакета проверим это при помощи запроса:
USE DemoSSIS_Target
GO
SELECT *
FROM Products
ORDER BY UpdatedOn DESC
В рамках данной части рассмотрим еще компонент «Multicast». Данный компонент позволяет получить из одного потока несколько. Это может быть полезно, когда одни и те же данные необходимо записать в два или более разных мест – т.е. входит один набор, а выходит столько его копий сколько нам нужно, и с каждой копией этого набора мы можем делать что захотим.
Для примера создадим в базе DemoSSIS_Target еще одну таблицу LastAddedProducts:
USE DemoSSIS_Target
GO
CREATE TABLE LastAddedProducts(
SourceID char(1) NOT NULL, -- используется для идентификации источника
SourceProductID int NOT NULL, -- ID в источнике
Title nvarchar(50) NOT NULL,
Price money,
CONSTRAINT PK_LastAddedProducts PRIMARY KEY(SourceID,SourceProductID),
CONSTRAINT CK_LastAddedProducts_SourceID CHECK(SourceID IN('A','B'))
)
GO
Для очистки этой таблицы добавим в область «Control Flow» компонент «Execute SQL Task» и пропишем в нем команду «TRUNCATE TABLE LastAddedProducts»:
Перейдем в область «Data Flow» компонента «Data Flow Task» и добавим компонент следующим образом:
Обратите внимание на желтый восклицательный знак – это произошло из-за того, что мы добавили колонку UpdatedOn и не привязали ее. Зайдем в элемент «OLE DB Destination», перейдем на вкладку Mappings оставим для колонки UpdatedOn в качестве входящего поля Ignore и нажмем OK:
Создадим еще один элемент «OLE DB Destination» и перетащим на него вторую синюю стрелку от элемента Multicast:
Переименуем для наглядности:
Настроим «To LastAddedProducts»:
Удалим через SSMS три последние вставленные записи:
USE DemoSSIS_Target
GO
DELETE Products
WHERE SourceID='B'
AND SourceProductID>=6
И запустим пакет на выполнение:
В итоге добавление произошло в 2 таблицы – Products и LastAddedProducts.
Заключение по второй части
В этой части мы рассмотрели каким образом можно делать синхронизацию небольших справочников. Здесь конечно не учитывается тот момент, что данные в источниках могут еще удаляться, но при необходимости вы сможете попробовать сделать это самостоятельно, т.к. при удалении иногда нужно учитывать дополнительные факторы, например, на удаляемую запись могут быть ссылки из других таблиц (в следующей части планируется это сделать).
Чтобы не нарушать ссылочную целостность, иногда запись в принимающей таблице удаляется логически, для этого, например, можно в эту таблицу добавить поле Deleted типа bit (флаг логического удаления) или DeletedOn типа datetime (дата/время логического удаления).
Порой на сервере, на котором располагается база Target делается вспомогательная промежуточная база (обычно ее называют Staging) и первым делом «сырые» данные из Source загружаются в нее. Так как теперь Target и Staging находятся на одном сервере, то вторым шагом мы можем легко написать SQL-запрос (например, используя SQL-конструкцию MERGE или запрос с применение конструкции JOIN), который оперирует с наборами обеих этих баз.
SSIS достаточно интересный инструмент, который на мой взгляд не помешает иметь в своем арсенале, так как в некоторых случаях он может сильно упростить процесс интеграции. Но конечно бывают ситуации, когда все взвесив, разумнее написать интеграцию прибегая к другим способам, например, использовать Linked Servers и писать процедуры на чистом TSQL или писать свою утилиту на каком-то другом языке программирования с применением всей мощи ООП и т.п.
Изучая материал проявляйте больше любопытства, например, щелкайте по вкладкам, которые я не показал, смотрите и анализируйте информацию на них, щелкайте по стрелкам, у них тоже есть свои свойства и настройки. Экспериментируйте, со всем что вам покажется интересным, не ленитесь делать свои небольшие тестовые примеры. Меняйте схему, так чтобы это приводило к исключениям, выбирайте более подходящие параметры у компонент пытаясь найти наиболее подходящий выход из сложившейся ситуации.
Спасибо за внимание! Удачи!
Продолжение следует…
Автор: Сергей Меньшов