培训
使用 Delta Lake 表历史记录
修改 Delta Lake 表的每个操作都会创建新的表版本。 可以使用历史记录信息来审核操作、回滚表或查询特定时间点的表(使用按时间顺序查看)。
备注
Databricks 不建议使用 Delta Lake 表历史记录作为数据存档的长期备份解决方案。 Databricks 建议仅使用过去 7 天进行“按时间顺序查看”操作,除非你已将数据和日志保留配置设置为更大的值。
可以通过运行 history
命令来检索信息,包括每次将内容写入 Delta 表时对应的操作、用户和时间戳。 按时间倒序返回返回操作。
表历史记录保留期取决于表设置 delta.logRetentionDuration
,后者默认为 30 天。
备注
按时间顺序查看和表历史记录由不同的保留期阈值控制。 请参阅什么是 Delta Lake 按时间顺序查看?。
DESCRIBE HISTORY table_name -- get the full history of the table
DESCRIBE HISTORY table_name LIMIT 1 -- get the last operation only
有关 Spark SQL 语法的详细信息,请参阅 DESCRIBE HISTORY。
请参阅 Delta Lake API 文档,了解 Scala/Java/Python 语法详细信息。
目录资源管理器提供此详细表信息的视觉对象视图和 Delta 表的历史记录。 除了表架构和示例数据之外,你还可以单击“历史记录”选项卡以查看随 DESCRIBE HISTORY
一起显示的表历史记录。
history
操作的输出包含以下列。
列 | 类型 | 说明 |
---|---|---|
版本 | long | 通过操作生成的表版本。 |
timestamp | timestamp | 提交此版本的时间。 |
userId | 字符串 | 运行操作的用户的 ID。 |
userName | 字符串 | 运行操作的用户的姓名。 |
operation | 字符串 | 操作的名称。 |
operationParameters | map | 操作的参数(例如谓词。) |
作业 (job) | struct | 运行操作的作业的详细信息。 |
笔记本 | struct | 运行操作的笔记本的详细信息。 |
clusterId | 字符串 | 运行操作的群集的 ID。 |
readVersion | long | 读取以执行写入操作的表的版本。 |
isolationLevel | 字符串 | 用于此操作的隔离级别。 |
isBlindAppend | boolean | 此操作是否追加数据。 |
operationMetrics | map | 操作的指标(例如已修改的行数和文件数。) |
userMetadata | 字符串 | 用户定义的提交元数据(如果已指定) |
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version| timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion| isolationLevel|isBlindAppend| operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
| 5|2019-07-29 14:07:47| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 4|WriteSerializable| false|[numTotalRows -> ...|
| 4|2019-07-29 14:07:41| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 3|WriteSerializable| false|[numTotalRows -> ...|
| 3|2019-07-29 14:07:29| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 2|WriteSerializable| false|[numTotalRows -> ...|
| 2|2019-07-29 14:06:56| ###| ###| UPDATE|[predicate -> (id...|null| ###| ###| 1|WriteSerializable| false|[numTotalRows -> ...|
| 1|2019-07-29 14:04:31| ###| ###| DELETE|[predicate -> ["(...|null| ###| ###| 0|WriteSerializable| false|[numTotalRows -> ...|
| 0|2019-07-29 14:01:40| ###| ###| WRITE|[mode -> ErrorIfE...|null| ###| ###| null|WriteSerializable| true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
备注
- 如果使用以下方法写入 Delta 表,则其他一些列不可用:
- 将来添加的列将始终添加到最后一列的后面。
history
操作返回 operationMetrics
列映射中操作指标的集合。
下表按操作列出了映射键定义。
操作 | 指标名称 | 说明 |
---|---|---|
WRITE、CREATE TABLE AS SELECT、REPLACE TABLE AS SELECT、COPY INTO | ||
numFiles | 写入的文件数。 | |
numOutputBytes | 已写入的内容的大小(以字节为单位)。 | |
numOutputRows | 写入的行数。 | |
STREAMING UPDATE | ||
numAddedFiles | 添加的文件数。 | |
numRemovedFiles | 删除的文件数。 | |
numOutputRows | 写入的行数。 | |
numOutputBytes | 写入大小(以字节为单位)。 | |
DELETE | ||
numAddedFiles | 添加的文件数。 删除表的分区时未提供。 | |
numRemovedFiles | 删除的文件数。 | |
numDeletedRows | 删除的行数。 删除表的分区时未提供。 | |
numCopiedRows | 在删除文件期间复制的行数。 | |
executionTimeMs | 执行整个操作所用的时间。 | |
scanTimeMs | 扫描文件来查找匹配项所用的时间。 | |
rewriteTimeMs | 重写匹配文件所用的时间。 | |
TRUNCATE | ||
numRemovedFiles | 删除的文件数。 | |
executionTimeMs | 执行整个操作所用的时间。 | |
MERGE | ||
numSourceRows | 源数据帧中的行数。 | |
numTargetRowsInserted | 插入到目标表的行数。 | |
numTargetRowsUpdated | 目标表中更新的行数。 | |
numTargetRowsDeleted | 目标表中删除的行数。 | |
numTargetRowsCopied | 复制的目标行数。 | |
numOutputRows | 写出的总行数。 | |
numTargetFilesAdded | 添加到接收器(目标)的文件数。 | |
numTargetFilesRemoved | 从接收器(目标)删除的文件数。 | |
executionTimeMs | 执行整个操作所用的时间。 | |
scanTimeMs | 扫描文件来查找匹配项所用的时间。 | |
rewriteTimeMs | 重写匹配文件所用的时间。 | |
UPDATE | ||
numAddedFiles | 添加的文件数。 | |
numRemovedFiles | 删除的文件数。 | |
numUpdatedRows | 更新的行数。 | |
numCopiedRows | 刚才在更新文件期间复制的行数。 | |
executionTimeMs | 执行整个操作所用的时间。 | |
scanTimeMs | 扫描文件来查找匹配项所用的时间。 | |
rewriteTimeMs | 重写匹配文件所用的时间。 | |
FSCK | numRemovedFiles | 删除的文件数。 |
CONVERT | numConvertedFiles | 已转换的 Parquet 文件数。 |
OPTIMIZE | ||
numAddedFiles | 添加的文件数。 | |
numRemovedFiles | 优化的文件数。 | |
numAddedBytes | 优化表后添加的字节数。 | |
numRemovedBytes | 删除的字节数。 | |
minFileSize | 优化表后最小文件的大小。 | |
p25FileSize | 优化表后第 25 个百分位文件的大小。 | |
p50FileSize | 优化表后的文件大小中值。 | |
p75FileSize | 优化表后第 75 个百分位文件的大小。 | |
maxFileSize | 优化表后最大文件的大小。 | |
克隆 | ||
sourceTableSize | 所克隆版本的源表的大小(以字节为单位)。 | |
sourceNumOfFiles | 源表中已克隆版本的文件数。 | |
numRemovedFiles | 目标表中删除的文件数(如果替换了先前的 Delta 表)。 | |
removedFilesSize | 如果替换了先前的 Delta 表,则为目标表中删除文件的总大小(以字节为单位)。 | |
numCopiedFiles | 复制到新位置的文件数。 如果是浅表克隆,则为 0。 | |
copiedFilesSize | 复制到新位置的文件总大小(以字节为单位)。 如果是浅表克隆,则为 0。 | |
RESTORE | ||
tableSizeAfterRestore | 还原后的表大小(字节)。 | |
numOfFilesAfterRestore | 还原后表中的文件数。 | |
numRemovedFiles | 还原操作删除的文件数。 | |
numRestoredFiles | 由于还原而添加的文件数。 | |
removedFilesSize | 还原操作删除的文件的大小(字节)。 | |
restoredFilesSize | 还原操作添加的文件的大小(字节)。 | |
VACUUM | ||
numDeletedFiles | 已删除的文件数。 | |
numVacuumedDirectories | 已清空的目录数。 | |
numFilesToDelete | 要删除的文件数。 |
Delta Lake 按时间顺序查看支持根据时间戳或表版本(正如事务日志中记录的那样)查询以前的表版本。 可以对应用程序使用“按时间顺序查看”,如下所述:
- 重新创建分析、报表或输出(例如,机器学习模型的输出)。 这对于调试或审核非常有用,尤其是在管控行业中。
- 编写复杂的时态查询。
- 修复数据中的错误。
- 为针对快速变化表的一组查询提供快照隔离。
重要
可通过“按时间顺序查看”访问的表版本取决于事务日志文件的保留期阈值以及 VACUUM
操作的频率和指定保留期。 如果使用默认值每天运行 VACUUM
,则 7 天的数据可用于“按时间顺序查看”。
可以通过在表名规范后添加子句,来使用“按时间顺序查看”查询 Delta 表。
timestamp_expression
可以是下列项中的任意一项:'2018-10-18T22:15:12.013Z'
,即可以强制转换为时间戳的字符串cast('2018-10-18 13:36:32 CEST' as timestamp)
'2018-10-18'
,即日期字符串current_timestamp() - interval 12 hours
date_sub(current_date(), 1)
- 本身就是时间戳或可强制转换为时间戳的任何其他表达式
version
是可以从DESCRIBE HISTORY table_spec
的输出中获取的 long 值。
timestamp_expression
和 version
都不能是子查询。
只接受日期或时间戳字符串。 例如,"2019-01-01"
和 "2019-01-01T00:00:00.000Z"
。 有关示例语法,请参阅以下代码:
SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;
df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")
还可以使用 @
语法将时间戳或版本指定为表名称的一部分。 时间戳必须采用 yyyyMMddHHmmssSSS
格式。 你可以通过在版本前附加一个 v
在 @
后指定版本。 有关示例语法,请参阅以下代码:
SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123
spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")
Delta Lake 将表版本记录为 _delta_log
目录中的 JSON 文件,该目录与表数据一起存储。 为了优化检查点查询,Delta Lake 将表版本聚合到 Parquet 检查点文件,因此无需读取表历史记录的所有 JSON 版本。 Azure Databricks 针对数据大小和工作负荷优化检查点操作频率。 用户应该不需要直接与检查点交互。 检查点频率可能会更改,恕不另行通知。
若要查询以前的表版本,必须同时保留该版本的日志文件和数据文件。
针对表运行 VACUUM
时,会删除数据文件。 Delta Lake 会在对表版本执行检查点操作后自动管理日志文件删除操作。
由于会针对大多数 Delta 表定期运行 VACUUM
,因此时间点查询应认可 VACUUM
的保留期阈值(默认为 7 天)。
若要提高 Delta 表的数据保留期阈值,必须配置以下表属性:
delta.logRetentionDuration = "interval <interval>"
:控制表的历史记录的保留时间长度。 默认为interval 30 days
。delta.deletedFileRetentionDuration = "interval <interval>"
:确定由VACUUM
用来删除当前表版本中不再引用的数据文件的阈值。 默认为interval 7 days
。
可以在创建表期间指定 Delta 属性,也可使用 ALTER TABLE
语句设置它们。 请参阅 Delta 表属性参考。
备注
必须设置这两个属性,以确保对于频繁执行 VACUUM
操作的表,表历史记录可以保留较长时间。 例如,若要访问 30 天的历史数据,请设置 delta.deletedFileRetentionDuration = "interval 30 days"
(与 delta.logRetentionDuration
的默认设置匹配)。
随着维护的数据文件的增多,提高数据保留期阈值可能会导致存储成本上升。
你可以使用 RESTORE
命令将 Delta 表还原到其以前的状态。 Delta 表在内部维护该表的历史版本,使其能够还原到以前的状态。
RESTORE
命令支持使用一个与早期状态相对应的版本作为选项,或支持使用一个表明早期状态何时创建的时间戳作为选项。
重要
- 可以还原已经还原的表。
- 可以还原克隆的表。
- 对于要还原的表,你必须拥有
MODIFY
权限。 - 不能将表还原为通过手动方式或
vacuum
方式删除数据文件的旧版本。 如果将spark.sql.files.ignoreMissingFiles
设置为true
,则仍可部分还原为此版本。 - 用于还原到早期状态的时间戳格式是
yyyy-MM-dd HH:mm:ss
。 还支持仅提供 date(yyyy-MM-dd
) 字符串。
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;
有关语法详细信息,请参阅 RESTORE。
重要
还原被视为一种数据更改操作。 RESTORE
命令添加的 Delta Lake 日志条目包含设置为 true 的 dataChange。 如果存在下游应用程序,例如用于处理对 Delta Lake 表的更新的结构化流式处理作业,则还原操作添加的数据更改日志条目将被视为新的数据更新,处理这些更新可能导致重复数据。
例如:
表版本 | 操作 | 增量日志更新 | 数据更改日志更新中的记录 |
---|---|---|---|
0 | INSERT | AddFile(/path/to/file-1, dataChange = true) | (name = Viktor, age = 29, (name = George, age = 55) |
1 | INSERT | AddFile(/path/to/file-2, dataChange = true) | (name = George, age = 39) |
2 | OPTIMIZE | AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) | (不会生成记录,因为“优化压缩”不会更改表中的数据) |
3 | RESTORE(version=1) | RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) | (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39) |
在上面的示例中,RESTORE
命令导致在读取增量表版本 0 和 1 时已经可以看到更新。 如果流式处理查询当时正在读取此表,则这些文件将被视为新添加的数据,因此会再次对其进行处理。
RESTORE
在操作完成后以单行数据帧的形式报告以下指标:
table_size_after_restore
:还原后表的大小。num_of_files_after_restore
:还原后表中的文件数。num_removed_files
:从表中删除(逻辑删除)的文件数。num_restored_files
:由于回退而还原的文件数。removed_files_size
:从表中删除的文件的总大小(以字节为单位)。restored_files_size
:已还原的文件的总大小(以字节为单位)。
为用户
111
修复对表的意外删除问题:INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111
修复对表的意外错误更新:
MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *
查询在过去一周内增加的新客户的数量。
SELECT count(distinct userId) FROM my_table - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
若要获取当前 SparkSession
在所有线程和所有表中写入的最后一个提交的版本号,请查询 SQL 配置 spark.databricks.delta.lastCommitVersionInSession
。
SET spark.databricks.delta.lastCommitVersionInSession
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
如果 SparkSession
未进行任何提交,则查询该键时将返回空值。
备注
如果在多个线程之间共享相同的 SparkSession
,则类似于在多个线程之间共享变量;你可能会达到争用条件,因为配置值会同时更新。