在增量实时表中使用 APPLY CHANGES API 简化变更数据捕获

增量实时表使用 APPLY CHANGES API 简化了变更数据捕获 (CDC)。 以前,通常使用 MERGE INTO 语句处理 Azure Databricks 上的 CDC 记录。 但是,MERGE INTO 可能会由于无序记录而生成不正确的结果,或者需要复杂的逻辑来重新排序记录。

增量实时表中的 APPLY CHANGES API 可自动处理无序记录,从而确保正确处理 CDC 记录,而且无需开发复杂的逻辑来处理无序记录。

增量实时表 SQL 和 Python 接口支持 APPLY CHANGES API,包括支持使用 SCD 类型 1 和类型 2 更新表:

  • 使用 SCD 类型 1 直接更新记录。 不为更新后的记录保留历史记录。
  • 使用 SCD 类型 2 保留有关所有更新或者对指定列集的更新的记录的历史记录。

有关语法和其他参考,请参阅:

注意

本文介绍如何根据源数据中的更改来更新增量实时表管道中的表。 若要了解如何记录和查询 Delta 表的行级更改信息,请参阅在 Azure Databricks 上使用 Delta Lake 更改数据馈送

如何使用增量实时表实现 CDC?

必须在源数据中指定一列作为记录排序依据,增量实时表将此顺序解释为源数据正确排序的单调递增表示形式。 增量实时表会自动处理不按顺序到达的数据。 对于 SCD 类型 2 的更改,增量实时表会将相应的顺序值传播到目标表的 __START_AT__END_AT 列。 每个顺序值的每个键应该有一个非重复更新,不支持 NULL 顺序值。

若要使用增量实时表执行 CDC 处理,首先需要创建一个流式处理表,然后使用 APPLY CHANGES INTO 语句指定更改源的源、键和顺序。 若要创建目标流式处理表,请使用 SQL 中的 CREATE OR REFRESH STREAMING TABLE 语句或 Python 中的 create_streaming_table() 函数。 若要创建用于定义 CDC 处理的语句,请使用 SQL 中的 APPLY CHANGES 语句或 Python 中的 apply_changes() 函数。 有关语法详细信息,请参阅在增量实时表中使用 SQL 进行变更数据捕获在增量实时表中使用 Python 进行变更数据捕获

哪些数据对象用于增量实时表 CDC 处理?

在 Hive 元存储中声明目标表时,会创建两个数据结构:

  • 一个使用分配给目标表的名称的视图。
  • 一个由增量实时表用来管理 CDC 处理的内部支持表。 此表的命名方式是在目标表名称的前面加上 __apply_changes_storage_

例如,如果你声明一个名为 dlt_cdc_target 的目标表,则会在元存储中看到一个名为 dlt_cdc_target 的视图和一个名为 __apply_changes_storage_dlt_cdc_target 的表。 创建视图后,增量实时表可以筛选出处理无序数据所需的额外信息(例如逻辑删除和版本)。 若要查看处理后的数据,请查询目标视图。 由于表 __apply_changes_storage_ 的架构可能会更改以支持将来的功能或增强功能,因此不应查询表以供生产使用。 如果手动在表中添加数据,则认为记录出现在其他更改之前,因为缺少版本列。

如果管道发布到 Unity Catalog,则用户无法访问内部支持表。

获取有关由 Delta Live Tables CDC 查询处理的记录的数据

以下指标由 apply changes 查询捕获:

  • num_upserted_rows:在更新期间更新插入数据集的输出行数。
  • num_deleted_rows:在更新期间从数据集中删除的现有输出行数。

对于 apply changes 查询,不会捕获作为非 CDC 流输出的 num_output_rows 指标。

限制

APPLY CHANGES INTO 查询或 apply_changes 函数的目标不可用作流式处理表的源。 读取 APPLY CHANGES INTO 查询或 apply_changes 函数中目标的表必须是具体化视图。

Azure Databricks 上的SCD 类型 1 和 SCD 类型 2

