AUTO CDC API:使用 Lakeflow 声明性管道简化变更数据捕获

Lakeflow 声明性管道通过 AUTO CDCAUTO CDC FROM SNAPSHOT API 简化了变更数据捕获(CDC)。

注释

AUTO CDC API 以前被调用APPLY CHANGES过,语法相同。

使用的接口取决于更改数据源:

  • 使用 AUTO CDC 处理变更数据馈送 (CDF) 中的变更。
  • 使用 AUTO CDC FROM SNAPSHOT (公共预览版,仅适用于 Python)处理数据库快照中的更改。

以前,通常使用 MERGE INTO 语句处理 Azure Databricks 上的 CDC 记录。 但是,MERGE INTO可能会由于无序记录而生成不正确的结果,或者需要复杂的逻辑来重新排序记录。

Lakeflow 声明性管道 SQL 和 Python 接口支持该 AUTO CDC API。 Lakeflow 声明性管道 Python 接口支持该 AUTO CDC FROM SNAPSHOT API。

AUTO CDCAUTO CDC FROM SNAPSHOT都支持使用 SCD 类型 1 和类型 2 更新表:

  • 使用 SCD 类型 1 直接更新记录。 不会保留已更新记录的历史记录。
  • 使用 SCD 类型 2 保留有关所有更新或者对指定列集的更新的记录的历史记录。

有关语法和其他引用,请参阅 AUTO CDC for Lakeflow 声明性管道 SQL适用于 Lakeflow 声明性管道 Python 的 AUTO CDC适用于 Lakeflow 声明性管道 Python 的 AUTO CDC FROM SNAPSHOT

注释

本文介绍如何根据源数据的变化,更新 Lakeflow 声明性流水线中的表。 若要了解如何记录和查询 Delta 表的行级更改信息,请参阅在 Azure Databricks 上使用 Delta Lake 更改数据馈送

要求

若要使用 CDC API,您必须将管道配置为使用 无服务器 Lakeflow 声明性管道或 Lakeflow 声明性管道 ProAdvanced版本

CDC 如何通过 AUTO CDC API 实现?

通过自动处理序列外记录,Lakeflow 声明性管道中的 AUTO CDC API 可确保正确处理 CDC 记录,并不需要开发复杂的逻辑来处理序列外记录。 必须在源数据中指定用于对记录进行排序的列,Lakeflow 声明性管道将其解释为源数据正确排序的单调递增表示形式。 Lakeflow 声明式管道会自动处理无序到达的数据。 对于 SCD 类型 2 更改,Lakeflow Declarative Pipelines 会将相应的排序值传播到目标表的 __START_AT__END_AT 列。 每个顺序值的每个键应该有一个非重复更新,不支持 NULL 顺序值。

若要使用 AUTO CDC 执行 CDC 处理,首先需要创建一个流式处理表,然后使用 SQL 中的 AUTO CDC ... INTO 语句或 Python 中的 create_auto_cdc_flow() 函数来指定更改源的源、键和顺序。 若要创建目标流式表,请使用 SQL 中的 CREATE OR REFRESH STREAMING TABLE 语句或 Python 中的 create_streaming_table() 函数。 请参阅SCD 类型 1 和类型 2 处理示例。

有关语法详细信息,请参阅 Lakeflow 声明性管道 SQL 参考Python 参考

如何使用AUTO CDC FROM SNAPSHOT API 实现 CDC?

重要

AUTO CDC FROM SNAPSHOT API 当前处于公共预览版

AUTO CDC FROM SNAPSHOT是声明性 API,它比较一系列有序快照以有效地确定源数据的更改,然后运行对快照中的记录进行 CDC 处理所需的处理。 AUTO CDC FROM SNAPSHOT 仅 Lakeflow 声明性管道 Python 接口支持。

AUTO CDC FROM SNAPSHOT支持从多个源类型引入快照:

  • 使用定期快照引入从现有的表或视图引入快照。 AUTO CDC FROM SNAPSHOT提供了简单的简化界面,用于支持定期从现有数据库对象引入快照。 每次管道更新时都会引入一个新快照,引入时间用作快照版本。 在连续模式下运行管道时,将在每个管道更新中引入多个快照,该周期由包含处理的流的AUTO CDC FROM SNAPSHOT设置决定。
  • 使用历史快照引入来处理包含数据库快照的文件,例如从 Oracle 或 MySQL 数据库或数据仓库生成的快照。

