共用方式為


使用 Delta Lake 數據表歷程記錄

修改 Delta Lake 數據表的每個作業都會建立新的數據表版本。 您可以使用歷程記錄資訊來稽核作業、復原數據表,或使用時間移動在特定時間點查詢數據表。

注意

Databricks 不建議使用 Delta Lake 資料表歷程記錄作為資料封存的長期備份解決方案。 除非您已將資料和記錄保留設定為較大的值,否則 Databricks 建議只使用過去 7 天進行時間移動作業。

擷取差異數據表歷程記錄

您可以執行 history 命令,以擷取資訊,包括每個寫入 Delta 資料表的作業、使用者和時間戳。 作業會以反向時間順序傳回。

數據表記錄保留是由數據表設定 delta.logRetentionDuration所決定,預設為30天。

注意

時間移動和資料表歷程記錄是由不同的保留期閾值所控制。 請參閱什麼是 Delta Lake 時間移動?

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

如需Spark SQL語法詳細數據,請參閱 DESCRIBE HISTORY

如需 Scala/Java/Python 語法詳細數據,請參閱 Delta Lake API 檔

目錄總 管提供這個詳細數據表資訊和 Delta 數據表記錄的可視化檢視。 除了數據表架構和範例數據之外,您也可以按兩下 [歷程記錄 ] 索引標籤來查看顯示資料表 DESCRIBE HISTORY記錄。

歷程記錄架構

作業的 history 輸出具有下列數據行。

資料行 類型​ 描述
version long 作業所產生的數據表版本。
timestamp timestamp 認可此版本時。
userId 字串 執行作業的使用者識別碼。
userName 字串 執行作業的用戶名稱。
作業 字串 作業名稱。
operationParameters map 工作的參數(例如述詞。)
job struct 執行作業之作業的詳細數據。
notebook struct 執行作業的筆記本詳細數據。
clusterId 字串 作業執行所在的叢集標識碼。
readVersion long 讀取以執行寫入作業的數據表版本。
isolationLevel 字串 用於這項作業的隔離等級。
isBlindAppend boolean 此作業是否附加數據。
operationMetrics map 作業的計量(例如,修改的數據列和檔案數目。)
userMetadata 字串 若已指定使用者定義認可元數據,則為
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

注意

作業計量索引鍵

作業會 history 傳回數據行對應中的 operationMetrics 作業計量集合。

下表列出依作業的對應索引鍵定義。

作業 度量名稱 描述
WRITE、CREATE TABLE AS SELECT、REPLACE TABLE AS SELECT、COPY INTO
numFiles 寫入的檔案數目。
numOutputBytes 寫入內容的位元組大小。
numOutputRows 寫入的數據列數目。
STREAMING UPDATE
numAddedFiles 新增的檔案數目。
numRemovedFiles 已移除的檔案數目。
numOutputRows 寫入的數據列數目。
numOutputBytes 以位元組為單位寫入的大小。
DELETE
numAddedFiles 新增的檔案數目。 刪除資料表的資料分割時, 不會提供 。
numRemovedFiles 已移除的檔案數目。
numDeletedRows 已移除的數據列數目。 刪除資料表的資料分割時, 不會提供 。
numCopiedRows 在刪除檔案的過程中複製的數據列數目。
executionTimeMs 執行整個作業所花費的時間。
scanTimeMs 掃描檔案是否有相符專案所花費的時間。
rewriteTimeMs 重寫相符檔案所花費的時間。
TRUNCATE
numRemovedFiles 已移除的檔案數目。
executionTimeMs 執行整個作業所花費的時間。
MERGE
numSourceRows 源數據框架中的數據列數目。
numTargetRowsInserted 插入目標數據表的數據列數目。
numTargetRowsUpdated 目標數據表中更新的數據列數目。
numTargetRowsDeleted 目標數據表中刪除的數據列數目。
numTargetRowsCopied 複製的目標數據列數目。
numOutputRows 寫出的數據列總數。
numTargetFilesAdded 新增至接收(target) 的檔案數目。
numTargetFilesRemoved 已從 sink(target) 移除的檔案數目。
executionTimeMs 執行整個作業所花費的時間。
scanTimeMs 掃描檔案是否有相符專案所花費的時間。
rewriteTimeMs 重寫相符檔案所花費的時間。
UPDATE
numAddedFiles 新增的檔案數目。
numRemovedFiles 已移除的檔案數目。
numUpdatedRows 更新的數據列數目。
numCopiedRows 在更新檔案的過程中,剛複製的數據列數目。
executionTimeMs 執行整個作業所花費的時間。
scanTimeMs 掃描檔案是否有相符專案所花費的時間。
rewriteTimeMs 重寫相符檔案所花費的時間。
Fsck numRemovedFiles 已移除的檔案數目。
CONVERT numConvertedFiles 已轉換的 Parquet 檔案數目。
最佳化
numAddedFiles 新增的檔案數目。
numRemovedFiles 優化檔案數目。
numAddedBytes 優化數據表之後新增的位元元組數目。
numRemovedBytes 已移除的位元元組數目。
minFileSize 優化數據表之後最小檔案的大小。
p25FileSize 優化數據表之後的第25個百分位數檔案大小。
p50FileSize 數據表優化后的檔案大小中位數。
p75FileSize 優化數據表之後的第75個百分位數檔案大小。
maxFileSize 優化數據表之後的最大檔案大小。
克隆
sourceTableSize 複製版本之源數據表的位元組大小。
sourceNumOfFiles 複製版本之源數據表中的檔案數目。
numRemovedFiles 如果取代了先前的 Delta 數據表,則從目標數據表移除的檔案數目。
removedFilesSize 如果取代了先前的 Delta 數據表,則從目標數據表移除的檔案大小總計,以位元組為單位。
numCopiedFiles 複製到新位置的檔案數目。 0 表示淺層複製。
copiedFilesSize 複製到新位置的檔案大小總計,以位元組為單位。 0 表示淺層複製。
RESTORE
tableSizeAfterRestore 還原後以位元組為單位的數據表大小。
numOfFilesAfterRestore 還原之後數據表中的檔案數目。
numRemovedFiles 還原作業移除的檔案數目。
numRestoredFiles 還原所新增的檔案數目。
removedFilesSize 還原移除的檔案位元組大小。
restoredFilesSize 還原所新增檔案的位元組大小。
真空
numDeletedFiles 已刪除的檔案數目。
numVacuumedDirectories 清理目錄的數目。
numFilesToDelete 要刪除的檔案數目。

