使用 Delta Lake 表历史记录
修改 Delta Lake 表的每个操作都会创建新的表版本。 可以使用历史记录信息来审核操作、回滚表或查询特定时间点的表(使用按时间顺序查看)。
注意
Databricks 不建议使用 Delta Lake 表历史记录作为数据存档的长期备份解决方案。 Databricks 建议仅使用过去 7 天进行“按时间顺序查看”操作,除非你已将数据和日志保留配置设置为更大的值。
检索 Delta 表历史记录
可以通过运行 history
命令来检索信息,包括每次将内容写入 Delta 表时对应的操作、用户和时间戳。 按时间倒序返回返回操作。
表历史记录保留期取决于表设置 delta.logRetentionDuration
,后者默认为 30 天。
注意
按时间顺序查看和表历史记录由不同的保留期阈值控制。 请参阅什么是 Delta Lake 按时间顺序查看?。
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
有关 Spark SQL 语法的详细信息,请参阅 DESCRIBE HISTORY。
请参阅 Delta Lake API 文档,了解 Scala/Java/Python 语法详细信息。
目录资源管理器提供此详细表信息的视觉对象视图和 Delta 表的历史记录。 除了表架构和示例数据之外,你还可以单击“历史记录”选项卡以查看随 DESCRIBE HISTORY
一起显示的表历史记录。
历史记录架构
history
操作的输出包含以下列。
列 | 类型 | 说明 |
---|---|---|
版本 | long | 通过操作生成的表版本。 |
timestamp | timestamp | 提交此版本的时间。 |
userId | 字符串 | 运行操作的用户的 ID。 |
userName | 字符串 | 运行操作的用户的姓名。 |
operation | 字符串 | 操作的名称。 |
operationParameters | map | 操作的参数(例如谓词。) |
作业 (job) | struct | 运行操作的作业的详细信息。 |
笔记本 | struct | 运行操作的笔记本的详细信息。 |
clusterId | 字符串 | 运行操作的群集的 ID。 |
readVersion | long | 读取以执行写入操作的表的版本。 |
isolationLevel | 字符串 | 用于此操作的隔离级别。 |
isBlindAppend | boolean | 此操作是否追加数据。 |
operationMetrics | map | 操作的指标(例如已修改的行数和文件数。) |
userMetadata | 字符串 | 用户定义的提交元数据(如果已指定) |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
注意
- 如果使用以下方法写入 Delta 表,则其他一些列不可用:
- 将来添加的列将始终添加到最后一列的后面。
操作指标说明
history
操作返回 operationMetrics
列映射中操作指标的集合。
下表按操作列出了映射键定义。
操作 | 指标名称 | 说明 |
---|---|---|
WRITE、CREATE TABLE AS SELECT、REPLACE TABLE AS SELECT、COPY INTO | ||
numFiles | 写入的文件数。 | |
numOutputBytes | 已写入的内容的大小(以字节为单位)。 | |
numOutputRows | 写入的行数。 | |
STREAMING UPDATE | ||
numAddedFiles | 添加的文件数。 | |
numRemovedFiles | 删除的文件数。 | |
numOutputRows | 写入的行数。 | |
numOutputBytes | 写入大小(以字节为单位)。 | |
DELETE | ||
numAddedFiles | 添加的文件数。 删除表的分区时未提供。 | |
numRemovedFiles | 删除的文件数。 | |
numDeletedRows | 删除的行数。 删除表的分区时未提供。 | |
numCopiedRows | 在删除文件期间复制的行数。 | |
executionTimeMs | 执行整个操作所用的时间。 | |
scanTimeMs | 扫描文件来查找匹配项所用的时间。 | |
rewriteTimeMs | 重写匹配文件所用的时间。 | |
TRUNCATE | ||
numRemovedFiles | 删除的文件数。 | |
executionTimeMs | 执行整个操作所用的时间。 | |
MERGE | ||
numSourceRows | 源数据帧中的行数。 | |
numTargetRowsInserted | 插入到目标表的行数。 | |
numTargetRowsUpdated | 目标表中更新的行数。 | |
numTargetRowsDeleted | 目标表中删除的行数。 | |
numTargetRowsCopied | 复制的目标行数。 | |
numOutputRows | 写出的总行数。 | |
numTargetFilesAdded | 添加到接收器(目标)的文件数。 | |
numTargetFilesRemoved | 从接收器(目标)删除的文件数。 | |
executionTimeMs | 执行整个操作所用的时间。 | |
scanTimeMs | 扫描文件来查找匹配项所用的时间。 | |
rewriteTimeMs | 重写匹配文件所用的时间。 | |
UPDATE | ||
numAddedFiles | 添加的文件数。 | |
numRemovedFiles | 删除的文件数。 | |
numUpdatedRows | 更新的行数。 | |
numCopiedRows | 刚才在更新文件期间复制的行数。 | |
executionTimeMs | 执行整个操作所用的时间。 | |
scanTimeMs | 扫描文件来查找匹配项所用的时间。 | |
rewriteTimeMs | 重写匹配文件所用的时间。 | |
FSCK | numRemovedFiles | 删除的文件数。 |
CONVERT | numConvertedFiles | 已转换的 Parquet 文件数。 |
OPTIMIZE | ||
numAddedFiles | 添加的文件数。 | |
numRemovedFiles | 优化的文件数。 | |
numAddedBytes | 优化表后添加的字节数。 | |
numRemovedBytes | 删除的字节数。 | |
minFileSize | 优化表后最小文件的大小。 | |
p25FileSize | 优化表后第 25 个百分位文件的大小。 | |
p50FileSize | 优化表后的文件大小中值。 | |
p75FileSize | 优化表后第 75 个百分位文件的大小。 | |
maxFileSize | 优化表后最大文件的大小。 | |
克隆 | ||
sourceTableSize | 所克隆版本的源表的大小(以字节为单位)。 | |
sourceNumOfFiles | 源表中已克隆版本的文件数。 | |
numRemovedFiles | 目标表中删除的文件数(如果替换了先前的 Delta 表)。 | |
removedFilesSize | 如果替换了先前的 Delta 表,则为目标表中删除文件的总大小(以字节为单位)。 | |
numCopiedFiles | 复制到新位置的文件数。 如果是浅表克隆,则为 0。 | |
copiedFilesSize | 复制到新位置的文件总大小(以字节为单位)。 如果是浅表克隆,则为 0。 | |
RESTORE | ||
tableSizeAfterRestore | 还原后的表大小(字节)。 | |
numOfFilesAfterRestore | 还原后表中的文件数。 | |
numRemovedFiles | 还原操作删除的文件数。 | |
numRestoredFiles | 由于还原而添加的文件数。 | |
removedFilesSize | 还原操作删除的文件的大小(字节)。 | |
restoredFilesSize | 还原操作添加的文件的大小(字节)。 | |
VACUUM | ||
numDeletedFiles | 已删除的文件数。 | |
numVacuumedDirectories | 已清空的目录数。 | |
numFilesToDelete | 要删除的文件数。 |
什么是 Delta Lake 按时间顺序查看?
Delta Lake 按时间顺序查看支持根据时间戳或表版本(正如事务日志中记录的那样)查询以前的表版本。 可以对应用程序使用“按时间顺序查看”,如下所述:
- 重新创建分析、报表或输出(例如,机器学习模型的输出)。 这对于调试或审核非常有用,尤其是在管控行业中。
- 编写复杂的时态查询。
- 修复数据中的错误。
- 为针对快速变化表的一组查询提供快照隔离。
重要
可通过“按时间顺序查看”访问的表版本取决于事务日志文件的保留期阈值以及 VACUUM
操作的频率和指定保留期。 如果使用默认值每天运行 VACUUM
,则 7 天的数据可用于“按时间顺序查看”。
Delta 按时间顺序查看语法
可以通过在表名规范后添加子句,来使用“按时间顺序查看”查询 Delta 表。
timestamp_expression
可以是下列项中的任意一项:'2018-10-18T22:15:12.013Z'
,即可以强制转换为时间戳的字符串cast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
,即日期字符串current_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- 本身就是时间戳或可强制转换为时间戳的任何其他表达式
version
是可以从DESCRIBE HISTORY table_spec
的输出中获取的 long 值。
timestamp_expression
和 version
都不能是子查询。
只接受日期或时间戳字符串。 例如,"2019-01-01"
和 "2019-01-01T00:00:00.000Z"
。 有关示例语法,请参阅以下代码:
SQL
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
Python
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
还可以使用 @
语法将时间戳或版本指定为表名称的一部分。 时间戳必须采用 yyyyMMddHHmmssSSS
格式。 你可以通过在版本前附加一个 v
在 @
后指定版本。 有关示例语法,请参阅以下代码:
SQL
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
Python
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
什么是事务日志检查点?
Delta Lake 将表版本记录为 _delta_log
目录中的 JSON 文件,该目录与表数据一起存储。 为了优化检查点查询,Delta Lake 将表版本聚合到 Parquet 检查点文件,因此无需读取表历史记录的所有 JSON 版本。 Azure Databricks 针对数据大小和工作负荷优化检查点操作频率。 用户应该不需要直接与检查点交互。 检查点频率可能会更改,恕不另行通知。
为“按时间顺序查看”查询配置数据保留
若要查询以前的表版本,必须同时保留该版本的日志文件和数据文件。
针对表运行 VACUUM
时,会删除数据文件。 Delta Lake 会在对表版本执行检查点操作后自动管理日志文件删除操作。
由于会针对大多数 Delta 表定期运行 VACUUM
,因此时间点查询应认可 VACUUM
的保留期阈值(默认为 7 天)。
若要提高 Delta 表的数据保留期阈值,必须配置以下表属性:
delta.logRetentionDuration = "interval <interval>"
:控制表的历史记录的保留时间长度。 默认为interval 30 days
。delta.deletedFileRetentionDuration = "interval <interval>"
:确定由VACUUM
用来删除当前表版本中不再引用的数据文件的阈值。 默认为interval 7 days
。
可以在创建表期间指定 Delta 属性,也可使用 ALTER TABLE
语句设置它们。 请参阅 Delta 表属性参考。
注意
必须设置这两个属性,以确保对于频繁执行 VACUUM
操作的表,表历史记录可以保留较长时间。 例如,若要访问 30 天的历史数据,请设置 delta.deletedFileRetentionDuration = "interval 30 days"
(与 delta.logRetentionDuration
的默认设置匹配)。
随着维护的数据文件的增多,提高数据保留期阈值可能会导致存储成本上升。
将 Delta 表还原到早期状态
你可以使用 RESTORE
命令将 Delta 表还原到其以前的状态。 Delta 表在内部维护该表的历史版本,使其能够还原到以前的状态。
RESTORE
命令支持使用一个与早期状态相对应的版本作为选项,或支持使用一个表明早期状态何时创建的时间戳作为选项。
重要
- 可以还原已经还原的表。
- 可以还原克隆的表。
- 对于要还原的表,你必须拥有
MODIFY
权限。 - 不能将表还原为通过手动方式或
vacuum
方式删除数据文件的旧版本。 如果将spark.sql.files.ignoreMissingFiles
设置为true
,则仍可部分还原为此版本。 - 用于还原到早期状态的时间戳格式是
yyyy-MM-dd HH:mm:ss
。 还支持仅提供 date(yyyy-MM-dd
) 字符串。
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
有关语法详细信息,请参阅 RESTORE。
重要
还原被视为一种数据更改操作。 RESTORE
命令添加的 Delta Lake 日志条目包含设置为 true 的 dataChange。 如果存在下游应用程序,例如用于处理对 Delta Lake 表的更新的结构化流式处理作业,则还原操作添加的数据更改日志条目将被视为新的数据更新,处理这些更新可能导致重复数据。
例如:
表版本 | 操作 | 增量日志更新 | 数据更改日志更新中的记录 |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (name = Viktor, age = 29, (name = George, age = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (不会生成记录,因为“优化压缩”不会更改表中的数据) |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
在上面的示例中,RESTORE
命令导致在读取增量表版本 0 和 1 时已经可以看到更新。 如果流式处理查询当时正在读取此表,则这些文件将被视为新添加的数据,因此会再次对其进行处理。
还原指标
RESTORE
在操作完成后以单行数据帧的形式报告以下指标:
table_size_after_restore
:还原后表的大小。num_of_files_after_restore
:还原后表中的文件数。num_removed_files
:从表中删除(逻辑删除)的文件数。num_restored_files
:由于回退而还原的文件数。removed_files_size
:从表中删除的文件的总大小(以字节为单位)。restored_files_size
:已还原的文件的总大小(以字节为单位)。
使用 Delta Lake 按时间顺序查看的示例
为用户
111
修复对表的意外删除问题:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
修复对表的意外错误更新:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
查询在过去一周内增加的新客户的数量。
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
如何在 Spark 会话中找到上次提交的版本?
若要获取当前 SparkSession
在所有线程和所有表中写入的最后一个提交的版本号,请查询 SQL 配置 spark.databricks.delta.lastCommitVersionInSession
。
SQL
SET spark.databricks.delta.lastCommitVersionInSession
Python
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
Scala
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
如果 SparkSession
未进行任何提交,则查询该键时将返回空值。
注意
如果在多个线程之间共享相同的 SparkSession
,则类似于在多个线程之间共享变量;你可能会达到争用条件,因为配置值会同时更新。