演習 - マッピング データ フローを使用して、タイプ 1 の緩やかに変化するディメンションを設計して実装する

完了

この演習では、Azure Synapse 専用 SQL プールをソースとターゲットとして使用して、タイプ 1 の SCD のデータ フローを作成します。 このデータ フローを Synapse パイプラインに追加して、抽出、変換、読み込み (ETL) プロセスの一部として実行できます。

ソースとディメンション テーブルの設定

この演習では、Azure SQL、Azure ストレージなどのさまざまなシステムの種類のソース データから、Azure Synapse でディメンション テーブルを読み込む必要があります。この例では、Azure Synapse データベースにソース データを作成して、シンプルにします。

  1. Synapse Studio から、[データ] ハブに移動します。

    Data hub.

  2. [ワークスペース] タブ (1) を選択し、[データベース] を展開して、SQLPool01 (2) を右クリックします。 [New SQL script](新しい SQL スクリプト) (3) を選択し、[Empty script](空のスクリプト) (4) を選択します。

    The data hub is displayed with the context menus to create a new SQL script.

  3. 次のスクリプトを空のスクリプト ウィンドウに貼り付け、[実行] または F5 キーを押してクエリを実行します。

    CREATE TABLE [dbo].[CustomerSource] (
        [CustomerID] [int] NOT NULL,
        [Title] [nvarchar](8),
        [FirstName] [nvarchar](50),
        [MiddleName] [nvarchar](50),
        [LastName] [nvarchar](50),
        [Suffix] [nvarchar](10),
        [CompanyName] [nvarchar](128),
        [SalesPerson] [nvarchar](256),
        [EmailAddress] [nvarchar](50),
        [Phone] [nvarchar](25)
    ) WITH ( HEAP )
    
    COPY INTO [dbo].[CustomerSource]
    FROM 'https://solliancepublicdata.blob.core.windows.net/dataengineering/dp-203/awdata/CustomerSource.csv'
    WITH (
        FILE_TYPE='CSV',
        FIELDTERMINATOR='|',
        FIELDQUOTE='',
        ROWTERMINATOR='0x0a',
        ENCODING = 'UTF16'
    )
    
    CREATE TABLE dbo.[DimCustomer](
        [CustomerID] [int] NOT NULL,
        [Title] [nvarchar](8) NULL,
        [FirstName] [nvarchar](50) NOT NULL,
        [MiddleName] [nvarchar](50) NULL,
        [LastName] [nvarchar](50) NOT NULL,
        [Suffix] [nvarchar](10) NULL,
        [CompanyName] [nvarchar](128) NULL,
        [SalesPerson] [nvarchar](256) NULL,
        [EmailAddress] [nvarchar](50) NULL,
        [Phone] [nvarchar](25) NULL,
        [InsertedDate] [datetime] NOT NULL,
        [ModifiedDate] [datetime] NOT NULL,
        [HashKey] [char](64)
    )
    WITH
    (
        DISTRIBUTION = REPLICATE,
        CLUSTERED COLUMNSTORE INDEX
    )
    

    The script and Run button are both highlighted.

マッピング データ フローを作成する

