适用于:
Databricks SQL
重要
此功能在 Beta 版中。 需要 Databricks Runtime 17.3 及更高版本。
使用子 FLOW AUTO CDC 句 CREATE STREAMING TABLE 来处理源中的变更数据捕获(CDC)记录到流式处理表中。
以前,该 MERGE INTO 语句通常用于处理 Azure Databricks 上的 CDC 记录。 但是, MERGE INTO 由于序列外记录或需要复杂的逻辑重新排序记录,可能会生成不正确的结果。
AUTO CDC 通过自动处理无序记录来简化 CDC。 指定用于标识记录的键、排序的序列列,以及是将结果存储为 SCD 类型 1(直接更新)还是 SCD 类型 2(历史记录跟踪)。
Syntax
CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
默认行为 INSERT 和 UPDATE 事件是从源更新 CDC 事件的:更新与指定键匹配的目标表中的任何行,或者在目标表中不存在匹配记录时插入新行。 可以通过DELETE条件来指定APPLY AS DELETE WHEN事件的处理。
参数
source数据的源。 源必须是流式处理源。 要使用流式处理语义从源中读取,请使用
STREAM关键字。 如果读取遇到对现有记录的更改或删除,则会引发错误。 从静态源或仅限追加的源读取是最安全的。有关流数据的详细信息,请参阅 使用管道转换数据。
KEYS用于唯一标识源数据中的行的列或列组合。 这些列中的值用于标识哪些 CDC 事件应用于目标表中的特定记录。
若要定义列的组合,请使用以逗号分隔的列列表。
此条款是必要的。
IGNORE NULL UPDATES允许导入包含目标列子集的更新。 当 CDC 事件与现有行匹配并
IGNORE NULL UPDATES指定时,具有null值的列会保留其目标中的现有值。 这也适用于具有null值的嵌套列。此子句是可选的。
默认是用
null值覆盖现有列。APPLY AS DELETE WHEN指定何时应将 CDC 事件视为
DELETE而不是更新插入。对于 SCD 类型 2 源,为了处理无序数据,已删除的行暂时保留为基础 Delta 表中的墓碑,并在元存储中创建一个视图,用于筛选掉这些墓碑。 可以使用
pipelines.cdc.tombstoneGCThresholdInSeconds配置保留间隔。此子句是可选的。
APPLY AS TRUNCATE WHEN指定何时应将 CDC 事件视为完整表
TRUNCATE。 由于此子句会触发目标表的完全截断,因此应仅将其用于需要此功能的特定用例。APPLY AS TRUNCATE WHEN子句仅支持 SCD 类型 1。 SCD 类型 2 不支持截断操作。此子句是可选的。
SEQUENCE BY指定源数据中 CDC 事件的逻辑顺序的列名。 管道处理使用此排序来处理无序到达的更改事件。
如果需要多个列进行排序,可以使用表达式,方法是:它将先按第一个
STRUCT结构字段进行排序,然后按第二个字段进行排序(如果有相同的值)。指定的列必须是可排序的数据类型。
此条款是必要的。
COLUMNS指定要包含在目标表中的列的子集。 您可以选择:
- 指定要包括的列的完整列表:
COLUMNS (userId, name, city)。 - 指定要排除的列的列表:
COLUMNS * EXCEPT (operation, sequenceNum)
此子句是可选的。
当未指定
COLUMNS子句时,默认是在目标表中包括所有列。- 指定要包括的列的完整列表:
STORED AS将记录存储为 SCD 类型 1 还是 SCD 类型 2。
此子句是可选的。
默认值为 SCD 类型 1。
TRACK HISTORY ON指定输出列的子集,以在对这些指定列进行任何更改时生成历史记录。 您可以选择:
- 指定要跟踪的列的完整列表:
COLUMNS (userId, name, city)。 - 指定要从跟踪中排除的列的列表:
COLUMNS * EXCEPT (operation, sequenceNum)
此子句是可选的。 默认值是在发生任何更改时跟踪所有输出列的历史记录,等效于
TRACK HISTORY ON *。- 指定要跟踪的列的完整列表:
示例
-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 1;
-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;
-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);