创建函数以检索变更数据

适用于: SQL Server Azure 数据工厂中的 SSIS Integration Runtime

在完成用于执行变更数据增量加载的 Integration Services 包的控制流之后,接下来的任务是创建用于检索变更数据的表值函数 (TVF)。 只需在第一次增量加载之前创建一次此函数。

注意

在创建用于执行变更数据增量加载的包的过程中,第二步是创建用于检索数据的函数。 有关创建此包的总体过程的说明,请参阅变更数据捕获 (SSIS)

变更数据捕获 (CDC) 函数的设计注意事项

为了检索变更数据,包数据流中的源组件将调用下面的变更数据捕获查询函数之中的一个:

然后,源组件获取函数返回的结果并将这些结果传递到下游转换和目标,以将变更数据应用到最终目标。

但是, Integration Services 源组件无法直接调用这些变更数据捕获函数。 Integration Services 源组件需要有关查询所返回列的元数据。 变更数据捕获函数不定义其输出表的列。 因此,这些函数不会为 Integration Services 源组件返回足够的元数据。

相反,因为表值包装函数在其 RETURNS 子句中显式定义了其输出表的列,所以应使用该函数。 列的显式定义提供了 Integration Services 源组件所需的元数据。 必须为每个要检索变更数据的表创建此函数。

有两种方法可创建调用变更数据捕获查询函数的表值包装函数:

  • 可调用 sys.sp_cdc_generate_wrapper_function 系统存储过程来创建表值函数。

  • 可以使用本主题中的准则和示例编写自己的表值函数。

调用存储过程来创建表值函数

创建所需表值函数最快速且简便的方法是调用 sys.sp_cdc_generate_wrapper_function 系统存储过程。 此存储过程生成用于创建包装函数的脚本,这些脚本是专为满足 Integration Services 源组件的需要而设计的。

重要

sys.sp_cdc_generate_wrapper_function 系统存储过程不直接创建包装器函数。 存储过程为包装函数生成 CREATE 脚本。 开发人员必须运行存储过程生成的 CREATE 脚本,然后增量加载包才能调用包装函数。

若要了解如何使用此系统存储过程,您应该了解该过程的执行内容、该过程生成的脚本以及这些脚本创建的包装函数。

了解和使用存储过程

sys.sp_cdc_generate_wrapper_function 系统存储过程生成脚本,这些脚本用于创建供 Integration Services 包使用的包装器函数。

下面是存储过程定义的前几行:

CREATE PROCEDURE sys.sp_cdc_generate_wrapper_function
(
@capture_instance sysname = null
@closed_high_end_point bit = 1,
@column_list = null,
@update_flag_list = null
)

该存储过程的所有参数都是可选的。 如果您在调用该存储过程时不提供任何参数的值,存储过程将为您具有访问权限的所有捕获实例创建包装函数。

注意

有关此存储过程的语法及其参数的详细信息,请参阅 sys.sp_cdc_generate_wrapper_function (Transact-SQL)

该存储过程会始终生成从每个捕获实例返回所有变更的包装函数。 如果 @supports_net_changes 参数在创建捕获实例时设置,则该存储过程还将生成从每个适用的捕获实例返回净变更的包装参数。

该存储过程返回带有两列的结果集:

  • 该存储过程生成的包装函数的名称。 此存储过程从捕获实例的名称派生函数名称。 (函数名称为“fn_all_changes_”后接捕获实例名称。用于净更改函数的前缀(如果已创建)为“fn_net_changes_”。)

  • 包装函数的 CREATE 语句。

了解和使用存储过程创建的脚本

通常,开发人员使用 INSERT...EXEC 语句调用 sys.sp_cdc_generate_wrapper_function 存储过程,并将该存储过程创建的脚本保存到临时表中。 然后,可以单独选择每个脚本,并运行该脚本以创建相应的包装函数。 但是,开发人员还可以使用一组 SQL 命令运行所有的 CREATE 脚本,如以下示例代码中所示:

create table #wrapper_functions  
      (function_name sysname, create_stmt nvarchar(max))  
insert into #wrapper_functions  
exec sys.sp_cdc_generate_wrapper_function  
  
declare @stmt nvarchar(max)  
declare #hfunctions cursor local fast_forward for   
      select create_stmt from #wrapper_functions  
open #hfunctions  
fetch #hfunctions into @stmt  
while (@@fetch_status <> -1)  
begin  
      exec sp_executesql @stmt  
      fetch #hfunctions into @stmt  
end  
close #hfunctions  
deallocate #hfunctions  

了解和使用存储过程创建的函数

为了系统地遍历捕获的变更数据的时间线,生成的包装函数要求一个时间间隔的 @end_time 参数作为下一个时间间隔的 @start_time 参数。 遵循此约定时,生成的包装函数可执行以下任务:

  • 将日期/时间值映射为内部使用的 LSN 值。

  • 确保没有数据丢失或重复。

为了简化对更改表的所有行的查询,生成的包装函数还支持以下约定:

  • 如果 @start_time 参数为 Null,包装函数使用捕获实例中最低的 LSN 值作为查询的下限。

  • 如果 @end_time 参数为 Null,包装函数使用捕获实例中最高的 LSN 值作为查询的上限。

  • 如果 @start_time 或 @end_time 参数的值超过最低 LSN 或最高 LSN 的时间,则执行生成的包装器函数将返回错误 313:Msg 313, Level 16, State 3, Line 1 An insufficient number of arguments were supplied for the procedure or function。 此错误应由开发人员处理。

