Lakeflow 声明性管道通过 AUTO CDC
和 AUTO CDC FROM SNAPSHOT
API 简化了变更数据捕获(CDC)。
注释
AUTO CDC
API 以前被调用APPLY CHANGES
过,语法相同。
使用的接口取决于更改数据源:
- 使用
AUTO CDC
处理变更数据馈送 (CDF) 中的变更。 - 使用
AUTO CDC FROM SNAPSHOT
(公共预览版,仅适用于 Python)处理数据库快照中的更改。
以前,通常使用 MERGE INTO
语句处理 Azure Databricks 上的 CDC 记录。 但是,MERGE INTO
可能会由于无序记录而生成不正确的结果,或者需要复杂的逻辑来重新排序记录。
Lakeflow 声明性管道 SQL 和 Python 接口支持该 AUTO CDC
API。 Lakeflow 声明性管道 Python 接口支持该 AUTO CDC FROM SNAPSHOT
API。
AUTO CDC
和AUTO CDC FROM SNAPSHOT
都支持使用 SCD 类型 1 和类型 2 更新表:
- 使用 SCD 类型 1 直接更新记录。 不会保留已更新记录的历史记录。
- 使用 SCD 类型 2 保留有关所有更新或者对指定列集的更新的记录的历史记录。
有关语法和其他引用,请参阅 AUTO CDC for Lakeflow 声明性管道 SQL、 适用于 Lakeflow 声明性管道 Python 的 AUTO CDC 和 适用于 Lakeflow 声明性管道 Python 的 AUTO CDC FROM SNAPSHOT。
注释
本文介绍如何根据源数据的变化,更新 Lakeflow 声明性流水线中的表。 若要了解如何记录和查询 Delta 表的行级更改信息,请参阅在 Azure Databricks 上使用 Delta Lake 更改数据馈送。
要求
若要使用 CDC API,您必须将管道配置为使用 无服务器 Lakeflow 声明性管道或 Lakeflow 声明性管道 Pro
或 Advanced
版本。
CDC 如何通过 AUTO CDC API 实现?
通过自动处理序列外记录,Lakeflow 声明性管道中的 AUTO CDC API 可确保正确处理 CDC 记录,并不需要开发复杂的逻辑来处理序列外记录。 必须在源数据中指定用于对记录进行排序的列,Lakeflow 声明性管道将其解释为源数据正确排序的单调递增表示形式。 Lakeflow 声明式管道会自动处理无序到达的数据。 对于 SCD 类型 2 更改,Lakeflow Declarative Pipelines 会将相应的排序值传播到目标表的 __START_AT
和 __END_AT
列。 每个顺序值的每个键应该有一个非重复更新,不支持 NULL 顺序值。
若要使用 AUTO CDC
执行 CDC 处理,首先需要创建一个流式处理表,然后使用 SQL 中的 AUTO CDC ... INTO
语句或 Python 中的 create_auto_cdc_flow()
函数来指定更改源的源、键和顺序。 若要创建目标流式表,请使用 SQL 中的 CREATE OR REFRESH STREAMING TABLE
语句或 Python 中的 create_streaming_table()
函数。 请参阅SCD 类型 1 和类型 2 处理示例。
有关语法详细信息,请参阅 Lakeflow 声明性管道 SQL 参考 或 Python 参考。
如何使用AUTO CDC FROM SNAPSHOT
API 实现 CDC?
重要
AUTO CDC FROM SNAPSHOT
API 当前处于公共预览版。
AUTO CDC FROM SNAPSHOT
是声明性 API,它比较一系列有序快照以有效地确定源数据的更改,然后运行对快照中的记录进行 CDC 处理所需的处理。
AUTO CDC FROM SNAPSHOT
仅 Lakeflow 声明性管道 Python 接口支持。
AUTO CDC FROM SNAPSHOT
支持从多个源类型引入快照:
- 使用定期快照引入从现有的表或视图引入快照。
AUTO CDC FROM SNAPSHOT
提供了简单的简化界面,用于支持定期从现有数据库对象引入快照。 每次管道更新时都会引入一个新快照,引入时间用作快照版本。 在连续模式下运行管道时,将在每个管道更新中引入多个快照,该周期由包含处理的流的AUTO CDC FROM SNAPSHOT
设置决定。 - 使用历史快照引入来处理包含数据库快照的文件,例如从 Oracle 或 MySQL 数据库或数据仓库生成的快照。
要使用AUTO CDC FROM SNAPSHOT
从任何源类型执行 CDC 处理,请先创建流式表,然后使用 Python 中的create_auto_cdc_from_snapshot_flow()
函数指定实现处理所需的快照、键和其他参数。 请参阅定期快照引入和历史快照引入示例。
传递给 API 的快照必须按版本按升序排列。 如果 Lakeflow 声明式流水线检测到无序快照,则会抛出错误。
有关语法详细信息,请参阅 Lakeflow 声明式管道 Python 参考文档。
使用多个列进行排序
可以按多个列进行排序(例如使用时间戳和 ID 来解决并列情况),可以使用 STRUCT 将它们组合在一起:它先按 STRUCT 的第一个字段进行排序,如果出现并列情况,再考虑第二个字段,依此类推。
SQL 中的示例:
SEQUENCE BY STRUCT(timestamp_col, id_col)
Python 中的示例:
sequence_by = struct("timestamp_col", "id_col")
局限性
用于排序的列必须是可排序数据类型。
示例:使用 CDF 源数据处理 SCD 类型 1 和 SCD 类型 2
以下部分提供了 Lakeflow 声明性管道 SCD 类型 1 和类型 2 查询的示例,这些查询基于变更数据提要中的源事件来更新目标表:
- 新建用户记录。
- 删除用户记录。
- 更新用户记录。 在 SCD 类型 1 示例中,最后的
UPDATE
操作延迟到达并从目标表中删除,展示了无序事件的处理。
以下示例假定熟悉如何配置和更新 Lakeflow 声明性管道。 请参阅 教程:通过 Lakeflow 声明性管道使用变更数据捕获生成 ETL 管道。
若要运行这些示例,必须首先创建一个示例数据集。 请参阅生成测试数据。
下面是这些示例的输入记录:
用户 ID | 姓名 | 城市 | 操作 | 序列号 |
---|---|---|---|---|
124 | 劳尔 | 瓦哈卡 | INSERT | 1 |
123 | 伊莎贝尔 | 蒙特雷 | INSERT | 1 |
125 | 梅赛德斯 | 提 华纳 | INSERT | 2 |
126 | 莉莉 | 坎昆 | INSERT | 2 |
123 | Null | Null | 删除 | 6 |
125 | 梅赛德斯 | 瓜达拉哈拉 | UPDATE | 6 |
125 | 梅赛德斯 | Mexicali | UPDATE | 5 |
123 | 伊莎贝尔 | 奇瓦瓦 | UPDATE | 5 |
如果你取消注释示例数据中的最后一行,则会插入以下记录用于指定记录的截断位置:
用户 ID | 姓名 | 城市 | 操作 | 序列号 |
---|---|---|---|---|
Null | Null | Null | 截断 | 3 |
注释
以下所有示例都包含用于指定DELETE
和TRUNCATE
操作的选项,但其中的每个选项都是可选的。
处理 SCD 类型 1 更新
以下示例演示了如何处理 SCD 类型 1 更新:
Python语言
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flowname AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
运行 SCD 类型 1 示例后,目标表包含以下记录:
用户 ID | 姓名 | 城市 |
---|---|---|
124 | 劳尔 | 瓦哈卡 |
125 | 梅赛德斯 | 瓜达拉哈拉 |
126 | 莉莉 | 坎昆 |
使用附加的 TRUNCATE
记录运行 SCD 类型 1 示例后,由于在 124
处执行 126
操作,记录 TRUNCATE
和 sequenceNum=3
被截断,目标表包含以下记录:
用户 ID | 姓名 | 城市 |
---|---|---|
125 | 梅赛德斯 | 瓜达拉哈拉 |
处理 SCD 类型 2 更新
以下示例演示了如何处理 SCD 类型 2 更新:
Python语言
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
运行 SCD 类型 2 示例后,目标表包含以下记录:
用户 ID | 姓名 | 城市 | __START_AT | __END_AT |
---|---|---|---|---|
123 | 伊莎贝尔 | 蒙特雷 | 1 | 5 |
123 | 伊莎贝尔 | 奇瓦瓦 | 5 | 6 |
124 | 劳尔 | 瓦哈卡 | 1 | Null |
125 | 梅赛德斯 | 提 华纳 | 2 | 5 |
125 | 梅赛德斯 | Mexicali | 5 | 6 |
125 | 梅赛德斯 | 瓜达拉哈拉 | 6 | Null |
126 | 莉莉 | 坎昆 | 2 | Null |
SCD 类型 2 查询还可以指定一个输出列的子集,以便跟踪目标表中的历史记录。 对其他列的更改都就地更新,而不是生成新的历史记录。 以下示例演示了如何从跟踪中排除 city
列:
以下示例演示如何通过 SCD 类型 2 使用跟踪历史记录:
Python语言
import dlt
from pyspark.sql.functions import col, expr
@dlt.view
def users():
return spark.readStream.table("cdc_data.users")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_flow(
target = "target",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW target_flow
AS AUTO CDC INTO
target
FROM
stream(cdc_data.users)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
在没有额外 TRUNCATE
记录的情况下运行此示例后,目标表包含以下记录:
用户 ID | 姓名 | 城市 | __START_AT | __END_AT |
---|---|---|---|---|
123 | 伊莎贝尔 | 奇瓦瓦 | 1 | 6 |
124 | 劳尔 | 瓦哈卡 | 1 | Null |
125 | 梅赛德斯 | 瓜达拉哈拉 | 2 | Null |
126 | 莉莉 | 坎昆 | 2 | Null |
生成测试数据
以下代码用于生成可在本教程的示例查询中使用的示例数据集。 假设你拥有新建架构和新表的适当凭据,则可以使用笔记本或 Databricks SQL 运行这些语句。 以下代码 不应 作为 Lakeflow 声明性管道的一部分运行:
CREATE SCHEMA IF NOT EXISTS cdc_data;
CREATE TABLE
cdc_data.users
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The above batch at sequenceNum 6 will be the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
示例:定期快照处理
以下示例演示了 SCD 类型 2 处理,它将引入存储在 mycatalog.myschema.mytable
处的表的快照。 处理结果写入名为target
的表。
mycatalog.myschema.mytable
记录,时间戳为 2024-01-01 00:00:00
密钥 | 价值 |
---|---|
1 | a1 |
2 | a2 |
mycatalog.myschema.mytable
记录,时间戳为 2024-01-01 12:00:00
密钥 | 价值 |
---|---|
2 | b2 |
3 | a3 |
import dlt
@dlt.view(name="source")
def source():
return spark.read.table("mycatalog.myschema.mytable")
dlt.create_streaming_table("target")
dlt.create_auto_cdc_from_snapshot_flow(
target="target",
source="source",
keys=["key"],
stored_as_scd_type=2
)
处理快照后,目标表包含以下记录:
密钥 | 价值 | __START_AT | __END_AT |
---|---|---|---|
1 | a1 | 2024年1月1日 00:00:00 | 2024-01-01 12:00:00 |
2 | a2 | 2024年1月1日 00:00:00 | 2024-01-01 12:00:00 |
2 | b2 | 2024-01-01 12:00:00 | Null |
3 | a3 | 2024-01-01 12:00:00 | Null |
示例:历史快照处理
以下示例演示了 SCD 类型 2 处理,该处理基于存储在云存储系统中的两个快照中的源事件更新目标表:
快照在 timestamp
,存储在 /<PATH>/filename1.csv
密钥 | TrackingColumn | 不跟踪列 |
---|---|---|
1 | a1 | b1 |
2 | a2 | b2 |
4 | a4 | b4 |
快照在 timestamp + 5
,存储在 /<PATH>/filename2.csv
密钥 | TrackingColumn | 不跟踪列 |
---|---|---|
2 | a2_new | b2 |
3 | a3 | b3 |
4 | a4 | b4_new |
以下代码示例演示了如何使用以下快照处理 SCD 类型 2 更新:
import dlt
def exist(file_name):
# Storage system-dependent function that returns true if file_name exists, false otherwise
# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
latest_snapshot_version = latest_snapshot_version or 0
next_version = latest_snapshot_version + 1
file_name = "dir_path/filename_" + next_version + ".csv"
if (exist(file_name)):
return (spark.read.load(file_name), next_version)
else:
# No snapshot available
return None
dlt.create_streaming_live_table("target")
dlt.create_auto_cdc_from_snapshot_flow(
target = "target",
source = next_snapshot_and_version,
keys = ["Key"],
stored_as_scd_type = 2,
track_history_column_list = ["TrackingCol"]
)
处理快照后,目标表包含以下记录:
密钥 | TrackingColumn | 不跟踪列 | __START_AT | __END_AT |
---|---|---|---|---|
1 | a1 | b1 | 1 | 2 |
2 | a2 | b2 | 1 | 2 |
2 | a2_new | b2 | 2 | Null |
3 | a3 | b3 | 2 | Null |
4 | a4 | b4_new | 1 | Null |
在目标流式表中添加、更改或删除数据
如果管道将表发布到 Unity Catalog,则可以使用数据操作语言 (DML) 声明(包括插入、更新、删除与合并声明)来修改由 AUTO CDC ... INTO
声明创建的目标流式表。
注释
- 不支持用于修改流式处理表的表架构的 DML 语句。 确保 DML 语句不会尝试修改表架构。
- 只能在使用 Databricks Runtime 13.3 LTS 及更高版本的 Unity Catalog 共享群集或 SQL 仓库中运行用于更新流式表的 DML 语句。
- 由于流式传输要求仅追加数据源,因此如果你的处理需要从包含更改的源流式传输表进行流式传输(例如,通过 DML 语句),请在读取源流式传输表时设置 skipChangeCommits 标志。 设置
skipChangeCommits
后,将会忽略删除或修改源表上记录的事务。 如果你的处理不需要某个流式处理表,则可使用具体化视图(没有“仅追加”限制)作为目标表。
由于 Lakeflow 声明性管道使用指定的SEQUENCE BY
列,并将适当的排序值传播到目标表的__START_AT
和__END_AT
列(用于 SCD 类型 2),因此必须确保 DML 语句对这些列使用有效值,以维护记录的正确顺序。 请参阅 如何使用 AUTO CDC API 实现 CDC?。
有关如何使用 DML 声明处理流式表的详细信息,请参阅在流式表中添加、更改或删除数据。
以下示例插入一个起始序列为 5 的活动记录:
INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);
从 AUTO CDC 目标对象表读取更改数据流
在 Databricks Runtime 15.2 及更高版本中,可以从流式表中读取更改数据馈送,该表是 AUTO CDC
或 AUTO CDC FROM SNAPSHOT
查询的目标,读取方式与从其他 Delta 表中读取更改数据馈送的方式相同。 需要满足以下条件才能从目标流式处理表读取变更数据馈送:
- 目标流式处理表必须发布到 Unity Catalog。 请参阅 将 Unity 目录与 Lakeflow 声明性管道配合使用。
- 若要从目标流式表中读取更改数据馈送,必须使用 Databricks Runtime 15.2 或更高版本。 若要读取其他管道中的更改数据馈送,必须将管道配置为使用 Databricks Runtime 15.2 或更高版本。
从在 Lakeflow Declarative Pipelines 中创建的目标流式表读取变更数据馈送的方式与从其他 Delta 表读取变更数据馈送的方式相同。 若要详细了解如何使用 Delta 更改数据馈送功能,包括 Python 和 SQL 中的示例,请参阅在 Azure Databricks 上使用 Delta Lake 更改数据馈送。
注释
更改数据馈送记录包括标识更改事件类型的元数据。 当表中的记录更新时,关联更改记录的元数据通常包括设置为 _change_type
和 update_preimage
事件的 update_postimage
值。
但是,如果对包含更改主键值的目标流式表进行更新,则 _change_type
值会有所不同。 当更改包括对主键的更新时,_change_type
元数据字段将设置为 insert
和 delete
事件。 如果对具有 UPDATE
或 MERGE
语句的某个密钥字段进行手动更新,或者对于 SCD 类型 2 表,当 __start_at
字段更改以反映较早的起始序列值时,可能会更改主键。
AUTO CDC
查询确定主键值,这些值因 SCD 类型 1 和 SCD 类型 2 的处理而有所不同:
- 对于 SCD 类型 1 处理和 Lakeflow 声明性管道的 Python 接口,主键是
keys
函数中create_auto_cdc_flow()
参数的值。 对于 Lakeflow 声明性管道 SQL 接口,主键是由KEYS
语句中的AUTO CDC ... INTO
子句定义的列。 - 对于 SCD 类型 2,主键是
keys
参数或KEYS
子句以及coalesce(__START_AT, __END_AT)
操作的返回值,其中__START_AT
和__END_AT
分别是目标流式表中的相应列。
获取有关由 Lakeflow 声明性管道 CDC 查询处理的记录的数据
注释
以下指标仅通过 AUTO CDC
查询(而不是 AUTO CDC FROM SNAPSHOT
查询)捕获。
以下指标由 AUTO CDC
查询捕获:
-
num_upserted_rows
:在更新期间更新插入数据集的输出行数。 -
num_deleted_rows
:在更新期间从数据集中删除的现有输出行数。
对于 num_output_rows
查询,不会捕获作为非 CDC 流输出的 AUTO CDC
指标。
用于 Lakeflow 声明性管道 CDC 处理的数据对象有哪些?
注释
- 这些数据结构仅适用于
AUTO CDC
处理,而不适用于AUTO CDC FROM SNAPSHOT
处理。 - 仅当目标表被发布到 Hive 元存储时,这些数据结构才适用。 如果管道发布到 Unity Catalog,则用户无法访问内部支持表。
在 Hive 元存储中声明目标表时,会创建两个数据结构:
- 一个使用分配给目标表的名称的视图。
- Lakeflow 声明式管道用于管理 CDC 处理的内部支持表。 此表的命名方式是在目标表名称的前面加上
__apply_changes_storage_
。
例如,如果你声明一个名为 dlt_cdc_target
的目标表,则会在元存储中看到一个名为 dlt_cdc_target
的视图和一个名为 __apply_changes_storage_dlt_cdc_target
的表。 创建视图可让 Lakeflow 声明性管道筛选掉处理无序数据所需的额外信息(例如,墓碑和版本)。 若要查看处理后的数据,请查询目标视图。 由于表 __apply_changes_storage_
的架构可能会更改以支持将来的功能或增强功能,因此不应查询表以供生产使用。 如果手动在表中添加数据,则认为记录出现在其他更改之前,因为缺少版本列。