以增量方式将数据从数据仓库加载到湖屋

在本教程中,你将了解如何以增量方式将数据从数据仓库加载到湖屋。

概述

下面是高级解决方案关系图:

示意图显示了以增量方式加载数据的逻辑。

下面是创建此解决方案所要执行的重要步骤:

  1. 选择水印列。 在源数据表中选择一列,该列可用于将每个运行的新记录或已更新记录切片。 通常,在创建或更新行时,此选定列中的数据(例如 last_modify_time 或 ID)会不断递增。 此列中的最大值用作水印。

  2. 准备一个表以在数据仓库中存储最后一个水印值

  3. 创建采用以下工作流的管道

    此解决方案中的管道具有以下活动:

    • 创建两个 Lookup 活动。 使用第一个查找活动检索最后一个水印值。 使用第二个查找活动检索新的水印值。 这些水印值会传递到复制活动。
    • 创建复制活动,以复制源数据表中其水印列值大于旧水印值但小于新水印值的行。 然后,它会将数据从数据仓库作为新文件复制到湖屋。
    • 创建存储过程活动,以更新下一次管道运行的最后一个水印值。

先决条件

  • 数据仓库。 将数据仓库用作源数据存储。 如果没有数据仓库,请参阅创建数据仓库以了解创建数据仓库的步骤。
  • 湖屋。 使用湖屋作为目标数据存储。 如果没有湖屋,请参阅创建湖屋,以了解创建湖屋的步骤。 创建名为 IncrementalCopy 的文件夹来存储复制的数据。

准备源

下面是在配置增量复制管道之前需要在源数据仓库中准备的一些表和存储过程。

1. 在数据仓库中创建数据源表

在数据仓库中运行以下 SQL 命令,以创建名为 data_source_table 的表作为数据源表。 在本教程中,你将使用它作为示例数据来执行增量复制。

create table data_source_table
(
    PersonID int,
    Name varchar(255),
    LastModifytime DATETIME2(6)
);

INSERT INTO data_source_table
    (PersonID, Name, LastModifytime)
VALUES
    (1, 'aaaa','9/1/2017 12:56:00 AM'),
    (2, 'bbbb','9/2/2017 5:23:00 AM'),
    (3, 'cccc','9/3/2017 2:36:00 AM'),
    (4, 'dddd','9/4/2017 3:21:00 AM'),
    (5, 'eeee','9/5/2017 8:06:00 AM');

数据源表中的数据如下所示:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1        | aaaa | 2017-09-01 00:56:00.000
2        | bbbb | 2017-09-02 05:23:00.000
3        | cccc | 2017-09-03 02:36:00.000
4        | dddd | 2017-09-04 03:21:00.000
5        | eeee | 2017-09-05 08:06:00.000

在本教程中,你将使用 LastModifytime 作为水印列。

2. 在数据仓库中准备另一个表来存储最后一个水印值

  1. 在数据仓库中运行以下 SQL 命令,以创建名为 watermarktable 的表来存储最后一个水印值:

    create table watermarktable
    (
    TableName varchar(255),
    WatermarkValue DATETIME2(6),
    );
    
  2. 使用源数据表的表名设置最后一个水印的默认值。 在本教程中,表名为 data_source_table,默认值为 1/1/2010 12:00:00 AM

    INSERT INTO watermarktable
    VALUES ('data_source_table','1/1/2010 12:00:00 AM')    
    
  3. 查看表 watermarktable 中的数据。

    Select * from watermarktable
    

    输出:

    TableName  | WatermarkValue
    ----------  | --------------
    data_source_table | 2010-01-01 00:00:00.000
    

3. 在数据仓库中创建存储过程

运行以下命令,以在数据仓库中创建存储过程。 此存储过程用于帮助更新上次管道运行后的最后一个水印值。

CREATE PROCEDURE usp_write_watermark @LastModifiedtime datetime, @TableName varchar(50)
AS

BEGIN

UPDATE watermarktable
SET [WatermarkValue] = @LastModifiedtime
WHERE [TableName] = @TableName

END

配置用于增量复制的管道

步骤 1:创建管道

  1. 导航到 Power BI

  2. 选择屏幕左下角的 Power BI 图标,然后选择“数据工厂”以打开数据工厂的主页。

  3. 导航到你的 Microsoft Fabric 工作区。

  4. 选择“数据管道”,然后输入管道名称以创建新管道。

    显示新建的工作区中新建数据管道按钮的屏幕截图。

    显示创建新管道的名称的屏幕截图。

步骤 2:为最后一个水印添加查找活动

在此步骤中,你将创建查找活动以获取最后一个水印值。 获取先前设置的默认值 1/1/2010 12:00:00 AM

  1. 选择“添加管道活动”,然后从下拉列表中选择“查找”。

  2. 在“常规”选项卡下,将此活动重命名为 LookupOldWaterMarkActivity

  3. 在“设置”选项卡下,执行以下配置:

    • 数据存储类型:选择“工作区”。
    • 工作区数据存储类型:选择“数据仓库”。
    • 数据仓库:选择你的数据仓库。
    • 使用查询:选择“”。
    • :选择“dbo.watermarktable”。
    • 仅限首行:已选择。

    屏幕截图显示查找旧水印。

步骤 3:为新水印添加查找活动