以下部分提供的示例演示了根据源事件更新目标表的增量实时表 SCD 类型 1 和类型 2 查询,其具体操作如下:

  1. 创建新的用户记录。
  2. 删除用户记录。
  3. 更新用户记录。 在 SCD 类型 1 示例中,最后的 UPDATE 操作延迟到达并从目标表中删除,展示了无序事件的处理。

以下所有示例假设你知道如何配置和更新增量实时表管道。 请参阅教程:运行第一个增量实时表管道

若要运行这些示例,必须首先创建一个示例数据集。 请参阅生成测试数据

下面是这些示例的输入记录:

userId name city operation sequenceNum
124 Raul 瓦哈卡 INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancun INSERT 2
123 Null null DELETE 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel 奇瓦瓦 UPDATE 5

如果你取消注释示例数据中的最后一行,则会插入以下记录用于指定记录的截断位置:

userId name city operation sequenceNum
null Null null TRUNCATE 3

注意

以下所有示例都包含用于指定 DELETETRUNCATE 操作的选项,但其中的每个选项都是可选的。

处理 SCD 类型 1 更新

以下代码示例演示如何处理 SCD 类型 1 更新:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

运行 SCD 类型 1 示例后,目标表包含以下记录:

userId name city
124 Raul 瓦哈卡
125 Mercedes Guadalajara
126 Lily Cancun

使用附加的 TRUNCATE 记录运行 SCD 类型 1 示例后,由于 sequenceNum=3 处的 TRUNCATE 操作,124126 的记录将被截断,并且目标表包含以下记录:

userId name city
125 Mercedes Guadalajara

处理 SCD 类型 2 更新

以下代码示例演示如何处理 SCD 类型 2 更新:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

运行 SCD 类型 2 示例后,目标表包含以下记录:

userId name city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel 奇瓦瓦 5 6
124 Raul 瓦哈卡 1 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lily Cancun 2 Null

SCD 类型 2 查询还可以指定一个输出列的子集,以便跟踪目标表中的历史记录。 对其他列的更改都就地更新,而不是生成新的历史记录。 以下示例演示了如何从跟踪中排除 city 列:

以下示例演示如何通过 SCD 类型 2 使用跟踪历史记录:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

在没有额外 TRUNCATE 记录的情况下运行此示例后,目标表包含以下记录:

userId name city __START_AT __END_AT
123 Isabel 奇瓦瓦 1 6
124 Raul 瓦哈卡 1 null
125 Mercedes Guadalajara 2 null
126 Lily Cancun 2 null

生成测试数据

以下代码用于生成可在本教程的示例查询中使用的示例数据集。 假设你拥有创建新架构和新表的适当凭据,你可以使用笔记本或 Databricks SQL 执行这些语句。 以下代码不能作为增量实时表管道的一部分运行:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

在目标流式处理表中添加、更改或删除数据

如果管道将表发布到 Unity Catalog,则可以使用数据操作语言 (DML) 声明(包括插入、更新、删除与合并声明)来修改由 APPLY CHANGES INTO 声明创建的目标流式处理表。

注意

  • 不支持 DML 声明修改流式处理表的表架构。 确保 DML 语句不会尝试修改表架构。
  • 只能在使用 Databricks Runtime 13.3 LTS 及更高版本的 Unity Catalog 共享群集或 SQL 仓库中运行用于更新流式处理表的 DML 语句。
  • 由于流式传输要求仅追加数据源,因此如果你的处理需要从包含更改的源流式传输表进行流式传输(例如,通过 DML 语句),请在读取源流式传输表时设置 skipChangeCommits 标志。 设置 skipChangeCommits 后,将会忽略删除或修改源表上记录的事务。 如果处理不需要流式处理表,则可以使用具体化视图(没有仅追加限制)作为目标表。

由于增量实时表使用指定的 SEQUENCE BY 列并将适当的排序值传播到(供 SCD 类型 2 使用的)目标表的 __START_AT__END_AT 列,因此必须确保 DML 声明使用这些列的有效值,确保记录的顺序正确。 请参阅如何使用增量实时表实现 CDC?

有关如何使用 DML 声明处理流式处理表的详细信息,请参阅在流式处理表中添加、更改或删除数据

以下示例插入一个起始序列为 5 的活动记录:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);