В этой части я расскажу о работе с параметрами и переменными внутри SSIS-пакета. Узнаем, как можно задавать и отслеживать значения переменных во время выполнения пакета.
Также рассмотрим вызов одного пакета из другого при помощи «Execute Package Task» и некоторые дополнительные компоненты и решения.
Здесь тоже будет много картинок.
Продолжим знакомство с SSIS
Создадим в трех демонстрационных базах новую таблицу ProductResidues, которая будет содержать информацию об остатках на каждый день:
USE DemoSSIS_SourceA
GO
CREATE TABLE ProductResidues(
ResidueDate date NOT NULL,
ProductID int NOT NULL,
ResidueAmount decimal(10,2) NOT NULL,
CONSTRAINT PK_ProductResidues PRIMARY KEY(ResidueDate,ProductID),
CONSTRAINT FK_ProductResidues_ProductID FOREIGN KEY(ProductID) REFERENCES Products(ID)
)
GO
USE DemoSSIS_SourceB
GO
CREATE TABLE ProductResidues(
ResidueDate date NOT NULL,
ProductID int NOT NULL,
ResidueAmount decimal(10,2) NOT NULL,
CONSTRAINT PK_ProductResidues PRIMARY KEY(ResidueDate,ProductID),
CONSTRAINT FK_ProductResidues_ProductID FOREIGN KEY(ProductID) REFERENCES Products(ID)
)
GO
USE DemoSSIS_Target
GO
CREATE TABLE ProductResidues(
ResidueDate date NOT NULL,
ProductID int NOT NULL,
ResidueAmount decimal(10,2) NOT NULL,
CONSTRAINT PK_ProductResidues PRIMARY KEY(ResidueDate,ProductID),
CONSTRAINT FK_ProductResidues_ProductID FOREIGN KEY(ProductID) REFERENCES Products(ID)
)
GO
И наполним таблицы в источниках тестовыми данными, поочередно выполнив на базах DemoSSIS_SourceA и DemoSSIS_SourceB следующий скрипт:
USE DemoSSIS_SourceA
--USE DemoSSIS_SourceB
GO
DECLARE @MinDate date=DATEADD(MONTH,-2,GETDATE())
DECLARE @MaxDate date=GETDATE()
;WITH dayCTE AS(
SELECT CAST(@MinDate AS date) ResidueDate,10000 ResidueAmount
UNION ALL
SELECT DATEADD(DAY,1,ResidueDate),ResidueAmount-1
FROM dayCTE
WHERE ResidueDate<@MaxDate
)
INSERT ProductResidues(ResidueDate,ProductID,ResidueAmount)
SELECT
d.ResidueDate,
p.ID,
d.ResidueAmount
FROM dayCTE d
CROSS JOIN Products p
OPTION(MAXRECURSION 0)
Допустим, что в таблице ProductResidues будет очень много строк, и чтобы каждый раз не перезагружать всю информацию и упростить процедуру интеграции, логику загрузки в таблицу ProductResidues в базе DemoSSIS_Target реализуем следующую:
- Если в принимающей БД еще нет записей, то загрузим все данные;
- Если в принимающей БД есть данные, то будем удалять данные за неделю (последние 7 дней) от последней загруженной даты и загружать данные из источника начиная от этой даты снова.
Неудобство интеграции таблиц типа ProductResidues в том, что в ней нет полей, по которым можно было бы однозначно определить, когда появилась запись, в ней нет никакого идентификатора типа ID, нет ни поля типа UpdatedOn, которое содержало бы дату/время последнего обновления записи (т.е. зацепиться особо не за что), иначе бы мы, например, могли вычислить для каждого источника, по данным базы DemoSSIS_Target, последний ID или последнюю UpdatedOn и уже загружать данные из источников начиная от этих стартовых значений. К тому же не всегда есть возможность внести изменения в структуру источников, подстроив их под себя, т.к. это могут быть вообще чужие базы, к которым у нас нет полного доступа.
В нашем же случае еще допустим, что пользователи могут менять данные задним числом и могут вообще удалить некоторые ранее загруженные в базу DemoSSIS_Target строки из источников. Поэтому здесь обновление делается как-бы внахлёст, данные последней недели полностью перезаписываются. Здесь неделя берется условно, об этом минимальном сроке, например, мы могли бы условиться с заказчиком (он мог подтвердить, что данные обычно меняются максимум в течении недели). Конечно это не самый надежный способ, и иногда могут возникнуть расхождения, например, в том случае, когда пользователь поменял данные месячной давности и здесь стоит предусмотреть возможность перезагрузки данных начиная с более ранней даты, мы сделаем это при помощи параметра, в котором будем указывать нужное нам количество дней назад.
Создадим новый SSIS-пакет и назовем его «LoadResidues.dtsx».
При помощи контекстного меню, отобразим область ввода переменных пакета (это можно сделать также при помощи меню «SSIS → Variables»):
В данном пакете, нажав на кнопку «Add Variable», создадим переменную LoadFromDate типа DateTime:
По умолчанию переменной присваивается текущее значение даты/времени, т.к. мы будем переопределять значение внутри пакета, нам это не важно.
Так же стоит обратить внимание на Expression – если прописать в этом поле выражение, то переменная будет работать как формула и мы не сможем изменять ее значение при помощи присваивания. При каждом обращении к такой переменной ее значение будет рассчитываться согласно указанному выражению.
Значения переменных можно задавать при помощи компонента «Expression Task». Давайте рассмотрим, как это делается. Создадим элемент «Expression Task»:
Двойным щелчком откроем редактор данного элемента:
Пропишем следующее выражение:
@[User::LoadFromDate] = (DT_DATE) (DT_DBDATE) GETDATE()
Здесь так же была применена двойная конвертация типа, чтобы избавиться от составляющей времени и оставить только дату.
Давайте так же посмотрим как можно отследить значение переменной во время выполнения пакета.
Создадим точку останова на элементе «Expression Task»:
Укажем, что точка должна срабатывать по окончанию выполнения данного блока:
Запустим пакет на выполнение (F5) и после остановке в нашей точке, перейдем на вкладку «Locals»:
Раскроем список Variables и найдем в нем свою переменную:
Для интереса можем поменять выражение элементе «Expression Task» на следующее:
@[User::LoadFromDate] = (DT_DATE) (DT_DBDATE) DATEADD("DAY", -7, GETDATE())
и также поэкспериментировать:
Точку останова убирается таким же образом каким и была установлена. Либо можно удалить сразу все точки останова, если их было несколько:
Создадим параметр, который будет отвечать за количество дней назад.
Параметр можно создать, как глобальный для всего проекта:
Так и локальный, внутри конкретного пакета:
Параметр может быть обязательный для задания – за это отвечает флаг Required. Если этот флаг установлен, то при создании задачи или при вызове пакета из другого пакета нужно будет определить входящее значение параметра (мы рассмотрим это далее).
В отличие от переменных значение параметров при помощи «Expression Task» менять нельзя.
Сохраним параметры и снова зайдем в редактор «Expression Task»:
Для примера я поменял выражение на следующее:
@[User::LoadFromDate] = (DT_DATE) (DT_DBDATE) DATEADD("DAY", - @[$Package::DateOffset] , GETDATE())
Думаю, на этом суть параметров и переменных ясна, и мы можем продолжить.
После того как мы поигрались с «Expression Task» мы его удалим.
Создадим «Execute SQL Task»:
Настроим его следующим образом:
Пропишем в SQLStatement следующий запрос:
SELECT ISNULL(DATEADD(DAY,-?,MAX(ResidueDate)),'19000101') FromDate
FROM ProductResidues
Т.к. данный запрос возвращает одну строку, установим ResultSet = «Single Row» и ниже на вкладке «Result Set» сохраним результат в значение переменной LoadFromDate.
На вкладке «Parameter Mapping» зададим значения параметров, которые в запросе обозначены знаком вопроса (?):
Параметры нумеруются, начиная с нуля.
Стоит отметить, что если при создании соединения воспользоваться другим видом провайдера, например, «ADO» или «ADO.Net», то вместо вопросов мы сможем использовать именованные параметры типа @ParamName и в качестве «Parameter Name» тоже могли бы указывать @ParamName, а не его номер. Но увы типом соединения с другим провайдером мы сможем воспользоваться не во всех случаях.
Теперь на вкладке «Result Set» укажем в какую переменную нужно записать результат выполнения запроса:
Здесь так же можем продебажить установив у этого компонента точку останова на «Break when the container receives the OnPostExecute event» и запустив пакет на выполнение:
Здесь я для удобства мониторинга значения переменной прописал название переменной в «Watch», чтобы не искать ее в блоке «Locals».
Как видим все верно в переменную LoadFromDate записалась дата «01.01.1900», т.к. строк в таблице ProductResidues на Target еще нет.
Переименую для наглядности «Execute SQL Task» в «Set LoadFromDate».
Создадим еще один элемент «Execute SQL Task» и назовем его «Delete Old Rows»:
Настроим его следующим образом:
SQLStatement содержит следующий запрос:
DELETE ProductResidues
WHERE ResidueDate>=?
И зададим значение параметра на вкладке «Parameters Mapping»:
Все, удаление старых данных за указанный конечный период у нас реализовано.
Теперь сделаем часть отвечающую загрузку свежих данных. Для этого воспользуемся компонентом «Data Flow Task»:
Зайдем в область данного компонента и создадим «Source Assistant»:
Настроим его следующим образом:
Нажав на кнопку «Parameters…» зададим значение параметра:
Для записи новых данных воспользуемся уже знакомым компонентом «Destination Assistant»:
Протянем стрелку от «Source Assistant» и настроим его:
Все, пакет для переноса данных с источника SourceA у нас готов, можем запустить его на выполнение:
Запустим еще раз:
Как видим при повторном запуске удалились и залились только данные за указанный период. Ну вроде все работает так как мы и хотели. На всякий случай сверимся, что данные получателя по количеству равны данным источника:
Дойдя до сюда, я понял, что я допустил ошибку. Кто понял в чем дело, молодец!
Но может это и хорошо, т.к. пример получился не таким перегруженным.
Ошибка в том, что я забыл учесть, что при интеграции данных таблицы Products у нас в Target формируются свои идентификаторы (поле ID с флагом IDENTITY)!
Давайте переделаем, чтобы все было правильно. Ничего страшного повторим, зато лучше запомним.
Забежим чуть вперед и добавим в пакет еще один параметр, который назовем «SourceID»:
Перенастроим «Set LoadFromDate»:
В SQLStatement пропишем новый запрос с учетом SourceID:
SELECT ISNULL(DATEADD(DAY,-?,MAX(res.ResidueDate)),'19000101') FromDate
FROM ProductResidues res
JOIN Products prod ON res.ProductID=prod.ID
WHERE prod.SourceID=?
Настроим второй параметр:
Теперь перенастроим «Delete Old Rows» аналогичным образом, чтобы учитывался SourceID:
В SQLStatement пропишем новый запрос с учетов SourceID:
DELETE res
FROM ProductResidues res
JOIN Products prod ON res.ProductID=prod.ID
WHERE ResidueDate>=?
AND prod.SourceID=?
Настроим второй параметр:
Теперь зайдем в «Data Flow Task», удалим цепочку и добавим «Derived Column»:
Настроим его следующим образом:
Здесь я намеренно оставил тип «Unicode string», а не сделал преобразование как в первой части. Давайте за одно рассмотрим компонент «Data Conversion»:
Настроим его:
Теперь при помощи Lookup сделаем сопоставление и получим нужные нам идентификаторы продуктов:
Настроим его:
Теперь протянем синюю стрелку от Lookup к «OLE DB Destination»:
Выберем поток «Lookup Match Output»:
Настроим «OLE DB Destination», нужно перестроить Mappings:
Все, сделаем очистку таблицы от неправильно загруженных данных:
TRUNCATE TABLE DemoSSIS_Target.dbo.ProductResidues
И запустим пакет на выполнение:
И еще раз:
Похоже на правду. Можете самостоятельно проверить правильно ли разнеслись идентификаторы продуктов.
Так как структура DemoSSIS_SourceA и DemoSSIS_SourceB одинакова и нам нужно сделать для DemoSSIS_SourceB, то же самое, то мы можем при создании задачи создать два шага для пакета «LoadResidues.dtsx», в первом шаге настроить подключение к базе DemoSSIS_SourceA, а на втором шаге DemoSSIS_SourceB.
Откомпилируем и передеплоим SSIS проект:
Давайте теперь создадим новое задание в SQL Agent:
На вкладке Steps создадим шаг 1 для загрузки продуктов:
Создадим шаг 2 для загрузки остатков из SourceA:
На вкладке Configuration можем увидеть наши параметры:
Здесь от нас требуют ввести SourceID, т.к. мы указали его как Required. Зададим его:
Данные вкладки «Connection Managers» изменять для этого шаге не будем.
Создадим шаг 3 для загрузки остатков из SourceB:
Зададим параметр SourceID:
И изменим данные соединения SourceA таким образом, чтобы оно ссылалось на базу DemoSSIS_SourceB:
В данном случае мне достаточно было изменить ConnectionString и InitialCatalog, теперь они указывают на DemoSSIS_SourceB.
В итоге мы должны получить следующее – три шага:
Запустим эту задачу на выполнение:
Выполним запрос:
USE DemoSSIS_Target
GO
SELECT prod.SourceID,COUNT(*)
FROM ProductResidues res
JOIN Products prod ON res.ProductID=prod.ID
GROUP BY prod.SourceID
И убедимся, что все отработало как надо:
Теперь допустим, что базы DemoSSIS_SourceA и DemoSSIS_SourceB расположены на одном экземпляре SQL Server. Давайте переделаем «OLE DB Source»:
Текст команды:
DECLARE @SourceID char(1)=?
IF(@SourceID='A') USE DemoSSIS_SourceA
ELSE USE DemoSSIS_SourceB
SELECT
ResidueDate,
ProductID,
ResidueAmount
FROM ProductResidues
WHERE ResidueDate>=?
Зададим параметры:
Теперь наш пакет в зависимости от значения параметра SourceID будет брать данные либо из SourceA, либо из SourceB.
Для теста можете изменить значение параметра SourceB на «B» и запустить проект на выполнение:
Давайте теперь создадим новый пакет «LoadAll.dtsx» и создадим в нем «Execute Package Task» (переименуем его в «Load Products»):
Настроим «Load Products»:
Создадим в новом пакете 2 параметра:
В области «Control Flow» создадим еще 2 компонента «Execute Package Task» которые назовем «Load Resudues A» и «Load Resudues B»:
Настроим их задав у обоих название пакета «LoadResidues.dtsx»:
Зададим обязательный параметр SourceID для «Load Resudues A»:
Зададим обязательный параметр SourceID для «Load Resudues B»:
Обратите внимание, что у стрелок тоже есть свои свойства, например, мы можем поменять свойство Value на Completion, что будет означать, что следующий шаг будет выполнен даже в том случае если на шаге «Load Resudues A» произойдет ошибка:
Все, можем запустить пакет на выполнение:
Думаю, объяснять, что здесь произошло нет смысла.
Иногда параметры для конкретного пакета удобно хранить в вспомогательной таблице и считывать их оттуда в переменные пакета, например, используя для поиска глобальную переменную «System::PackageName». Для демонстрации, давайте переделаем наш пакет таким образом.
Создадим таблицу с параметрами:
USE DemoSSIS_Target
GO
CREATE TABLE IntegrationPackageParams(
PackageName nvarchar(128) NOT NULL,
DateOffset int NOT NULL,
CONSTRAINT PK_IntegrationPackageParams PRIMARY KEY(PackageName)
)
GO
INSERT IntegrationPackageParams(PackageName,DateOffset)VALUES
(N'LoadResidues',7)
GO
Удалим параметр DateOffset из пакета «LoadResidues.dtsx»:
Создадим в пакете переменную DateOffset:
В область «Control Flow» добавим еще один элемент «Execute SQL Task» и переименуем его «Load Params»:
Настроим его:
Запрос в SQLStatement пропишем следующий:
SELECT DateOffset
FROM IntegrationPackageParams
WHERE PackageName=?
Настроим параметр запроса используя системную переменную «System::PackageName»:
Осталось сбросить результат выполнения запроса в переменную:
Теперь осталось перенастроить «Set LoadFromDate», чтобы в нем использовалась теперь переменная:
Все, можем тестировать новую версию пакета.
Вот мы и добрались до финиша. Мои поздравления!
Заключение по третьей части
Уважаемые читатели, эта часть будет заключительной.
В данном цикле статей, я постарался продумать примеры таким образом, чтобы сделать их как можно короче и в свою очередь охватить как можно больше полезных и важных деталей.
Думаю, освоив это, далее вы уже без особого труда сможете освоить работу с остальными компонентами SSIS. В данных статьях я рассмотрел только самые важные компоненты (наиболее часто применяемые на моей практике), но зная только это вы уже можно сделать очень многое. По мере надобности изучайте самостоятельно другие компоненты, в первую очередь порекомендовал бы посмотреть следующее:
- компоненты-контейнеры (Sequence Container, For Loop Container, Foreach Loop Container);
- «Merge Join», который позволяет сделать операции JOIN, LEFT JOIN, FULL JOIN на стороне SSIS (это требует предварительно отсортировать два набора при помощи «Sort»);
- «Conditional Split», который позволяет разбить один поток на несколько в зависимости от условий;
- так же рассмотрите вкладку «Event Handlers», которая позволяет создать дополнительные области для определенного вида события пакета.
Доступного материала на эту тему очень много, используйте MSDN, Youtube и прочие источники.
Я не старался сделать подробный учебник (думаю, все это уже есть), а старался сделать такой материал, который позволит начинающим шаг за шагом создать все с самого нуля и делая все своими руками увидеть всю картину в целом, а после уже имея основу идти дальше самостоятельно. Очень надеюсь, что у меня это получилось и материал окажется полезен именно в таком ключе.
Я очень рад, что мне хватило сил осуществить задуманное и описать все так как я это хотел, даже получилось сделать большее, так как из-за допущенной в этой части ошибки возникли неожиданные повороты сюжета, но так я думаю, стало даже интересней. ;)
Спасибо за внимание! Удачи!
И возможно, до встреч в новых статьях…
Автор: Leran2002