在此步骤中,将会创建查找活动以获取新的水印值。 您使用查询从源数据表中获取新的水印。 获取 data_source_tableLastModifytime 列中的最大值。

  1. 在顶部栏上,选择“活动”选项卡下的“查找”选项卡,以添加第二个查找活动。

  2. 在“常规”选项卡下,将此活动重命名为 LookupNewWaterMarkActivity

  3. 在“设置”选项卡下,执行以下配置:

    • 数据存储类型:选择“工作区”。

    • 工作区数据存储类型:选择“数据仓库”。

    • 数据仓库:选择你的数据仓库。

    • 使用查询:选择“查询”。

    • 查询:输入以下查询以选择最大上次修改时间作为新水印:

      select MAX(LastModifytime) as NewWatermarkvalue from data_source_table
      
    • 仅限首行:已选择。

    屏幕截图显示查找新水印。

步骤 4:添加复制活动以复制增量数据

在此步骤中,你将添加一个复制活动,以将最后一个水印和新水印之间的增量数据从数据仓库复制到湖屋。

  1. 选择顶部栏上的“活动”,然后选择“复制数据”->“添加到画布”以获取复制活动。

  2. 在“常规”选项卡下,将此活动重命名为 IncrementalCopyActivity

  3. 通过将附加到查找活动的绿色按钮(在成功时)拖动到复制活动,将两个查找活动同时连接到复制活动。 在看到复制活动的边框颜色变为蓝色时,松开鼠标按钮。

    屏幕截图显示了连接查找和复制活动。

  4. 在“”选项卡下,执行以下配置:

    • 数据存储类型:选择“工作区”。

    • 工作区数据存储类型:选择“数据仓库”。

    • 数据仓库:选择你的数据仓库。

    • 使用查询:选择“查询”。

    • 查询:输入以下查询,以复制最后一个水印和新水印之间的增量数据。

      select * from data_source_table where LastModifytime > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'
      

    显示复制源配置的屏幕截图。

  5. 在“目标”选项卡下,执行以下配置:

    • 数据存储类型:选择“工作区”。
    • 工作区数据存储类型:选择“湖屋”。
    • 湖屋:选择你的湖屋。
    • 根文件夹:选择“文件”。
    • 文件路径:指定要存储复制数据的文件夹。 选择“浏览”以选择文件夹。 对于文件名,打开“添加动态内容”,并在打开的窗口中输入 @CONCAT('Incremental-', pipeline().RunId, '.txt'),以便在湖屋中为复制的数据文件创建文件名。
    • 文件格式:选择数据的格式类型。

    显示复制目标配置的屏幕截图。

步骤 5:添加存储过程活动

在此步骤中,你将添加存储过程活动,以更新下一个管道运行的最后一个水印值。

  1. 选择顶部栏上的“活动”,然后选择“存储过程”以添加存储过程活动。

  2. 在“常规”选项卡下,将此活动重命名为 StoredProceduretoWriteWatermarkActivity

  3. 将复制活动的绿色(在成功时)输出连接到存储过程活动。

  4. 在“设置”选项卡下,执行以下配置:

    • 数据存储类型:选择“工作区”。

    • 数据仓库:选择你的数据仓库。

    • 存储过程名称:指定已在数据仓库中创建的存储过程:[dbo].[usp_write_watermark]

    • 展开存储过程参数。 要指定存储过程参数的值,请选择“导入”,然后为参数输入以下值:

      名称 类型
      LastModifiedtime DateTime @{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}
      TableName 字符串 @{activity('LookupOldWaterMarkActivity').output.firstRow.TableName}

    显示存储过程活动配置的屏幕截图。

步骤 6:运行管道并监视结果

在顶部栏上,选择“主页”选项卡下的“运行”。然后选择“保存并运行”。 管道会开始运行,你可以在“输出”选项卡下监视管道。

显示管道运行结果的屏幕截图。

如果转到你的湖屋,你会发现数据文件位于指定的文件夹下,并且可以选择该文件来预览复制的数据。

显示第一次管道运行的湖屋数据的屏幕截图。

显示第一次管道运行的湖屋数据预览的屏幕截图。

添加更多数据以查看增量复制结果

完成第一个管道运行后,我们可以尝试在数据仓库源表中添加更多数据,以查看此管道是否可以复制增量数据。

步骤 1:向源添加更多数据

通过运行以下查询将新数据插入数据仓库:

INSERT INTO data_source_table
VALUES (6, 'newdata','9/6/2017 2:23:00 AM')

INSERT INTO data_source_table
VALUES (7, 'newdata','9/7/2017 9:01:00 AM')

data_source_table 的更新数据为:

PersonID | Name | LastModifytime
-------- | ---- | --------------
1 | aaaa | 2017-09-01 00:56:00.000
2 | bbbb | 2017-09-02 05:23:00.000
3 | cccc | 2017-09-03 02:36:00.000
4 | dddd | 2017-09-04 03:21:00.000
5 | eeee | 2017-09-05 08:06:00.000
6 | newdata | 2017-09-06 02:23:00.000
7 | newdata | 2017-09-07 09:01:00.000

步骤 2:触发另一个管道运行并监视结果

返回到管道页。 在顶部栏上,再次选择“主页”选项卡下的“运行”。 管道会开始运行,你可以在“输出”下监视管道。

如果转到你的湖屋,你会发现新复制的数据文件位于指定的文件夹下,并且可以选择该文件来预览复制的数据。 你会看到增量数据显示在此文件中。

显示第二次管道运行的湖屋数据的屏幕截图。

显示第二次管道运行的湖屋数据预览的屏幕截图。

接下来,请继续了解有关从 Azure Blob 存储 复制到湖屋的详细信息。