大部分用户应该能够使用 sys.sp_cdc_generate_wrapper_function 系统存储过程创建的包装器函数而无需进行修改。 但是,若要自定义包装函数,您必须自定义 CREATE 脚本,然后再运行该脚本。

当您的包调用包装函数时,该包必须为三个参数提供值。 这三个参数类似于变更数据捕获函数使用的三个参数。 这三个参数分别是:

包装器函数返回的结果集包含以下数据:

  • 请求的所有变更数据列。

  • 名为 __CDC_OPERATION 的列,该列使用单字符或双字符字段来标识与该行关联的操作。 此字段的有效值如下:“I”表示插入,“D”表示删除,“UO”表示更新旧值,“UN”表示更新新值。

  • 更新标志,当你请求这些标志时,它们作为位列显示在操作代码后,并以在 @update_flag_list 参数中指定的顺序显示。 这些列的命名方式是在关联的列名后追加“_uflag”。

如果您的包调用查询所有变更的包装函数,该包装函数还将返回列 __CDC_STARTLSN 和 __CDC_SEQVAL。 这两列分别成为结果集的第一列和第二列。 包装函数还将基于这两列对结果集进行排序。

编写自己的表值函数

还可以使用 SQL Server Management Studio 编写自己的可调用变更数据捕获查询函数的表值包装函数,并将该表值包装函数存储在 SQL Server中。 有关如何创建 Transact-SQL 函数的详细信息,请参阅 CREATE FUNCTION (Transact-SQL)

下面的示例定义一个表值函数,该表值函数将检索 Customer 表在指定的变更间隔发生的变更。 此函数使用变更数据捕获函数将 datetime 值映射到变更表内部使用的二进制日志序列号 (LSN) 值。 此函数还可以处理以下几种特殊情况:

  • 将 null 值传递到开始时间时,函数将采用最早的可用值。

  • 将 null 值传递到结束时间时,函数将采用最晚的可用值。

  • 如果开始的 LSN 与结束的 LSN 相等,则通常指示所选间隔不存在记录,此函数会退出。

查询变更数据的表值函数示例

CREATE function CDCSample.uf_Customer (  
     @start_time datetime  
    ,@end_time datetime  
)  
returns @Customer table (  
     CustomerID int  
    ,TerritoryID int  
    ,CustomerType nchar(1)  
    ,rowguid uniqueidentifier  
    ,ModifiedDate datetime  
    ,CDC_OPERATION varchar(1)  
) as  
begin  
    declare @from_lsn binary(10), @to_lsn binary(10)  
  
    if (@start_time is null)  
        select @from_lsn = sys.fn_cdc_get_min_lsn('Customer')  
    else  
        select @from_lsn = sys.fn_cdc_increment_lsn(sys.fn_cdc_map_time_to_lsn('largest less than or equal',@start_time))  
  
    if (@end_time is null)  
        select @to_lsn = sys.fn_cdc_get_max_lsn()  
    else  
        select @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal',@end_time)  
  
    if (@from_lsn = sys.fn_cdc_increment_lsn(@to_lsn))  
        return  
  
    -- Query for change data  
    insert into @Customer  
    select   
        CustomerID,      
        TerritoryID,   
        CustomerType,   
        rowguid,   
        ModifiedDate,   
        case __$operation  
                when 1 then 'D'  
                when 2 then 'I'  
                when 4 then 'U'  
                else null  
         end as CDC_OPERATION  
    from   
        cdc.fn_cdc_get_net_changes_Customer(@from_lsn, @to_lsn, 'all')  
  
    return  
end   
go  
  

检索带有变更数据的其他元数据

尽管上述用户创建的表值函数仅使用 __$operation 列,但 cdc.fn_cdc_get_net_changes_<capture_instance> 函数针对每个变更行返回四列元数据。 如果想要在数据流中使用这些值,可以从表值包装函数中将它们作为额外的列返回。

列名称 数据类型 说明
__$start_lsn binary(10) 与更改的提交事务关联的 LSN。

在同一事务中提交的所有更改将共享同一个提交 LSN。 例如,如果对源表的更新操作修改了两个不同的行,则更改表将包含四行(两行具有旧值,两行具有新值),每一行均具有相同的 __$start_lsn 值。
__$seqval binary(10) 用于对事务中的行更改进行排序的序列值。
__$operation int 与更改关联的数据操作语言 (DML) 操作。 可以是以下值之一:

1 = 删除

2 = 插入

3 = 更新(执行更新操作前的值。)

4 = 更新(执行更新操作后的值。)
__$update_mask varbinary(128) 基于变更表的列序号的位掩码,用于标识那些发生了变更的列。 如果需要确定哪些列发生了更改,则可检查此值。
<捕获的源表列> 多种多样 函数返回的其余列是在创建捕获实例时源表中标识为已捕获列的那些列。 如果已捕获列的列表中最初未指定任何列,则将返回源表中的所有列。

有关详细信息,请参阅 cdc.fn_cdc_get_net_changes_<capture_instance> (Transact-SQL)

下一步

在创建了用于查询变更数据的表值函数之后,下一步就是开始设计包中的数据流。

下一主题:检索和了解变更数据