要使用AUTO CDC FROM SNAPSHOT从任何源类型执行 CDC 处理,请先创建流式表,然后使用 Python 中的create_auto_cdc_from_snapshot_flow()函数指定实现处理所需的快照、键和其他参数。 请参阅定期快照引入历史快照引入示例。

传递给 API 的快照必须按版本按升序排列。 如果 Lakeflow 声明式流水线检测到无序快照,则会抛出错误。

有关语法详细信息,请参阅 Lakeflow 声明式管道 Python 参考文档

使用多个列进行排序

可以按多个列进行排序(例如使用时间戳和 ID 来解决并列情况),可以使用 STRUCT 将它们组合在一起:它先按 STRUCT 的第一个字段进行排序,如果出现并列情况,再考虑第二个字段,依此类推。

SQL 中的示例:

SEQUENCE BY STRUCT(timestamp_col, id_col)

Python 中的示例:

sequence_by = struct("timestamp_col", "id_col")

局限性

用于排序的列必须是可排序数据类型。

示例:使用 CDF 源数据处理 SCD 类型 1 和 SCD 类型 2

以下部分提供了 Lakeflow 声明性管道 SCD 类型 1 和类型 2 查询的示例,这些查询基于变更数据提要中的源事件来更新目标表:

  1. 新建用户记录。
  2. 删除用户记录。
  3. 更新用户记录。 在 SCD 类型 1 示例中,最后的 UPDATE 操作延迟到达并从目标表中删除,展示了无序事件的处理。

以下示例假定熟悉如何配置和更新 Lakeflow 声明性管道。 请参阅 教程:通过 Lakeflow 声明性管道使用变更数据捕获生成 ETL 管道

若要运行这些示例,必须首先创建一个示例数据集。 请参阅生成测试数据

下面是这些示例的输入记录:

用户 ID 姓名 城市 操作 序列号
124 劳尔 瓦哈卡 INSERT 1
123 伊莎贝尔 蒙特雷 INSERT 1
125 梅赛德斯 提 华纳 INSERT 2
126 莉莉 坎昆 INSERT 2
123 Null Null 删除 6
125 梅赛德斯 瓜达拉哈拉 UPDATE 6
125 梅赛德斯 Mexicali UPDATE 5
123 伊莎贝尔 奇瓦瓦 UPDATE 5

如果你取消注释示例数据中的最后一行,则会插入以下记录用于指定记录的截断位置:

用户 ID 姓名 城市 操作 序列号
Null Null Null 截断 3

注释

以下所有示例都包含用于指定DELETETRUNCATE操作的选项,但其中的每个选项都是可选的。

处理 SCD 类型 1 更新

以下示例演示了如何处理 SCD 类型 1 更新:

Python语言

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flowname AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

运行 SCD 类型 1 示例后,目标表包含以下记录:

用户 ID 姓名 城市
124 劳尔 瓦哈卡
125 梅赛德斯 瓜达拉哈拉
126 莉莉 坎昆

使用附加的 TRUNCATE 记录运行 SCD 类型 1 示例后,由于在 124 处执行 126 操作,记录 TRUNCATEsequenceNum=3 被截断,目标表包含以下记录:

用户 ID 姓名 城市
125 梅赛德斯 瓜达拉哈拉

处理 SCD 类型 2 更新

以下示例演示了如何处理 SCD 类型 2 更新:

Python语言

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW target_flow
AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

运行 SCD 类型 2 示例后,目标表包含以下记录:

用户 ID 姓名 城市 __START_AT __END_AT
123 伊莎贝尔 蒙特雷 1 5
123 伊莎贝尔 奇瓦瓦 5 6
124 劳尔 瓦哈卡 1 Null
125 梅赛德斯 提 华纳 2 5
125 梅赛德斯 Mexicali 5 6
125 梅赛德斯 瓜达拉哈拉 6 Null
126 莉莉 坎昆 2 Null

