分享方式:


使用控制資料表從資料庫進行差異複製

適用於:Azure Data Factory Azure Synapse Analytics

提示

試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用

本文所說明的範本用來使用可儲存高浮水印值的外部控制資料表,以累加方式從資料庫資料表載入新的或已更新的資料列。

此範本需要來源資料庫的結構描述包含時間戳記資料行或遞增索引鍵,以識別新的或已更新的資料列。

注意

如果您來源資料庫的時間戳記資料行可識別新的或已更新的資料列,但不想要建立用於差異複製的外部控制資料表,則您可以改為使用 Azure Data Factory 複製資料工具來取得管線。 該工具使用觸發程序排程時間作為變數,以讀取來自來源資料庫的新資料列。

關於此解決方案範本

此範本會先擷取舊的浮水印值,然後將其與目前的浮水印值進行比較。 之後,其只會根據兩個浮水印值之間的比較,從來源資料庫複製變更。 最後,其會將新的高浮水印值儲存至外部控制資料表,以用於下一次差異資料載入。

範本包含四個活動:

  • 「查閱」會擷取外部控制資料表中所儲存的舊高浮水印值。
  • 另一個「查閱」活動會從來源資料庫擷取目前的高浮水印值。
  • 「複製」只會將來源資料庫的變更複製至目的地存放區。 可識別來源資料庫中變更的查詢類似 'SELECT * FROM Data_Source_Table WHERE TIMESTAMP_Column > “last high-watermark” and TIMESTAMP_Column <= “current high-watermark”'。
  • SqlServerStoredProcedure 會將目前的高浮水印值寫入至外部控制資料表,以用於下一次差異複製。

範本會定義下列參數:

  • Data_Source_Table_Name 是來源資料庫中您想要從中載入資料的資料表。
  • Data_Source_WaterMarkColumn 是來源資料表中用來識別新或已更新資料列的資料行名稱。 此資料行的類型通常是 datetimeINT 或類似的類型。
  • Data_Destination_Container 是目的地存放區中資料複製目標的位置根路徑。
  • Data_Destination_Directory 是目的地存放區中資料複製目標的位置根路徑下的目錄路徑。
  • Data_Destination_Table_Name 是目的地存放區中資料複製目標的位置 (選取 [Azure Synapse Analytics] 作為 [資料目的地] 時適用)。
  • Data_Destination_Folder_Path 是目的地存放區中資料複製目標的位置 (選取 [檔案系統] 或 [Azure Data Lake Storage Gen1] 作為 [資料目的地] 時適用)。
  • Control_Table_Table_Name 是可儲存高浮水印值的外部控制資料表。
  • Control_Table_Column_Name 是外部控制資料表中可儲存高浮水印值的資料行。

如何使用此解決方案範本

  1. 探索您想要載入的來源資料表,並且定義可用來識別新或已更新資料列的高浮水印資料行。 此資料行的類型可能是 datetimeINT 或類似的類型。 新增資料列時,此資料行的值會增加。 從下列範例來源資料表 (data_source_table) 中,我們可以使用 LastModifytime 資料行作為高浮水印資料行。

    PersonID	Name            LastModifytime
    1           aaaa            2017-09-01 00:56:00.000
    2           bbbb            2017-09-02 05:23:00.000
    3           cccc            2017-09-03 02:36:00.000
    4           dddd            2017-09-04 03:21:00.000
    5           eeee            2017-09-05 08:06:00.000
    6           fffffff         2017-09-06 02:23:00.000
    7           gggg            2017-09-07 09:01:00.000
    8           hhhh            2017-09-08 09:01:00.000
    9           iiiiiiiii       2017-09-09 09:01:00.000
    
  2. 在 SQL Server 或 Azure SQL Database 中建立控制資料表,以儲存高浮水印值來進行差異資料載入。 在下列範例中,控制資料表的名稱是 watermarktable。 在此資料表中,WatermarkValue 是儲存高浮水印值的資料行,且其類型為 datetime

    create table watermarktable
    (
    WatermarkValue datetime,
    );
    INSERT INTO watermarktable
    VALUES ('1/1/2010 12:00:00 AM')
    
  3. 在您用來建立控制資料表的相同 SQL Server 或 Azure SQL Database 執行個體中,建立預存程序。 預存程序用來將新的高浮水印值寫入至外部控制資料表,以用於下一次差異資料載入。

    CREATE PROCEDURE update_watermark @LastModifiedtime datetime
    AS
    
    BEGIN
    
        UPDATE watermarktable
        SET [WatermarkValue] = @LastModifiedtime 
    
    END
    
  4. 移至 [從資料庫進行差異複製] 範本。 建立來源資料庫的「新」連線,而您想要複製此來源資料庫中的資料。

    Screenshot showing the creation of a new connection to the source table.

  5. 建立目的地資料存放區的「新」連線,而您想要將資料複製至此目的地資料存放區。

    Screenshot showing the creation of a new connection to the destination table.

  6. 建立您已在步驟 2 和 3 中所建立外部控制資料表和預存程序的「新」連線。

    Screenshot showing the creation of a new connection to the control table data store.

  7. 選取使用此範本

  8. 您會看到可用的管線,如下列範例所示:

    Screenshot showing the pipeline.

  9. 選取 [預存程序]。 針對 [預存程序名稱],選擇 [dbo].[update_watermark]。 選取 [匯入參數],然後選取 [新增動態內容]

    Screenshot showing where to set the stored procedure activity.

  10. 撰寫內容 @{activity('LookupCurrentWaterMark').output.firstRow.NewWatermarkValue},然後選取 [完成]

    Screenshot showing where to write the content for the parameters of the stored procedure.

  11. 選取 [偵錯],輸入 [參數],然後選取 [完成]

    Screenshot showing the Debug button.

  12. 顯示與下列範例類似的結果:

    Sreenshot showing the result of the pipeline run.

  13. 您可以在您的來源資料表中建立新的資料列。 以下是建立新資料列的範例 SQL 語言:

    INSERT INTO data_source_table
    VALUES (10, 'newdata','9/10/2017 2:23:00 AM')
    
    INSERT INTO data_source_table
    VALUES (11, 'newdata','9/11/2017 9:01:00 AM')
    
  14. 若要再次執行管線,請選取 [偵錯],並輸入 [參數],然後選取 [完成]

    您將會看到只將新的資料列複製至目的地。

  15. (選用) 如果您選取 Azure Synapse Analytics 作為資料目的地,則也必須提供與 Azure Blob 儲存體的連線來進行暫存,而這是 Azure Synapse Analytics Polybase 的必要項目。 此範本將會為您產生容器路徑。 在管線執行之後,請檢查是否已在 Blob 儲存體中建立容器。

    Screenshot showing where to configure Polybase.