通过


CREATE STREAMING TABLE ...FLOW AUTO CDC

适用于:勾选“是” Databricks SQL

重要

此功能在 Beta 版中。 需要 Databricks Runtime 17.3 及更高版本。

使用子 FLOW AUTO CDCCREATE STREAMING TABLE 来处理源中的变更数据捕获(CDC)记录到流式处理表中。

以前,该 MERGE INTO 语句通常用于处理 Azure Databricks 上的 CDC 记录。 但是, MERGE INTO 由于序列外记录或需要复杂的逻辑重新排序记录,可能会生成不正确的结果。

AUTO CDC 通过自动处理无序记录来简化 CDC。 指定用于标识记录的键、排序的序列列,以及是将结果存储为 SCD 类型 1(直接更新)还是 SCD 类型 2(历史记录跟踪)。

Syntax

CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

默认行为 INSERTUPDATE 事件是从源更新 CDC 事件的:更新与指定键匹配的目标表中的任何行,或者在目标表中不存在匹配记录时插入新行。 可以通过DELETE条件来指定APPLY AS DELETE WHEN事件的处理。

参数

  • source

    数据的源。 源必须是流式处理源。 要使用流式处理语义从源中读取,请使用 STREAM 关键字。 如果读取遇到对现有记录的更改或删除,则会引发错误。 从静态源或仅限追加的源读取是最安全的。

    有关流数据的详细信息,请参阅 使用管道转换数据

  • KEYS

    用于唯一标识源数据中的行的列或列组合。 这些列中的值用于标识哪些 CDC 事件应用于目标表中的特定记录。

    若要定义列的组合,请使用以逗号分隔的列列表。

    此条款是必要的。

  • IGNORE NULL UPDATES

    允许导入包含目标列子集的更新。 当 CDC 事件与现有行匹配并 IGNORE NULL UPDATES 指定时,具有 null 值的列会保留其目标中的现有值。 这也适用于具有 null 值的嵌套列。

    此子句是可选的。

    默认是用 null 值覆盖现有列。

  • APPLY AS DELETE WHEN

    指定何时应将 CDC 事件视为 DELETE 而不是更新插入。

    对于 SCD 类型 2 源,为了处理无序数据,已删除的行暂时保留为基础 Delta 表中的墓碑,并在元存储中创建一个视图,用于筛选掉这些墓碑。 可以使用pipelines.cdc.tombstoneGCThresholdInSeconds配置保留间隔。

    此子句是可选的。

  • APPLY AS TRUNCATE WHEN

    指定何时应将 CDC 事件视为完整表 TRUNCATE。 由于此子句会触发目标表的完全截断,因此应仅将其用于需要此功能的特定用例。

    APPLY AS TRUNCATE WHEN子句仅支持 SCD 类型 1。 SCD 类型 2 不支持截断操作。

    此子句是可选的。

  • SEQUENCE BY

    指定源数据中 CDC 事件的逻辑顺序的列名。 管道处理使用此排序来处理无序到达的更改事件。

    如果需要多个列进行排序,可以使用表达式,方法是:它将先按第一个 STRUCT 结构字段进行排序,然后按第二个字段进行排序(如果有相同的值)。

    指定的列必须是可排序的数据类型。

    此条款是必要的。

  • COLUMNS

    指定要包含在目标表中的列的子集。 您可以选择:

    • 指定要包括的列的完整列表: COLUMNS (userId, name, city)
    • 指定要排除的列的列表: COLUMNS * EXCEPT (operation, sequenceNum)

    此子句是可选的。

    当未指定COLUMNS子句时,默认是在目标表中包括所有列。

  • STORED AS

    将记录存储为 SCD 类型 1 还是 SCD 类型 2。

    此子句是可选的。

    默认值为 SCD 类型 1。

  • TRACK HISTORY ON

    指定输出列的子集,以在对这些指定列进行任何更改时生成历史记录。 您可以选择:

    • 指定要跟踪的列的完整列表: COLUMNS (userId, name, city)
    • 指定要从跟踪中排除的列的列表: COLUMNS * EXCEPT (operation, sequenceNum)

    此子句是可选的。 默认值是在发生任何更改时跟踪所有输出列的历史记录,等效于 TRACK HISTORY ON *

示例

-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  SEQUENCE BY sequenceNum
  STORED AS SCD TYPE 1;

-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2;

-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);