在 Azure Databricks 上使用 Delta Lake 更改数据馈送

更改数据馈送允许 Azure Databricks 跟踪 Delta 表版本之间的行级别更改。 对 Delta 表启用此功能后,运行时会记录写入该表的所有数据的“更改事件”。 这包括行数据以及指示已插入、已删除还是已更新指定行的元数据。

重要

更改数据馈送与表历史记录协同工作,以提供更改信息。 由于克隆 Delta 表会创建单独的历史记录,因此克隆表上的更改数据馈送与原始表的更改数据馈送不匹配。

以增量方式处理更改数据

Databricks 建议结合使用更改数据馈送和结构化流式处理,以增量处理 Delta 表中的更改。 必须使用 Azure Databricks 的结构化流式处理来自动跟踪表更改数据馈送的版本。

注意

增量实时表提供的功能可用于轻松传播更改数据并将结果存储为 SCD(渐变维度)类型 1 或类型 2 表。 请参阅 APPLY CHANGES API:使用增量实时表简化变更数据捕获

要从表中读取更改数据馈送,必须在该表上启用更改数据馈送。 请参阅启用更改数据馈送

当配置针对表的流来读取更改数据馈送时,将 readChangeFeed 选项设置为 true,如以下语法示例所示:

Python

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")
)

Scala

spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("myDeltaTable")

默认情况下,流在首次启动时将表的最新快照作为 INSERT 返回,并将未来的更改作为更改数据返回。

更改数据作为 Delta Lake 事务的一部分的提交,并在向表提交新数据的同时变为可用。

可根据需要指定起始版本。 请参阅是否应指定起始版本?

更改数据馈送还支持批量执行,这需要指定起始版本。 请参阅在批量查询中读取更改

读取更改数据时还支持速率限制(maxFilesPerTriggermaxBytesPerTrigger)和 excludeRegex 等选项。

对于除起始快照版本之外的版本,速率限制可以是原子性的。 也就是说,整个提交版本将受到速率限制,或者将返回整个提交。

是否应指定起始版本?

如果想要忽略在特定版本之前发生的更改,可选择指定起始版本。 可以使用时间戳或 Delta 事务日志中记录的版本 ID 号来指定版本。

注意

批量读取需要起始版本,许多批处理模式可以从设置可选的结束版本中受益。

重新配置涉及更改数据馈送的结构化流式处理工作负载时,请务必了解指定起始版本会如何影响处理。

许多流式处理工作负载(尤其是新的数据处理管道)受益于默认行为。 在默认行为中,当流第一次将表中的所有现有记录记载为更改数据反馈中的 INSERT 操作时,会处理第一个批。

如果目标表已经包含所有记录,且有到特定时间点的适当更改,请执行起始版本,以避免将源表状态作为 INSERT 事件进行处理。

下面的示例语法从检查点损坏的流式处理失败中恢复。 在本例中,假设条件如下:

  1. 在创建表时,源表上启用了更改数据馈送。
  2. 目标下游表已处理直到版本 75(含)的所有更改。
  3. 源表的版本历史记录可用于版本 70 及更高版本。

Python

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")
)

Scala

spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 76)
  .table("source_table")

在此示例中,还必须指定新的检查点位置。

重要

如果指定起始版本,那么当表历史记录中不再有起始版本时,流无法从新的检查点启动。 Delta Lake 会自动清理历史版本,这意味着最终会删除所有指定的起始版本。

请参阅是否可使用更改数据馈送来重播表的整个历史记录?

在批量查询中读取更改

可以使用批量查询语法来读取从特定版本开始的所有更改,或者读取指定版本范围中的更改。

将版本指定为整数,将时间戳指定为字符串,格式为 yyyy-MM-dd[ HH:mm:ss[.SSS]]

开始和结束版本包含在查询中。 若要读取从表的特定开始版本到最新版本的更改,请仅指定起始版本。

如果提供的版本较低或提供的时间戳早于已记录更改事件的时间戳,那么启用更改数据馈送时,会引发错误,指示未启用更改数据馈送。

以下语法示例演示如何对批量读取使用起始和结束版本选项:

SQL

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')

Python

# version as ints or longs
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 10) \
  .table("myDeltaTable")

# timestamps as formatted timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingTimestamp", '2021-04-21 05:45:46') \
  .option("endingTimestamp", '2021-05-21 12:00:00') \
  .table("myDeltaTable")

# providing only the startingVersion/timestamp
spark.read.format("delta") \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .table("myDeltaTable")

Scala

// version as ints or longs
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 10)
  .table("myDeltaTable")

// timestamps as formatted timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2021-04-21 05:45:46")
  .option("endingTimestamp", "2021-05-21 12:00:00")
  .table("myDeltaTable")

// providing only the startingVersion/timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("myDeltaTable")

注意

默认情况下,如果用户传入的版本或时间戳超过了表的最后一次提交,则会引发错误 timestampGreaterThanLatestCommit。 在 Databricks Runtime 11.3 LTS 及更高版本中,如果用户将以下配置设置为 true,则更改数据馈送可以处理范围外版本的情况:

