閱讀英文

共用方式為


以累加方式將資料從資料倉儲載入 Lakehouse

在本教學課程中,您將了解如何以累加方式將資料從資料倉儲載入 Lakehouse。

概觀

以下是高階解決方案圖表:

顯示以累加方式載入資料邏輯的圖表。

以下是建立此解決方案的重要步驟:

  1. 選取水位線資料行。 選取來源資料資料表中的一個資料行,可用於切割每次執行時新增或更新的記錄。 一般來說,當建立或更新資料列時,這個選取的資料行 (例如,last_modify_time 或 ID) 中的資料會持續增加。 此資料行中的最大值就作為水位線。

  2. 準備資料表,以將最後一個水位線值儲存在您的資料倉儲中

  3. 使用下列工作流程建立管線

    此解決方案中的管道有下列活動:

    • 建立兩個查閱活動。 使用第一個查閱活動來取出最後一個水位線值。 使用第二個查閱活動來擷取新的水位線值。 這些水位線值會傳遞給複製活動。
    • 建立複製活動,以複製來源資料資料表的資料列,而這些資料列的水位線資料行值大於舊水位線值,且小於新水位線值。 然後,其會將資料從資料倉儲複製到 Lakehouse 作為新檔案。
    • 建立預存程序活動,以更新下次管線執行的最後一個管線水位線值。

必要條件

  • 資料倉儲。 您需要使用資料倉儲作為來源資料存放區。 如果您沒有此資料庫,請參閱建立資料倉儲,按照步驟建立一個。
  • Lakehouse。 您會使用 Lakehouse 作為目的地資料存放區。 如果您沒有此資料庫,請參閱建立 Lakehouse,按照步驟建立一個。 建立名為 IncrementalCopy 的資料夾來儲存複製的資料。

準備來源

以下是設定累加複製管線之前,您需要在來源資料倉儲中準備的一些資料表和預存程序。

1. 在資料倉儲中建立資料來源資料表

在資料倉儲中執行下列 SQL 命令,以建立名為 data_source_table 的資料表,作為資料來源資料表。 在本教學課程中,您會使用它作為執行增量複製的範例資料。

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

資料來源資料表中的資料如下所示:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

在本教學課程中,您會使用 LastModifytime 作為水位線資料行。

2. 在您的資料倉儲中建立另一個資料表來儲存最後一個水位線值

  1. 對資料倉儲執行下列 SQL 命令,以建立名為 watermarktable 的資料表來儲存最後一個水位線值:

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. 使用來源資料資料表的資料表名稱來設定最後一個水位線的預設值。 在本教學課程中,資料表名稱是 data_source_table,而預設值為 1/1/2010 12:00:00 AM

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. 檢閱資料表 watermarktable 中的資料。

    Select * from watermarktable
    

    輸出:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. 在您的資料倉儲中建立預存程序

執行下列命令,在您的資料倉儲中建立預存程序。 此預存程序可用來協助更新上次管線執行之後的最後一個水位線值。

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

設定累加複製的管線

步驟 1:建立管線

  1. 瀏覽至 Power BI

  2. 選取畫面左下方的 Power BI 圖示,然後選取 "Data Factory" 以開啟 Data Factory 的首頁。

  3. 瀏覽至 Microsoft Fabric 工作區。

  4. 選取 [資料管線],然後輸入管線名稱以建立新的管線。

    顯示新建立工作區中的新資料管線按鈕的螢幕擷取畫面。

    顯示建立新管線的名稱的螢幕擷取畫面。

步驟 2:新增最後一個水位線的查閱活動

在此步驟中,您會建立查閱活動,以取得最後一個水位線值。 取得先前設定的預設值 1/1/2010 12:00:00 AM

  1. 選取 [新增管線活動],然後從下拉式清單中選取 [查閱]

  2. 在 [一般] 索引標籤下,將此活動重新命名為 LookupOldWaterMarkActivity

  3. 在 [設定] 索引標籤下,執行下列設定:

    • 資料存放區類型:選取 [工作區]
    • 工作區資料存放區類型:選取 [資料倉儲]
    • 資料倉儲:選取您的資料倉儲。
    • 使用查詢:選擇 [資料表]
    • 資料表:選擇 "dbo.watermarktable"
    • 僅限第一個資料列:已選取。

    顯示查閱舊水位線的螢幕擷取畫面。

步驟 3:新增新水位線的查閱活動

在此步驟中,您會建立查閱活動,以取得新水位線值。 您可以使用查詢,從源數據數據表取得新的浮水印。 在 data_source_table 中的 LastModifytime 欄位中獲得最大值。

  1. 在頂端列上,選取 [活動] 索引標籤下的 [查閱],以新增第二個查閱活動。

  2. 在 [一般] 索引標籤下,將此活動重新命名為 LookupNewWaterMarkActivity

  3. 在 [設定] 索引標籤下,執行下列設定:

    • 資料存放區類型:選取 [工作區]

    • 工作區資料存放區類型:選取 [資料倉儲]

    • 資料倉儲:選取您的資料倉儲。

    • 使用查詢:選擇 [查詢]

    • 查詢:輸入下列查詢,以挑選上次修改時間上限作為新的水位線:

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • 僅限第一個資料列:已選取。

    顯示查閱新水位線的螢幕擷取畫面。