SCD 类型 2 查询还可以指定一个输出列的子集,以便跟踪目标表中的历史记录。 对其他列的更改都就地更新,而不是生成新的历史记录。 以下示例演示了如何从跟踪中排除 city 列:

以下示例演示如何通过 SCD 类型 2 使用跟踪历史记录:

Python语言

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_flow(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW target_flow
AS AUTO CDC INTO
  target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

在没有额外 TRUNCATE 记录的情况下运行此示例后,目标表包含以下记录:

用户 ID 姓名 城市 __START_AT __END_AT
123 伊莎贝尔 奇瓦瓦 1 6
124 劳尔 瓦哈卡 1 Null
125 梅赛德斯 瓜达拉哈拉 2 Null
126 莉莉 坎昆 2 Null

生成测试数据

以下代码用于生成可在本教程的示例查询中使用的示例数据集。 假设你拥有新建架构和新表的适当凭据,则可以使用笔记本或 Databricks SQL 运行这些语句。 以下代码 不应 作为 Lakeflow 声明性管道的一部分运行:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

示例:定期快照处理

以下示例演示了 SCD 类型 2 处理,它将引入存储在 mycatalog.myschema.mytable 处的表的快照。 处理结果写入名为target的表。

mycatalog.myschema.mytable 记录,时间戳为 2024-01-01 00:00:00

密钥 价值
1 a1
2 a2

mycatalog.myschema.mytable 记录,时间戳为 2024-01-01 12:00:00

密钥 价值
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.create_auto_cdc_from_snapshot_flow(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

处理快照后,目标表包含以下记录:

密钥 价值 __START_AT __END_AT
1 a1 2024年1月1日 00:00:00 2024-01-01 12:00:00
2 a2 2024年1月1日 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 Null
3 a3 2024-01-01 12:00:00 Null

示例:历史快照处理

以下示例演示了 SCD 类型 2 处理,该处理基于存储在云存储系统中的两个快照中的源事件更新目标表:

快照在 timestamp,存储在 /<PATH>/filename1.csv

密钥 TrackingColumn 不跟踪列
1 a1 b1
2 a2 b2
4 a4 b4

快照在 timestamp + 5,存储在 /<PATH>/filename2.csv

密钥 TrackingColumn 不跟踪列
2 a2_new b2
3 a3 b3
4 a4 b4_new

以下代码示例演示了如何使用以下快照处理 SCD 类型 2 更新:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.create_auto_cdc_from_snapshot_flow(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

处理快照后,目标表包含以下记录:

密钥 TrackingColumn 不跟踪列 __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 Null
3 a3 b3 2 Null
4 a4 b4_new 1 Null

在目标流式表中添加、更改或删除数据

如果管道将表发布到 Unity Catalog,则可以使用数据操作语言 (DML) 声明(包括插入、更新、删除与合并声明)来修改由 AUTO CDC ... INTO 声明创建的目标流式表。

注释

  • 不支持用于修改流式处理表的表架构的 DML 语句。 确保 DML 语句不会尝试修改表架构。
  • 只能在使用 Databricks Runtime 13.3 LTS 及更高版本的 Unity Catalog 共享群集或 SQL 仓库中运行用于更新流式表的 DML 语句。
  • 由于流式传输要求仅追加数据源,因此如果你的处理需要从包含更改的源流式传输表进行流式传输(例如,通过 DML 语句),请在读取源流式传输表时设置 skipChangeCommits 标志。 设置 skipChangeCommits 后,将会忽略删除或修改源表上记录的事务。 如果你的处理不需要某个流式处理表,则可使用具体化视图(没有“仅追加”限制)作为目标表。

由于 Lakeflow 声明性管道使用指定的SEQUENCE BY列,并将适当的排序值传播到目标表的__START_AT__END_AT列(用于 SCD 类型 2),因此必须确保 DML 语句对这些列使用有效值,以维护记录的正确顺序。 请参阅 如何使用 AUTO CDC API 实现 CDC?

有关如何使用 DML 声明处理流式表的详细信息,请参阅在流式表中添加、更改或删除数据

以下示例插入一个起始序列为 5 的活动记录:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

从 AUTO CDC 目标对象表读取更改数据流

在 Databricks Runtime 15.2 及更高版本中,可以从流式表中读取更改数据馈送,该表是 AUTO CDCAUTO CDC FROM SNAPSHOT 查询的目标,读取方式与从其他 Delta 表中读取更改数据馈送的方式相同。 需要满足以下条件才能从目标流式处理表读取变更数据馈送:

  • 目标流式处理表必须发布到 Unity Catalog。 请参阅 将 Unity 目录与 Lakeflow 声明性管道配合使用
  • 若要从目标流式表中读取更改数据馈送,必须使用 Databricks Runtime 15.2 或更高版本。 若要读取其他管道中的更改数据馈送,必须将管道配置为使用 Databricks Runtime 15.2 或更高版本。

从在 Lakeflow Declarative Pipelines 中创建的目标流式表读取变更数据馈送的方式与从其他 Delta 表读取变更数据馈送的方式相同。 若要详细了解如何使用 Delta 更改数据馈送功能,包括 Python 和 SQL 中的示例,请参阅在 Azure Databricks 上使用 Delta Lake 更改数据馈送

注释

更改数据馈送记录包括标识更改事件类型的元数据。 当表中的记录更新时,关联更改记录的元数据通常包括设置为 _change_typeupdate_preimage 事件的 update_postimage 值。

但是,如果对包含更改主键值的目标流式表进行更新,则 _change_type 值会有所不同。 当更改包括对主键的更新时,_change_type 元数据字段将设置为 insertdelete 事件。 如果对具有 UPDATEMERGE 语句的某个密钥字段进行手动更新,或者对于 SCD 类型 2 表,当 __start_at 字段更改以反映较早的起始序列值时,可能会更改主键。

AUTO CDC 查询确定主键值,这些值因 SCD 类型 1 和 SCD 类型 2 的处理而有所不同:

  • 对于 SCD 类型 1 处理和 Lakeflow 声明性管道的 Python 接口,主键是keys函数中create_auto_cdc_flow()参数的值。 对于 Lakeflow 声明性管道 SQL 接口,主键是由KEYS语句中的AUTO CDC ... INTO子句定义的列。
  • 对于 SCD 类型 2,主键是 keys 参数或 KEYS 子句以及 coalesce(__START_AT, __END_AT) 操作的返回值,其中 __START_AT__END_AT 分别是目标流式表中的相应列。

获取有关由 Lakeflow 声明性管道 CDC 查询处理的记录的数据

注释

以下指标仅通过 AUTO CDC 查询(而不是 AUTO CDC FROM SNAPSHOT 查询)捕获。

以下指标由 AUTO CDC 查询捕获:

  • num_upserted_rows:在更新期间更新插入数据集的输出行数。
  • num_deleted_rows:在更新期间从数据集中删除的现有输出行数。

对于 num_output_rows 查询,不会捕获作为非 CDC 流输出的 AUTO CDC 指标。

用于 Lakeflow 声明性管道 CDC 处理的数据对象有哪些?

注释

  • 这些数据结构仅适用于 AUTO CDC 处理,而不适用于 AUTO CDC FROM SNAPSHOT 处理。
  • 仅当目标表被发布到 Hive 元存储时,这些数据结构才适用。 如果管道发布到 Unity Catalog,则用户无法访问内部支持表。

在 Hive 元存储中声明目标表时,会创建两个数据结构:

  • 一个使用分配给目标表的名称的视图。
  • Lakeflow 声明式管道用于管理 CDC 处理的内部支持表。 此表的命名方式是在目标表名称的前面加上 __apply_changes_storage_

例如,如果你声明一个名为 dlt_cdc_target 的目标表,则会在元存储中看到一个名为 dlt_cdc_target 的视图和一个名为 __apply_changes_storage_dlt_cdc_target 的表。 创建视图可让 Lakeflow 声明性管道筛选掉处理无序数据所需的额外信息(例如,墓碑和版本)。 若要查看处理后的数据,请查询目标视图。 由于表 __apply_changes_storage_ 的架构可能会更改以支持将来的功能或增强功能,因此不应查询表以供生产使用。 如果手动在表中添加数据,则认为记录出现在其他更改之前,因为缺少版本列。

其他资源