マッピング データ フローはパイプライン アクティビティで、コードを書かないデータ変換方法を指定する視覚的な方法を提供します。 次に、タイプ 1 の SCD を作成するマッピング データ フローを作成します。

  1. [開発] ハブに移動します。

    Develop hub.

  2. [+] を選んでから、[データ フロー] を選びます。

    The plus button and data flow menu item are highlighted.

  3. 新しいデータ フローのプロパティ ペインで、[名前] フィールド (1) に「UpdateCustomerDimension」と入力し、[プロパティ] ボタン (2) を選択してプロパティ ペインを非表示にします。

    The data flow properties pane is displayed.

  4. キャンバス上の [ソースの追加] を選択します。

    The Add Source button is highlighted on the data flow canvas.

  5. Source settings で、次のプロパティを構成します。

    • [出力ストリーム名]: 「SourceDB」を入力します
    • [ソースの種類]: Dataset を選択します
    • [オプション]: Allow schema drift をオンにし、他のオプションをオフのままにします
    • [サンプリング]: Disable を選択します
    • [データセット]: [+ 新規] を選択して、新しいデータセットを作成します

    The New button is highlighted next to Dataset.

  6. 新しい統合データセット ダイアログで、[Azure Synapse Analytics] を選択し、[続行] を選択します。

    Azure SQL Database and the Continue button are highlighted.

  7. データセットのプロパティで、次のように構成します。

    • [名前]: 「CustomerSource」を入力します
    • [リンクされたサービス]: Synapse ワークスペースのリンクされたサービスを選択します
    • [テーブル名]: ドロップダウンの横にある [更新] ボタンを選択します

    The form is configured as described and the refresh button is highlighted.

  8. [値] フィールドにご自分の SQL プール名を入力し、[OK] を選択します。

    The SQLPool01 parameter is highlighted.

  9. [テーブル名]dbo.CustomerSource を選択し、[スキーマのインポート]From connection/store を選択し、[OK] を選択して、データセットを作成します。

    The form is completed as described.

  10. 追加した CustomerSource データセットの横にある [開く] を選択します。

    The open button is highlighted next to the new dataset.

  11. DBName の横にある [値] フィールドにご自分の SQL プール名を入力します。

  12. データ フロー エディターで、SourceDB アクティビティの下にある [ソースの追加] ボックスを選択します。 CustomerSource で使用したのと同じ手順に従って、このソースを DimCustomer テーブルとして構成します。

    • [出力ストリーム名]: 「DimCustomer」を入力します
    • [ソースの種類]: Dataset を選択します
    • [オプション]: Allow schema drift をオンにし、他のオプションをオフのままにします
    • [サンプリング]: Disable を選択します
    • [データセット]: [+ 新規] を選択して、新しいデータセットを作成します。 リンクされたサービスの Azure Synapse を使用して、DimCustomer テーブルを選択します。 ご自分の SQL プール名には必ず DBName を設定してください。

    The Add Source, Output stream name, and Dataset name are highlighted in the Source settings.

変換をデータ フローに追加する

  1. キャンバス上の SourceDB ソースの右側にある [+] を選んでから、[派生列] を選びます。

    The plus button and derived column menu item are highlighted.

  2. Derived column's settings で、次のプロパティを構成します。

    • [出力ストリーム名]: 「CreateCustomerHash」を入力します
    • [Incoming stream](受信ストリーム): SourceDB を選択します
    • [列]: 次のように入力します。
    説明
    HashKey」を入力 sha2(256, iifNull(Title,'') +FirstName +iifNull(MiddleName,'') +LastName +iifNull(Suffix,'') +iifNull(CompanyName,'') +iifNull(SalesPerson,'') +iifNull(EmailAddress,'') +iifNull(Phone,'')) テーブル値の SHA256 ハッシュを作成します。 これを使用して、入力レコードのハッシュとターゲット レコードのハッシュ値を比較し、CustomerID 値で照合することで、行の変更を検出します。 iifNull 関数を使用して、null 値を空の文字列に置き換えます。 それ以外の場合、null エントリが存在すると、ハッシュ値が重複する傾向があります。

    The Derived column's settings form is configured as described.

  3. キャンバス上の CreateCustomerHash 派生列の右側にある [+] を選んでから、[存在する] を選びます。

    The plus button and exists menu item are both highlighted.

  4. Exists settings で、次のプロパティを構成します。

    • [出力ストリーム名]: 「Exists」を入力します
    • [Left stream](左側のストリーム): CreateCustomerHash を選択します
    • [Right stream](右側のストリーム): SynapseDimCustomer を選択します
    • [Exist type](存在タイプ): Doesn't exist を選択します
    • [Exists conditions](存在条件): 左と右に次のように設定します。
    左: CreateCustomerHash の列 右: SynapseDimCustomer の列
    HashKey HashKey

    The Exists settings form is configured as described.

  5. キャンバス上の Exists の右側にある [+] を選んでから、[検索] を選びます。

    The plus button and lookup menu item are both highlighted.

  6. Lookup settings で、次のプロパティを構成します。

    • [出力ストリーム名]: 「LookupCustomerID」を入力します
    • [Primary stream](プライマリ ストリーム): Exists を選択します
    • [Lookup stream](検索ストリーム): SynapseDimCustomer を選択します
    • [Match multiple rows](複数の行の一致): オフ
    • [Match on](一致対象): Any row を選択します
    • [Lookup conditions](検索条件): 左と右に次のように設定します。
    左: Exists の列 右: SynapseDimCustomer の列
    CustomerID CustomerID

    The Lookup settings form is configured as described.

  7. キャンバス上の LookupCustomerID の右側にある [+] を選んでから、[派生列] を選択します。

    The plus button and derived column menu item are both highlighted.

  8. Derived column's settings で、次のプロパティを構成します。

    • [出力ストリーム名]: 「SetDates」を入力します
    • [Incoming stream](受信ストリーム): LookupCustomerID を選択します
    • [列]: 次のように入力します。
    説明
    [InsertedDate] を選択します iif(isNull(InsertedDate), currentTimestamp(), {InsertedDate}) InsertedDate 値が null の場合、現在のタイムスタンプを挿入します。 それ以外の場合は、InsertedDate 値を使用します。
    [ModifiedDate] を選択します currentTimestamp() 常に ModifiedDate 値を現在のタイムスタンプで値を更新します。

    Another Derived column's settings form is configured as described.

    Note

    2 番目の列を挿入するには、[列] リストの上にある [+ 追加] を選択し、[列の追加] を選択します。

  9. キャンバス上の SetDates 派生列の右側にある [+] を選んでから、[行の変更] を選びます。

    The plus button and alter row menu item are both highlighted.

  10. Alter row settings で、次のプロパティを構成します。

    • [出力ストリーム名]: 「AllowUpserts」を入力します
    • [Incoming stream](受信ストリーム): SetDates を選択します
    • [Alter row conditions](行の変更条件): 次のように入力します。
    条件 説明
    [Upsert if] を選択します true() Upsert if 条件で、条件を true() に設定して、アップサートを許可します。 これにより、マッピング データ フローのステップを通過するすべてのデータが確実にシンクに挿入または更新されるようになります。

    The alter row settings form is configured as described.

  11. キャンバス上の AllowUpserts 行の変更ステップの右側にある [+] を選んでから、[シンク] を選びます。

    The plus button and sink menu item are both highlighted.

  12. Sink で、次のプロパティを構成します。

    • [出力ストリーム名]: 「Sink」を入力します
    • [Incoming stream](受信ストリーム): AllowUpserts を選択します
    • [Sink type](シンクの種類): Dataset を選択します
    • [データセット]: DimCustomer を選択します
    • [オプション]: Allow schema drift をオン、Validate schema をオフにします

    The sink properties form is configured as described.

  13. [設定] タブを選択し、次のプロパティを構成します。

    • [更新方法]: Allow upsert をオン、他のすべてのオプションをオフにします
    • [キー列]: List of columns を選択し、リスト内の CustomerID を選択します
    • [Table action](テーブル アクション): None を選択します
    • [Enable staging](ステージングを有効にする): オフ

    The sink settings are configured as described.

  14. [マッピング] タブを選択し、[Auto mapping](自動マッピング) をオフにします。 次に示すように、入力列のマッピングを構成します。

    入力列 出力列
    SourceDB@CustomerID CustomerID
    SourceDB@Title Title
    SourceDB@FirstName FirstName
    SourceDB@MiddleName MiddleName
    SourceDB@LastName LastName
    SourceDB@Suffix Suffix
    SourceDB@CompanyName CompanyName
    SourceDB@SalesPerson SalesPerson
    SourceDB@EmailAddress EmailAddress
    SourceDB@Phone Phone
    InsertedDate InsertedDate
    ModifiedDate ModifiedDate
    CreateCustomerHash@HashKey HashKey

    Mapping settings are configured as described.

  15. 完成したマッピング フローは次のようになります。 [すべて公開] を選択して、変更を保存します。

    The completed data flow is displayed and Publish all is highlighted.

  16. 公開を選択します。

    The publish button is highlighted.

