Упражнение. Разработка и реализация медленно изменяющегося измерения типа 1 с потоками данных для сопоставления

Завершено

В этом упражнении вы создадите поток данных для медленно изменяющегося измерения типа 1, используя в качестве источника и места назначения выделенный пул SQL Azure Synapse. Затем этот поток данных можно добавить в конвейер Synapse и запустить в рамках процесса извлечения, преобразования и загрузки (ETL).

Настройка источника и таблицы измерения

Для этого упражнения необходимо загрузить таблицу измерения в Azure Synapse из исходных данных, которые могут находиться в системах множества различных типов, например Azure SQL, хранилище Azure и т. д. Для простоты в этом примере мы создадим исходные данные в базе данных Azure Synapse.

  1. В Synapse Studio перейдите в центр Данные.

    Data hub.

  2. Перейдите на вкладку Рабочая область(1), разверните узел "Базы данных", а затем щелкните правой кнопкой мыши SQLPool01 (2). Выберите New SQL script (3) (Создать скрипт SQL), а затем выберите Empty script (4) (Пустой скрипт).

    The data hub is displayed with the context menus to create a new SQL script.

  3. Вставьте следующий скрипт в пустое окно скрипта, а затем выберите Запустить или нажмите F5 для выполнения запроса:

    CREATE TABLE [dbo].[CustomerSource] (
        [CustomerID] [int] NOT NULL,
        [Title] [nvarchar](8),
        [FirstName] [nvarchar](50),
        [MiddleName] [nvarchar](50),
        [LastName] [nvarchar](50),
        [Suffix] [nvarchar](10),
        [CompanyName] [nvarchar](128),
        [SalesPerson] [nvarchar](256),
        [EmailAddress] [nvarchar](50),
        [Phone] [nvarchar](25)
    ) WITH ( HEAP )
    
    COPY INTO [dbo].[CustomerSource]
    FROM 'https://solliancepublicdata.blob.core.windows.net/dataengineering/dp-203/awdata/CustomerSource.csv'
    WITH (
        FILE_TYPE='CSV',
        FIELDTERMINATOR='|',
        FIELDQUOTE='',
        ROWTERMINATOR='0x0a',
        ENCODING = 'UTF16'
    )
    
    CREATE TABLE dbo.[DimCustomer](
        [CustomerID] [int] NOT NULL,
        [Title] [nvarchar](8) NULL,
        [FirstName] [nvarchar](50) NOT NULL,
        [MiddleName] [nvarchar](50) NULL,
        [LastName] [nvarchar](50) NOT NULL,
        [Suffix] [nvarchar](10) NULL,
        [CompanyName] [nvarchar](128) NULL,
        [SalesPerson] [nvarchar](256) NULL,
        [EmailAddress] [nvarchar](50) NULL,
        [Phone] [nvarchar](25) NULL,
        [InsertedDate] [datetime] NOT NULL,
        [ModifiedDate] [datetime] NOT NULL,
        [HashKey] [char](64)
    )
    WITH
    (
        DISTRIBUTION = REPLICATE,
        CLUSTERED COLUMNSTORE INDEX
    )
    

    The script and Run button are both highlighted.

Создание потока данных для сопоставления

