Упражнение. Разработка и реализация медленно изменяющегося измерения типа 1 с потоками данных для сопоставления
В этом упражнении вы создадите поток данных для медленно изменяющегося измерения типа 1, используя в качестве источника и места назначения выделенный пул SQL Azure Synapse. Затем этот поток данных можно добавить в конвейер Synapse и запустить в рамках процесса извлечения, преобразования и загрузки (ETL).
Настройка источника и таблицы измерения
Для этого упражнения необходимо загрузить таблицу измерения в Azure Synapse из исходных данных, которые могут находиться в системах множества различных типов, например Azure SQL, хранилище Azure и т. д. Для простоты в этом примере мы создадим исходные данные в базе данных Azure Synapse.
В Synapse Studio перейдите в центр Данные.
Перейдите на вкладку Рабочая область(1), разверните узел "Базы данных", а затем щелкните правой кнопкой мыши SQLPool01 (2). Выберите New SQL script (3) (Создать скрипт SQL), а затем выберите Empty script (4) (Пустой скрипт).
Вставьте следующий скрипт в пустое окно скрипта, а затем выберите Запустить или нажмите
F5
для выполнения запроса:CREATE TABLE [dbo].[CustomerSource] ( [CustomerID] [int] NOT NULL, [Title] [nvarchar](8), [FirstName] [nvarchar](50), [MiddleName] [nvarchar](50), [LastName] [nvarchar](50), [Suffix] [nvarchar](10), [CompanyName] [nvarchar](128), [SalesPerson] [nvarchar](256), [EmailAddress] [nvarchar](50), [Phone] [nvarchar](25) ) WITH ( HEAP ) COPY INTO [dbo].[CustomerSource] FROM 'https://solliancepublicdata.blob.core.windows.net/dataengineering/dp-203/awdata/CustomerSource.csv' WITH ( FILE_TYPE='CSV', FIELDTERMINATOR='|', FIELDQUOTE='', ROWTERMINATOR='0x0a', ENCODING = 'UTF16' ) CREATE TABLE dbo.[DimCustomer]( [CustomerID] [int] NOT NULL, [Title] [nvarchar](8) NULL, [FirstName] [nvarchar](50) NOT NULL, [MiddleName] [nvarchar](50) NULL, [LastName] [nvarchar](50) NOT NULL, [Suffix] [nvarchar](10) NULL, [CompanyName] [nvarchar](128) NULL, [SalesPerson] [nvarchar](256) NULL, [EmailAddress] [nvarchar](50) NULL, [Phone] [nvarchar](25) NULL, [InsertedDate] [datetime] NOT NULL, [ModifiedDate] [datetime] NOT NULL, [HashKey] [char](64) ) WITH ( DISTRIBUTION = REPLICATE, CLUSTERED COLUMNSTORE INDEX )
Создание потока данных для сопоставления
Потоки данных для сопоставления — это действия конвейера, которые позволяют визуально указать, как следует преобразовать данные, без написания кода. Далее вы создадите поток данных для сопоставления для медленно изменяющегося измерения типа 1.
Перейдите в центр Разработка.
Выберите +, а затем — Поток данных.
На панели свойств нового потока данных введите
UpdateCustomerDimension
в поле Имя(1), а затем нажмите кнопку Свойства(2), чтобы скрыть панель свойств.Выберите Добавить источник на холсте.
В разделе
Source settings
настройте следующие свойства:- Имя потока вывода: укажите
SourceDB
- Тип источника: выберите
Dataset
- Параметры: установите флажок
Allow schema drift
; другие параметры должны остаться невыбранными - Выборка: выберите
Disable
- Набор данных: выберите + Создать, чтобы создать новый набор данных
- Имя потока вывода: укажите
В диалоговом окне для создания набора данных интеграции выберите Azure Synapse Analytics, а затем нажмите кнопку Продолжить.
В свойствах набора данных настройте следующие параметры:
- Имя: введите
CustomerSource
- Связанная служба: выберите связанную службу рабочей области Synapse
- Имя таблицы: нажмите кнопку "Обновить" рядом с раскрывающимся списком
- Имя: введите
В поле Значение введите имя пула SQL, а затем нажмите кнопку ОК.
Выберите
dbo.CustomerSource
в разделе Имя таблицы, выберитеFrom connection/store
в разделе Импорт схемы, а затем нажмите кнопку ОК, чтобы создать набор данных.Нажмите кнопку Открыть рядом с добавленным набором данных
CustomerSource
.Введите имя пула SQL в поле Значение рядом с
DBName
.В редакторе потока данных выберите поле Добавить источник под действием SourceDB. Настройте этот источник в качестве таблицы DimCustomer, выполнив те же действия, которые использовались для CustomerSource.
- Имя потока вывода: укажите
DimCustomer
- Тип источника: выберите
Dataset
- Параметры: установите флажок
Allow schema drift
; другие параметры должны остаться невыбранными - Выборка: выберите
Disable
- Набор данных: выберите + Создать, чтобы создать новый набор данных. Используйте связанную службу Azure Synapse и выберите таблицу DimCustomer. Убедитесь, что в качестве значения параметра DBName задано имя пула SQL.
- Имя потока вывода: укажите
Добавление преобразований в поток данных
Выберите + справа от источника
SourceDB
на холсте, а затем выберите Производный столбец.В разделе
Derived column's settings
настройте следующие свойства:- Имя потока вывода: укажите
CreateCustomerHash
- Входящий поток: выберите
SourceDB
- Столбцы: укажите следующие параметры:
Column Expression Description Введите HashKey
.sha2(256, iifNull(Title,'') +FirstName +iifNull(MiddleName,'') +LastName +iifNull(Suffix,'') +iifNull(CompanyName,'') +iifNull(SalesPerson,'') +iifNull(EmailAddress,'') +iifNull(Phone,''))
Создает хэш SHA256 на основе значений таблицы. Он используется для обнаружения изменений строк путем сравнения хэша входящих записей с хэшем записей в месте назначения; сравнение производится по значению CustomerID
. ФункцияiifNull
заменяет значения NULL пустыми строками. В противном случае при наличии значений NULL значения хэша будут дублироваться.- Имя потока вывода: укажите
Выберите + справа от производного столбца
CreateCustomerHash
на холсте, а затем выберите Существует.В разделе
Exists settings
настройте следующие свойства:- Имя потока вывода: укажите
Exists
- Левый поток: выберите
CreateCustomerHash
- Правый поток: выберите
SynapseDimCustomer
- Тип существования: выберите
Doesn't exist
- Условия столбца "Существует": установите следующие значения слева и справа:
Слева: столбец CreateCustomerHash Справа: столбец SynapseDimCustomer HashKey
HashKey
- Имя потока вывода: укажите
Выберите + справа от
Exists
на холсте, а затем выберите Поиск.В разделе
Lookup settings
настройте следующие свойства:- Имя потока вывода: укажите
LookupCustomerID
- Основной поток: выберите
Exists
- Поток поиска: выберите
SynapseDimCustomer
- Сопоставить несколько строк: не выбрано
- Сопоставлять по: выберите
Any row
- Условия поиска: установите следующие значения слева и справа:
Слева: столбец "Существует" Справа: столбец SynapseDimCustomer CustomerID
CustomerID
- Имя потока вывода: укажите
Выберите + справа от
LookupCustomerID
на холсте, а затем выберите Производный столбец.В разделе
Derived column's settings
настройте следующие свойства:- Имя потока вывода: укажите
SetDates
- Входящий поток: выберите
LookupCustomerID
- Столбцы: укажите следующие параметры:
Column Expression Description Выберите InsertedDate
iif(isNull(InsertedDate), currentTimestamp(), {InsertedDate})
Если значение InsertedDate
равно NULL, вставьте текущую метку времени. В противном случае используйте значениеInsertedDate
.Выберите ModifiedDate
currentTimestamp()
Всегда обновляйте значение ModifiedDate
текущей меткой времени.Примечание.
Чтобы вставить второй столбец, выберите + Добавить над списком "Столбцы", а затем выберите Добавить столбец.
- Имя потока вывода: укажите
Выберите на холсте + справа от шага производного столбца
SetDates
, а затем щелкните Изменить строку.В разделе
Alter row settings
настройте следующие свойства:- Имя потока вывода: укажите
AllowUpserts
- Входящий поток: выберите
SetDates
- Условия изменения строки: введите следующее:
Condition Expression Description Выберите Upsert if
true()
Задайте условие true()
для условияUpsert if
, чтобы разрешить операции upsert. Это гарантирует, что все данные, прошедшие через шаги в потоке данных сопоставления, будут вставлены или обновлены в приемнике.- Имя потока вывода: укажите
Выберите + справа от шага изменения строки
AllowUpserts
на холсте, а затем выберите Приемник.В разделе
Sink
настройте следующие свойства:- Имя потока вывода: укажите
Sink
- Входящий поток: выберите
AllowUpserts
- Тип приемника: выберите
Dataset
- Набор данных: выберите
DimCustomer
- Параметры: установите флажок
Allow schema drift
и снимите флажокValidate schema
- Имя потока вывода: укажите
Перейдите на вкладку Параметры и настройте следующие свойства:
- Метод обновления: установите флажок
Allow upsert
и снимите флажки для всех остальных параметров - Ключевые столбцы: выберите
List of columns
, а затем выберитеCustomerID
в списке - Действие таблицы: выберите
None
- Включить промежуточное хранение: снимите флажок
- Метод обновления: установите флажок
Перейдите на вкладку Сопоставление, а затем снимите флажок Автоматическое сопоставление. Настройте сопоставление входных столбцов, как описано ниже:
Входные столбцы Выходные столбцы SourceDB@CustomerID
CustomerID
SourceDB@Title
Title
SourceDB@FirstName
FirstName
SourceDB@MiddleName
MiddleName
SourceDB@LastName
LastName
SourceDB@Suffix
Suffix
SourceDB@CompanyName
CompanyName
SourceDB@SalesPerson
SalesPerson
SourceDB@EmailAddress
EmailAddress
SourceDB@Phone
Phone
InsertedDate
InsertedDate
ModifiedDate
ModifiedDate
CreateCustomerHash@HashKey
HashKey
Готовый поток данных сопоставления будет выглядеть так, как показано ниже. Выберите Опубликовать все, чтобы сохранить изменения.
Выберите Опубликовать.
Проверка потока данных
Вы подготовили поток данных для медленно изменяющегося измерения типа 1. Если вы решили протестировать его, можно добавить этот поток данных в конвейер интеграции Synapse. Затем можно запустить конвейер один раз, чтобы выполнить первоначальную загрузку исходных данных клиента в место назначения DimCustomer.
При каждом следующем запуске конвейера данные в исходной таблице будут сравниваться с теми данными, которые уже есть в таблице измерения (с помощью ключа кэша), и будут обновляться только измененные записи. Чтобы это проверить, можно обновить запись в исходной таблице, а затем снова запустить конвейер и проверить обновления записей в таблице измерения.
Возьмем в качестве примера клиента Джанет Гейтс (Janet Gates). При первоначальной загрузке столбец LastName
содержит значение "Gates", а столбец CustomerId
содержит значение 4.
Ниже приведен пример инструкции, которая обновляет фамилию клиента в исходной таблице.
UPDATE [dbo].[CustomerSource]
SET LastName = 'Lopez'
WHERE [CustomerId] = 4
После обновления записи и повторного запуска конвейера эти обновленные данные появятся в DimCustomer.
Значение LastName
в записи клиента было успешно обновлено в соответствии с исходной записью, а также было обновлено ModifiedDate
без отслеживания старого значения LastName
. Это ожидаемое поведение для медленно изменяющегося измерения типа 1. Если для поля LastName
требуется сохранять исторические данные, то необходимо изменить таблицу и поток данных на один из других изученных типов медленно изменяющегося измерения.