什麼是 Delta Lake 時間旅行?

Delta Lake time travel 支援根據時間戳或數據表版本查詢先前的數據表版本(如事務歷史記錄中所記錄)。 您可以針對應用程式使用時間移動,例如:

  • 重新建立分析、報告或輸出 (例如,機器學習模型的輸出)。 這可能適用於偵錯或稽核,特別是在受管制的產業中。
  • 撰寫複雜的時態性查詢。
  • 修正資料中的錯誤。
  • 為快速變更資料表的一組查詢提供快照集隔離。

重要

可透過時間移動存取的數據表版本是由事務歷史記錄檔的保留臨界值以及作業的頻率和指定的保留 VACUUM 時間所決定。 如果您使用預設值每天執行 VACUUM ,7 天的數據可供時間移動。

Delta 時間移動語法

您可以藉由在數據表名稱規格後面新增 子句,以查詢具有時間移動的 Delta 資料表。

  • timestamp_expression 可以是下列任一項:
    • '2018-10-18T22:15:12.013Z',也就是可以轉換成時間戳的字串
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18',也就是日期字串
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • 任何其他可轉換成時間戳的表達式
  • version 是一個長值,可從的 DESCRIBE HISTORY table_spec輸出取得。

versiontimestamp_expression都不能是子查詢。

只接受日期或時間戳字串。 例如,"2019-01-01""2019-01-01T00:00:00.000Z"。 如需範例語法,請參閱下列程序代碼:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).load("/tmp/delta/people10m")

您也可以使用 @ 語法來指定時間戳或版本做為資料表名稱的一部分。 時間戳的格式必須為 yyyyMMddHHmmssSSS 。 您可以在 之後 @ 指定版本,方法是在 版本前面加上 v 。 如需範例語法,請參閱下列程序代碼:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

spark.read.load("/tmp/delta/people10m@20190101000000000")
spark.read.load("/tmp/delta/people10m@v123")

什麼是事務歷史記錄檢查點?

Delta Lake 會將數據表版本記錄為目錄內的 _delta_log JSON 檔案,其會與數據表數據一起儲存。 為了優化檢查點查詢,Delta Lake 會將數據表版本匯總至 Parquet 檢查點檔案,以避免讀取數據表歷程記錄的所有 JSON 版本。 Azure Databricks 會將數據大小和工作負載的檢查點頻率優化。 使用者不需要直接與檢查點互動。 檢查點頻率可能會變更而不通知。

設定時間旅行查詢的數據保留

若要查詢先前的數據表版本,您必須保留該版本的記錄檔和數據檔。