Потоки данных для сопоставления — это действия конвейера, которые позволяют визуально указать, как следует преобразовать данные, без написания кода. Далее вы создадите поток данных для сопоставления для медленно изменяющегося измерения типа 1.

  1. Перейдите в центр Разработка.

    Develop hub.

  2. Выберите +, а затем — Поток данных.

    The plus button and data flow menu item are highlighted.

  3. На панели свойств нового потока данных введите UpdateCustomerDimension в поле Имя(1), а затем нажмите кнопку Свойства(2), чтобы скрыть панель свойств.

    The data flow properties pane is displayed.

  4. Выберите Добавить источник на холсте.

    The Add Source button is highlighted on the data flow canvas.

  5. В разделе Source settings настройте следующие свойства:

    • Имя потока вывода: укажите SourceDB
    • Тип источника: выберите Dataset
    • Параметры: установите флажок Allow schema drift; другие параметры должны остаться невыбранными
    • Выборка: выберите Disable
    • Набор данных: выберите + Создать, чтобы создать новый набор данных

    The New button is highlighted next to Dataset.

  6. В диалоговом окне для создания набора данных интеграции выберите Azure Synapse Analytics, а затем нажмите кнопку Продолжить.

    Azure SQL Database and the Continue button are highlighted.

  7. В свойствах набора данных настройте следующие параметры:

    • Имя: введите CustomerSource
    • Связанная служба: выберите связанную службу рабочей области Synapse
    • Имя таблицы: нажмите кнопку "Обновить" рядом с раскрывающимся списком

    The form is configured as described and the refresh button is highlighted.

  8. В поле Значение введите имя пула SQL, а затем нажмите кнопку ОК.

    The SQLPool01 parameter is highlighted.

  9. Выберите dbo.CustomerSource в разделе Имя таблицы, выберите From connection/store в разделе Импорт схемы, а затем нажмите кнопку ОК, чтобы создать набор данных.

    The form is completed as described.

  10. Нажмите кнопку Открыть рядом с добавленным набором данных CustomerSource.

    The open button is highlighted next to the new dataset.

  11. Введите имя пула SQL в поле Значение рядом с DBName.

  12. В редакторе потока данных выберите поле Добавить источник под действием SourceDB. Настройте этот источник в качестве таблицы DimCustomer, выполнив те же действия, которые использовались для CustomerSource.

    • Имя потока вывода: укажите DimCustomer
    • Тип источника: выберите Dataset
    • Параметры: установите флажок Allow schema drift; другие параметры должны остаться невыбранными
    • Выборка: выберите Disable
    • Набор данных: выберите + Создать, чтобы создать новый набор данных. Используйте связанную службу Azure Synapse и выберите таблицу DimCustomer. Убедитесь, что в качестве значения параметра DBName задано имя пула SQL.

    The Add Source, Output stream name, and Dataset name are highlighted in the Source settings.

Добавление преобразований в поток данных

  1. Выберите + справа от источника SourceDB на холсте, а затем выберите Производный столбец.

    The plus button and derived column menu item are highlighted.

  2. В разделе Derived column's settings настройте следующие свойства:

    • Имя потока вывода: укажите CreateCustomerHash
    • Входящий поток: выберите SourceDB
    • Столбцы: укажите следующие параметры:
    Column Expression Description
    Введите HashKey. sha2(256, iifNull(Title,'') +FirstName +iifNull(MiddleName,'') +LastName +iifNull(Suffix,'') +iifNull(CompanyName,'') +iifNull(SalesPerson,'') +iifNull(EmailAddress,'') +iifNull(Phone,'')) Создает хэш SHA256 на основе значений таблицы. Он используется для обнаружения изменений строк путем сравнения хэша входящих записей с хэшем записей в месте назначения; сравнение производится по значению CustomerID. Функция iifNull заменяет значения NULL пустыми строками. В противном случае при наличии значений NULL значения хэша будут дублироваться.

    The Derived column's settings form is configured as described.

  3. Выберите + справа от производного столбца CreateCustomerHash на холсте, а затем выберите Существует.

    The plus button and exists menu item are both highlighted.

  4. В разделе Exists settings настройте следующие свойства:

    • Имя потока вывода: укажите Exists
    • Левый поток: выберите CreateCustomerHash
    • Правый поток: выберите SynapseDimCustomer
    • Тип существования: выберите Doesn't exist
    • Условия столбца "Существует": установите следующие значения слева и справа:
    Слева: столбец CreateCustomerHash Справа: столбец SynapseDimCustomer
    HashKey HashKey

    The Exists settings form is configured as described.

  5. Выберите + справа от Exists на холсте, а затем выберите Поиск.

    The plus button and lookup menu item are both highlighted.

  6. В разделе Lookup settings настройте следующие свойства:

    • Имя потока вывода: укажите LookupCustomerID
    • Основной поток: выберите Exists
    • Поток поиска: выберите SynapseDimCustomer
    • Сопоставить несколько строк: не выбрано
    • Сопоставлять по: выберите Any row
    • Условия поиска: установите следующие значения слева и справа:
    Слева: столбец "Существует" Справа: столбец SynapseDimCustomer
    CustomerID CustomerID

    The Lookup settings form is configured as described.

  7. Выберите + справа от LookupCustomerID на холсте, а затем выберите Производный столбец.

    The plus button and derived column menu item are both highlighted.

  8. В разделе Derived column's settings настройте следующие свойства:

    • Имя потока вывода: укажите SetDates
    • Входящий поток: выберите LookupCustomerID
    • Столбцы: укажите следующие параметры:
    Column Expression Description
    Выберите InsertedDate iif(isNull(InsertedDate), currentTimestamp(), {InsertedDate}) Если значение InsertedDate равно NULL, вставьте текущую метку времени. В противном случае используйте значение InsertedDate.
    Выберите ModifiedDate currentTimestamp() Всегда обновляйте значение ModifiedDate текущей меткой времени.

    Another Derived column's settings form is configured as described.

    Примечание.

    Чтобы вставить второй столбец, выберите + Добавить над списком "Столбцы", а затем выберите Добавить столбец.

  9. Выберите на холсте + справа от шага производного столбца SetDates, а затем щелкните Изменить строку.

    The plus button and alter row menu item are both highlighted.

  10. В разделе Alter row settings настройте следующие свойства:

    • Имя потока вывода: укажите AllowUpserts
    • Входящий поток: выберите SetDates
    • Условия изменения строки: введите следующее:
    Condition Expression Description
    Выберите Upsert if true() Задайте условие true() для условия Upsert if, чтобы разрешить операции upsert. Это гарантирует, что все данные, прошедшие через шаги в потоке данных сопоставления, будут вставлены или обновлены в приемнике.

    The alter row settings form is configured as described.

  11. Выберите + справа от шага изменения строки AllowUpserts на холсте, а затем выберите Приемник.

    The plus button and sink menu item are both highlighted.

  12. В разделе Sink настройте следующие свойства:

    • Имя потока вывода: укажите Sink
    • Входящий поток: выберите AllowUpserts
    • Тип приемника: выберите Dataset
    • Набор данных: выберите DimCustomer
    • Параметры: установите флажок Allow schema drift и снимите флажок Validate schema

    The sink properties form is configured as described.

  13. Перейдите на вкладку Параметры и настройте следующие свойства:

    • Метод обновления: установите флажок Allow upsert и снимите флажки для всех остальных параметров
    • Ключевые столбцы: выберите List of columns, а затем выберите CustomerID в списке
    • Действие таблицы: выберите None
    • Включить промежуточное хранение: снимите флажок

    The sink settings are configured as described.

  14. Перейдите на вкладку Сопоставление, а затем снимите флажок Автоматическое сопоставление. Настройте сопоставление входных столбцов, как описано ниже:

    Входные столбцы Выходные столбцы
    SourceDB@CustomerID CustomerID
    SourceDB@Title Title
    SourceDB@FirstName FirstName
    SourceDB@MiddleName MiddleName
    SourceDB@LastName LastName
    SourceDB@Suffix Suffix
    SourceDB@CompanyName CompanyName
    SourceDB@SalesPerson SalesPerson
    SourceDB@EmailAddress EmailAddress
    SourceDB@Phone Phone
    InsertedDate InsertedDate
    ModifiedDate ModifiedDate
    CreateCustomerHash@HashKey HashKey

    Mapping settings are configured as described.

  15. Готовый поток данных сопоставления будет выглядеть так, как показано ниже. Выберите Опубликовать все, чтобы сохранить изменения.

    The completed data flow is displayed and Publish all is highlighted.

  16. Выберите Опубликовать.

    The publish button is highlighted.