步驟 4:新增複製活動以複製累加資料

在此步驟中,您會新增複製活動,以將最後一個水位線與新水位線之間的累加資料從資料倉儲複製到 Lakehouse。

  1. 選取頂端列上的 [活動],然後選取 [複製資料] -> [新增至畫布] 以取得複製活動。

  2. 在 [一般] 索引標籤下,將此活動重新命名為 IncrementalCopyActivity

  3. 透過將 [查閱] 活動所附加的綠色按鈕 (成功時) 拖移至 [複製] 活動,即可將兩個 [查閱] 活動同時連線至 [複製] 活動。 當您看到 [複製] 活動的框線顏色變為綠色時即鬆開滑鼠按鈕。

    顯示連線查閱和複製活動的螢幕擷取畫面。

  4. 在 [來源] 索引標籤下,執行下列設定:

    • 資料存放區類型:選取 [工作區]

    • 工作區資料存放區類型:選取 [資料倉儲]

    • 資料倉儲:選取您的資料倉儲。

    • 使用查詢:選擇 [查詢]

    • 查詢:輸入下列查詢,以在最後一個水位線和新水位線之間複製累加資料。

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    顯示複製來源設定的螢幕擷取畫面。

  5. 在 [目的地] 索引標籤下,提供下列設定:

    • 資料存放區類型:選取 [工作區]
    • 工作區資料存放區類型:選取 "Lakehouse"
    • Lakehouse:選取您的 Lakehouse。
    • 根資料夾:選擇 [檔案]
    • 檔案路徑:指定您要儲存複製的資料的資料夾。 選取 [瀏覽] 以選取您的資料夾。 針對檔案名稱,開啟 [新增動態內容],然後在開啟的視窗中輸入 @CONCAT('Incremental-', pipeline().RunId, '.txt'),為 Lakehouse 中複製的資料檔案建立檔案名稱。
    • 檔案格式:選取資料的格式類型。

    顯示複製目的地設定的螢幕擷取畫面。

步驟 5:新增預存程序活動

在此步驟中,您新增預存程序活動,以更新下次管線執行的最後一個管線水位線值。

  1. 選取頂端列上的 [活動],然後選取 [預存程序] 以新增預存程序活動。

  2. 在 [一般] 索引標籤下,將此活動重新命名為 StoredProceduretoWriteWatermarkActivity

  3. 將 [複製] 活動的綠色 (成功時) 輸出連線至 [預存程序] 活動。

  4. 在 [設定] 索引標籤下,執行下列設定:

    • 資料存放區類型:選取 [工作區]

    • 資料倉儲:選取您的資料倉儲。

    • 預存程序名稱:指定您在資料倉儲中建立的預存程序:[dbo].[usp_write_watermark]

    • 展開 [預存程序參數]。 若要指定預存程序參數的值,請選取 [匯入],然後輸入參數的下列值:

      名稱 類型
      LastModifiedtime Datetime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName String @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    顯示預存程序活動設定的螢幕擷取畫面。

步驟 6:執行管線並監視結果

在頂端列上,選取 [首頁] 索引標籤下的 [執行]。然後選取 [儲存並執行]。 管線開始執行,您可以在 [輸出] 索引標籤下監視管線。

顯示管線執行結果的螢幕擷取畫面。

移至您的 Lakehouse,您會發現資料檔案位於您指定的資料夾底下,而且您可以選取檔案來預覽複製的資料。

顯示第一個管線執行的 Lakehouse 資料的螢幕擷取畫面。

顯示第一個管線執行的 Lakehouse 資料預覽的螢幕擷取畫面。

新增更多資料,以查看累加複製結果

在您完成第一次管線執行之後,讓我們嘗試在資料倉儲來源資料表中新增更多資料,以查看此管線是否可以複製累加資料。

步驟 1:將更多資料新增至來源

執行下列查詢,將新資料插入資料倉儲中:

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

data_source_table 的更新資料為:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

步驟 2:觸發另一個管線執行並監視結果

返回您的管線頁面。 在頂端列上,再次選取 [首頁] 索引標籤下的 [執行]。 管線開始執行,您可以在 [輸出] 下監視管線。

移至您的 Lakehouse,您會發現新複製的資料檔案位於您指定的資料夾底下,而且您可以選取檔案來預覽複製的資料。 您會看到累加資料顯示在此檔案中。

顯示第二個管線執行的 Lakehouse 資料的螢幕擷取畫面。

顯示第二個管線執行的 Lakehouse 資料預覽的螢幕擷取畫面。

接下來,請繼續進行以深入了解從 Azure Blob 儲存體複製到 Lakehouse。