set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;

如果提供的起始版本大于表上最后提交版本,或提供的起始时间戳比表上最后提交时间戳还要新,则启用上述配置后,将返回空的读取结果。

如果提供的最终版本大于表上最后提交版本,或提供的最终时间戳比表上最后提交时间戳还要新,则启用上述配置后,将返回空的读取结果。

什么是更改数据馈送的架构?

当你读取表的更改数据馈送时,将使用最新表版本的架构。

注意

完全支持大多数架构更改和演变操作。 启用了列映射的表不支持所有用例,它们会表现出不同的行为。 请参阅启用了列映射的表的更改数据馈送限制

除了 Delta 表架构中的数据列之外,更改数据馈送还包含用于标识更改事件类型的元数据列:

列名称 类型
_change_type 字符串 insert, update_preimage , update_postimage, delete (1)
_commit_version Long 包含更改的 Delta 日志或表版本。
_commit_timestamp 时间戳 与提交创建关联的时间戳。

(1) preimage 是更新前的值,postimage 是更新后的值

注意

如果架构包含与这些添加的列同名的列,则你无法在表上启用更改数据馈送。 在尝试启用更改数据馈送之前重命名表中的列即可解决此冲突。

启用更改数据馈送

只能读取已启用的表的更改数据馈送。 必须使用以下方法之一显式启用更改数据馈送选项:

  • 新表:在 CREATE TABLE 命令中设置表属性 delta.enableChangeDataFeed = true

    CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 现有表:在 ALTER TABLE 命令中设置表属性 delta.enableChangeDataFeed = true

    ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    
  • 所有新表

    set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
    

重要

仅记录启用更改数据馈送后所做的更改。 不会捕获之前对表所做的更改。

更改数据存储

启用更改数据馈送会导致表的存储成本略有增加。 更改数据记录是在查询运行时生成的,通常比重写文件的总大小要小得多。

Azure Databricks 将 UPDATEDELETEMERGE 操作的更改数据记录在表目录下的 _change_data 文件夹中。 某些操作(例如仅插入操作和完整分区删除)不会在 _change_data 目录中生成数据,因为 Azure Databricks 可以直接从事务日志高效计算更改数据馈送。

所有对 _change_data 文件夹中数据文件进行的读取都应通过受支持的 Delta Lake API 执行。

_change_data 文件夹中的文件遵循表的保留策略。 VACUUM 命令运行时,会删除更改数据馈送数据。

是否可使用更改数据馈送来重播表的整个历史记录?

更改数据馈送不打算用作表的所有更改的永久记录。 更改数据馈送仅记录在启用它后发生的更改。

通过更改数据馈送和 Delta Lake,你可始终重新构造源表的完整快照,这意味着你可以针对启用了更改数据馈送的表启动新的流式处理读取,并捕获该表的当前版本以及之后发生的所有更改。

必须将更改数据馈送中的记录视为暂时性记录,并且只能在指定的保留时段进行访问。 Delta 事务日志会定期删除表版本及其相应的更改数据馈送版本。 从事务日志中删除某个版本后,无法再读取该版本的更改数据馈送。

如果用例需要保留对表的所有更改的永久历史记录,则应使用增量逻辑将记录从更改数据馈送写入新表。 下面的代码示例演示如何使用 trigger.AvailableNow,它利用结构化流式处理的增量处理,但将可用数据作为批处理工作负载进行处理。 可使用主处理管道异步计划此工作负载,以创建更改数据馈送的备份来进行审核或完全可重播。

Python

(spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(availableNow=True)
  .toTable("target_table")
)

Scala

spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .table("source_table")
  .writeStream
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(Trigger.AvailableNow)
  .toTable("target_table")

启用了列映射的表的更改数据馈送限制

在 Delta 表上启用列映射后,可以删除或重命名表中的列,而无需重写现有数据的数据文件。 如果启用了列映射,在执行非添加性架构更改(例如重命名或删除列、更改数据类型或更改可为 null 性)后,更改数据馈送存在限制。

重要

  • 无法使用批处理语义读取发生非添加性架构更改的事务或范围的更改数据馈送。
  • 在 Databricks Runtime 12.2 LTS 及更低版本中,启用了列映射且经历过非添加性架构更改的表不支持对更改数据馈送执行流式读取。 请参阅使用列映射和架构更改进行流式处理
  • 在 Databricks Runtime 11.3 LTS 及更低版本中,无法读取启用了列映射且经历过列重命名或删除操作的表的更改数据馈送。

在 Databricks Runtime 12.2 LTS 及更高版本中,可以对启用了列映射且经历过非添加性架构更改的表的更改数据馈送执行批量读取。 读取操作不使用表的最新版本的架构,而是使用查询中指定的表的最终版本的架构。 如果指定的版本范围涵盖非添加性架构更改,查询仍会失败。