Проверка потока данных

Вы подготовили поток данных для медленно изменяющегося измерения типа 1. Если вы решили протестировать его, можно добавить этот поток данных в конвейер интеграции Synapse. Затем можно запустить конвейер один раз, чтобы выполнить первоначальную загрузку исходных данных клиента в место назначения DimCustomer.

При каждом следующем запуске конвейера данные в исходной таблице будут сравниваться с теми данными, которые уже есть в таблице измерения (с помощью ключа кэша), и будут обновляться только измененные записи. Чтобы это проверить, можно обновить запись в исходной таблице, а затем снова запустить конвейер и проверить обновления записей в таблице измерения.

Возьмем в качестве примера клиента Джанет Гейтс (Janet Gates). При первоначальной загрузке столбец LastName содержит значение "Gates", а столбец CustomerId содержит значение 4.

The script is displayed with the initial customer record.

Ниже приведен пример инструкции, которая обновляет фамилию клиента в исходной таблице.

UPDATE [dbo].[CustomerSource]
SET LastName = 'Lopez'
WHERE [CustomerId] = 4

После обновления записи и повторного запуска конвейера эти обновленные данные появятся в DimCustomer.

The script is displayed with the updated customer record.

Значение LastName в записи клиента было успешно обновлено в соответствии с исходной записью, а также было обновлено ModifiedDate без отслеживания старого значения LastName. Это ожидаемое поведение для медленно изменяющегося измерения типа 1. Если для поля LastName требуется сохранять исторические данные, то необходимо изменить таблицу и поток данных на один из других изученных типов медленно изменяющегося измерения.