データ フローをテストするには

タイプ 1 の SCD データ フローが完成しました。 これのテストを選択した場合は、このデータ フローを Synapse 統合パイプラインに追加できます。 次に、パイプラインを 1 回実行して、顧客ソース データをターゲットの DimCustomer に初期読み込みすることができます。

パイプラインを追加実行するたびに、ソース テーブル内のデータとディメンション テーブル内に既に存在するデータが (HashKey を使用して) 比較され、変更のあったレコードのみが更新されます。 これをテストするには、ソース テーブル内のレコードを更新した後、パイプラインを再度実行して、ディメンション テーブル内のレコードが更新されることを確認します。

顧客の Janet Gates を例にとります。 初期読み込みで、LastName が Gates、CustomerId が 4 であることが示されます。

The script is displayed with the initial customer record.

次に、ソース テーブル内の顧客の姓を更新するステートメントの例を示します。

UPDATE [dbo].[CustomerSource]
SET LastName = 'Lopez'
WHERE [CustomerId] = 4

レコードを更新し、パイプラインを再度実行すると、DimCustomer で、この更新されたデータが表示されます。

The script is displayed with the updated customer record.

顧客レコードの LastName 値がソース レコードと一致するように正常に更新され、ModifiedDate も古い LastName 値を追跡せずに更新されました。 これは、タイプ 1 の SCD の予想される動作です。 LastName フィールドに履歴が必要な場合は、テーブルとデータ フローを、これまでに説明した他の SCD タイプのいずれかに変更します。