針對資料表執行時 VACUUM ,會刪除資料檔。 Delta Lake 會在檢查點數據表版本之後自動管理記錄檔移除。

由於大部分的差異數據表會 VACUUM 定期針對它們執行,因此時間點查詢應該遵守的保留閾值 VACUUM,預設為 7 天。

若要增加 Delta 資料表的數據保留閾值,您必須設定下表屬性:

  • delta.logRetentionDuration = "interval <interval>":控制保留數據表記錄的時間長度。 預設值為 interval 30 days
  • delta.deletedFileRetentionDuration = "interval <interval>":決定臨界值 VACUUM 用來移除目前數據表版本中不再參考的數據檔。 預設值為 interval 7 days

您可以在資料表建立期間指定 Delta 屬性,或使用 語句加以設定 ALTER TABLE 。 請參閱 Delta 資料表屬性參考

注意

您必須設定這兩個屬性,以確保數據表記錄會保留較長的持續時間,以供經常作業的 VACUUM 數據表使用。 例如,若要存取 30 天的歷史數據,請設定 delta.deletedFileRetentionDuration = "interval 30 days" (這符合的預設 delta.logRetentionDuration設定)。

增加數據保留閾值可能會導致記憶體成本增加,因為會維護更多數據檔。

將 Delta 數據表還原至先前的狀態

您可以使用 命令,將 Delta 數據表還原到其先前的狀態 RESTORE 。 Delta 資料表會在內部維護資料表的歷史版本,讓資料表還原至先前的狀態。 與早期狀態相對應的版本,或建立早期狀態的時間戳記,會作為選項由 RESTORE 命令支援。

重要

  • 您可以還原已還原的數據表。
  • 您可以還原複製的資料表。
  • 您必須擁有 MODIFY 正在還原之數據表的許可權。
  • 您無法將數據表還原至舊版,其中數據檔已手動或由 vacuum刪除。 如果 spark.sql.files.ignoreMissingFiles 設定 true為 ,仍然可以部分還原至此版本。
  • 回復到先前狀態的時間戳格式為 yyyy-MM-dd HH:mm:ss。 也僅支援提供 date(yyyy-MM-dd) 字串。
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

如需語法詳細數據,請參閱 RESTORE

重要

還原會被視為數據變更作業。 命令新增的 RESTORE Delta Lake 記錄專案包含 dataChange 設定為 true。 如果有下游應用程式,例如 處理 Delta Lake 資料表更新的結構化串流 作業,還原作業所新增的數據變更記錄專案會被視為新的資料更新,而且處理它們可能會導致重複的數據。

例如:

數據表版本 作業 差異記錄更新 數據變更記錄更新中的記錄
0 INSERT AddFile(/path/to/file-1, dataChange = true) (name = Viktor, age = 29, (name = George, age = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (name = George, age = 39)
2 最佳化 AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (沒有記錄,因為優化壓縮不會變更數據表中的數據)
3 RESTORE(version=1) RemoveFile(/path/to/file-3)、AddFile(/path/to/file-1、dataChange = true)、AddFile(/path/to/file-2、dataChange = true) (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

在上述範例中 RESTORE ,命令會導致讀取 Delta 數據表第 0 版和 1 時已看到更新。 如果串流查詢正在讀取此數據表,則這些檔案會被視為新加入的數據,並會再次處理。

還原計量

RESTORE 在作業完成之後,會將下列計量報告為單一數據列 DataFrame:

  • table_size_after_restore:還原之後數據表的大小。

  • num_of_files_after_restore:還原之後數據表中的檔案數目。

  • num_removed_files:已從資料表中移除的檔案數目(邏輯刪除)。

  • num_restored_files:由於復原而還原的檔案數目。

  • removed_files_size:從數據表中移除之檔案的大小總計,以位元組為單位。

  • restored_files_size:還原之檔案的大小總計,以位元組為單位。

    還原計量範例

使用 Delta Lake 時間移動的範例

  • 修正使用者 111意外刪除資料表:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • 修正資料表意外不正確的更新:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • 查詢上周新增的新客戶數目。

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

如何? 在Spark工作階段中尋找最後一個認可的版本嗎?

若要取得目前 SparkSession 在所有線程和所有資料表中寫入的最後一個認可版本號碼,請查詢 SQL 組態 spark.databricks.delta.lastCommitVersionInSession

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

如果 尚未進行 SparkSession任何認可,則查詢索引鍵會傳回空值。

注意

如果您在多個線程之間共用相同 SparkSession ,它類似於在多個線程之間共用變數;您可能會在組態值同時更新時達到競爭條件。