共用方式為


使用 Delta Lake 數據表歷程記錄

修改 Delta Lake 數據表的每個作業都會建立新的數據表版本。 您可以使用歷程記錄資訊來稽核作業、復原數據表,或使用時間移動在特定時間點查詢數據表。

注意

Databricks 不建議使用 Delta Lake 資料表歷程記錄作為資料封存的長期備份解決方案。 除非您已將資料和記錄保留設定為較大的值,否則 Databricks 建議只使用過去 7 天進行時間移動作業。

擷取差異數據表歷程記錄

您可以執行 history 命令,以擷取資訊,包括每個寫入 Delta 資料表的作業、使用者和時間戳。 作業會以反向時間順序傳回。

數據表記錄保留是由數據表設定 delta.logRetentionDuration所決定,預設為30天。

注意

時間移動和資料表歷程記錄是由不同的保留期閾值所控制。 請參閱什麼是 Delta Lake 時間移動?

DESCRIBE HISTORY table_name       -- get the full history of the table

DESCRIBE HISTORY table_name LIMIT 1  -- get the last operation only

如需 Spark SQL 語法詳細資料,請參閱 DESCRIBE HISTORY

如需 Scala/Java/Python 語法詳細數據,請參閱 Delta Lake API 檔

目錄總 管提供這個詳細數據表資訊和 Delta 數據表記錄的可視化檢視。 除了數據表架構和範例數據之外,您也可以按兩下 [歷程記錄 ] 索引標籤來查看顯示資料表 DESCRIBE HISTORY記錄。

歷程記錄架構

作業的 history 輸出具有下列數據行。

資料行 類型​ 描述
版本 作業所產生的數據表版本。
時間戳 時間戳 認可此版本時。
使用者ID 字串 執行作業的使用者識別碼。
使用者名稱 字串 執行作業的用戶名稱。
作業 字串 作業名稱。
操作參數 地圖 工作的參數(例如述詞。)
工作 結構體 執行作業之作業的詳細數據。
筆記本 結構體 執行作業的筆記本詳細數據。
clusterId 字串 作業執行所在的叢集標識碼。
讀取版本 讀取以執行寫入作業的數據表版本。
隔離級別 字串 用於這項作業的隔離等級。
isBlindAppend 布爾值 此作業是否附加數據。
operationMetrics 地圖 作業的計量(例如,修改的數據列和檔案數目。)
使用者元資料 字串 若已指定使用者定義認可元數據,則為
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

注意

作業計量索引鍵

作業會 history 傳回數據行對應中的 operationMetrics 作業計量集合。

下表列出依作業的對應索引鍵定義。

作業 度量名稱 描述
寫入、CREATE TABLE AS SELECT、替換 TABLE AS SELECT、COPY INTO
numFiles 寫入的檔案數目。
numOutputBytes 寫入內容的位元組大小。
numOutputRows (輸出行數) 寫入的數據列數目。
串流 UPDATE
新增檔案數量 新增的檔案數目。
已刪除檔案數量 已移除的檔案數目。
numOutputRows 寫入的數據列數目。
numOutputBytes 以位元組為單位寫入的大小。
刪除
新增檔案數量 新增的檔案數目。 刪除資料表的資料分割時, 不會提供 。
已刪除檔案數量 已移除的檔案數目。
numDeletedRows 已移除的數據列數目。 刪除資料表的資料分割時, 不會提供 。
numCopiedRows 在刪除檔案的過程中複製的數據列數目。
執行時間毫秒 執行整個作業所花費的時間。
scanTimeMs 掃描檔案是否有相符專案所花費的時間。
rewriteTimeMs 重寫相符檔案所花費的時間。
截短
已刪除檔案數量 已移除的檔案數目。
執行時間毫秒 執行整個作業所花費的時間。
合併
numSourceRows 源數據框架中的數據列數目。
numTargetRowsInserted 插入目標數據表的數據列數目。
更新目標行數 目標數據表中更新的數據列數目。
刪除的目標列數 目標數據表中刪除的數據列數目。
目標行數複製數量 複製的目標數據列數目。
numOutputRows 寫出的數據列總數。
新增目標檔案數量 (numTargetFilesAdded) 新增至接收(target) 的檔案數目。
目標檔案移除數量 已從 sink(target) 移除的檔案數目。
執行時間毫秒 執行整個作業所花費的時間。
scanTimeMs 掃描檔案是否有相符專案所花費的時間。
rewriteTimeMs 重寫相符檔案所花費的時間。
UPDATE
新增檔案數量 新增的檔案數目。
已刪除檔案數量 已移除的檔案數目。
已更新行數 更新的數據列數目。
複製行數量 在更新檔案的過程中,剛複製的數據列數目。
執行時間毫秒 執行整個作業所花費的時間。
scanTimeMs 掃描檔案是否有相符專案所花費的時間。
rewriteTimeMs 重寫相符檔案所花費的時間。
FSCK(檔案系統一致性檢查) 已刪除檔案數量 已移除的檔案數目。
轉換 已轉換檔案數量 已轉換的 Parquet 檔案數目。
OPTIMIZE
新增檔案數量 新增的檔案數目。
已刪除檔案數量 優化檔案數目。
新增字節數 (numAddedBytes) 優化數據表之後新增的位元元組數目。
numRemovedBytes(移除的位元組數量) 已移除的位元元組數目。
最小檔案大小 優化數據表之後最小檔案的大小。
p25FileSize 優化數據表之後的第25個百分位數檔案大小。
p50FileSize 數據表優化后的檔案大小中位數。
p75檔案大小 優化數據表之後的第75個百分位數檔案大小。
最大檔案大小 優化數據表之後的最大檔案大小。
克隆
資料表大小 被複製版本的源表格大小(以位元組計算)。
檔案個數源 複製版本之源數據表中的檔案數目。
已刪除檔案數量 如果取代了先前的 Delta 數據表,則從目標數據表移除的檔案數目。
removedFilesSize 如果取代了先前的 Delta 數據表,則從目標數據表移除的檔案大小總計,以位元組為單位。
複製檔案數量 複製到新位置的檔案數目。 0 表示淺層複製。
已複製檔案大小 複製到新位置的檔案大小總計,以位元組為單位。 0 表示淺層複製。
RESTORE
還原後的表格大小 還原後以位元組為單位的數據表大小。
還原後的文件數量 (numOfFilesAfterRestore) 還原之後數據表中的檔案數目。
已刪除檔案數量 還原作業移除的檔案數目。
恢復的檔案數量 還原所新增的檔案數目。
removedFilesSize 還原移除的檔案位元組大小。
已復原檔案大小 還原所新增檔案的位元組大小。
VACUUM
已刪除文件數量 已刪除的檔案數目。
清理掉的目錄數量 清理目錄的數目。
要刪除的檔案數量 要刪除的檔案數目。

什麼是 Delta Lake 時間旅行?

Delta Lake time travel 支援根據時間戳或數據表版本查詢先前的數據表版本(如事務歷史記錄中所記錄)。 您可以針對應用程式使用時間移動,例如:

  • 重新建立分析、報告或輸出 (例如,機器學習模型的輸出)。 這可能適用於偵錯或稽核,特別是在受管制的產業中。
  • 撰寫複雜的時態性查詢。
  • 修正資料中的錯誤。
  • 為快速變更資料表的一組查詢提供快照集隔離。

重要

可透過時間移動存取的數據表版本是由事務歷史記錄檔的保留臨界值以及作業的頻率和指定的保留 VACUUM 時間所決定。 如果您使用預設值每天執行 VACUUM ,7 天的數據可供時間移動。

Delta 時間移動語法

您可以藉由在數據表名稱規格後面新增 子句,以查詢具有時間移動的 Delta 資料表。

  • timestamp_expression 可以是下列任一項:
    • '2018-10-18T22:15:12.013Z',也就是可以轉換成時間戳的字串
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18',也就是日期字串
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • 任何其他可轉換成時間戳的表達式
  • version 是一個長值,可從的 DESCRIBE HISTORY table_spec輸出取得。

timestamp_expressionversion都不能是子查詢。

只接受日期或時間戳字串。 例如,"2019-01-01""2019-01-01T00:00:00.000Z"。 如需範例語法,請參閱下列程序代碼:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z';
SELECT * FROM people10m VERSION AS OF 123;

Python(程式語言)

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).table("people10m")

您也可以使用 @ 語法來指定時間戳或版本做為資料表名稱的一部分。 時間戳的格式必須為 yyyyMMddHHmmssSSS 。 您可以在 之後 @ 指定版本,方法是在 版本前面加上 v 。 如需範例語法,請參閱下列程序代碼:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python(程式語言)

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

什麼是事務歷史記錄檢查點?

Delta Lake 會將數據表版本記錄為目錄內的 _delta_log JSON 檔案,其會與數據表數據一起儲存。 為了優化檢查點查詢,Delta Lake 會將數據表版本匯總至 Parquet 檢查點檔案,以避免讀取數據表歷程記錄的所有 JSON 版本。 Azure Databricks 會將數據大小和工作負載的檢查點頻率優化。 使用者不需要直接與檢查點互動。 檢查點頻率可能會變更而不通知。

設定時間旅行查詢的數據保留

若要查詢先前的數據表版本,您必須保留該版本的記錄檔和數據檔。

針對資料表執行時 VACUUM ,會刪除資料檔。 Delta Lake 會在檢查點數據表版本之後自動管理記錄檔移除。

由於大部分的差異數據表會 VACUUM 定期針對它們執行,因此時間點查詢應該遵守的保留閾值 VACUUM,預設為 7 天。

若要增加 Delta 資料表的數據保留閾值,您必須設定下表屬性:

  • delta.logRetentionDuration = "interval <interval>":控制保留數據表記錄的時間長度。 預設值為 interval 30 days
  • delta.deletedFileRetentionDuration = "interval <interval>":決定臨界值 VACUUM 用來移除目前數據表版本中不再參考的數據檔。 預設值為 interval 7 days

您可以在資料表建立期間指定 Delta 屬性,或使用 語句加以設定 ALTER TABLE 。 請參閱差異資料表屬性參考

注意

您必須設定這兩個屬性,以確保數據表記錄會保留較長的持續時間,以供經常作業的 VACUUM 數據表使用。 例如,若要存取 30 天的歷史數據,請設定 delta.deletedFileRetentionDuration = "interval 30 days" (這符合的預設 delta.logRetentionDuration設定)。

增加數據保留閾值可能會導致記憶體成本增加,因為會維護更多數據檔。

將 Delta 數據表還原至先前的狀態

您可以使用 命令,將 Delta 數據表還原到其先前的狀態 RESTORE 。 Delta 資料表會在內部維護資料表的歷史版本,讓資料表還原至先前的狀態。 與早期狀態相對應的版本,或建立早期狀態的時間戳記,會作為選項由 RESTORE 命令支援。

重要

  • 您可以還原已還原的數據表。
  • 您可以還原複製的資料表。
  • 您必須擁有 MODIFY 正在還原之數據表的許可權。
  • 您無法將數據表還原至舊版,其中數據檔已手動或由 vacuum刪除。 如果 spark.sql.files.ignoreMissingFiles 設定 true為 ,仍然可以部分還原至此版本。
  • 回復到先前狀態的時間戳格式為 yyyy-MM-dd HH:mm:ss。 也僅支援提供 date(yyyy-MM-dd) 字串。
RESTORE TABLE target_table TO VERSION AS OF <version>;
RESTORE TABLE target_table TO TIMESTAMP AS OF <timestamp>;

如需語法詳細數據,請參閱 RESTORE

重要

還原會被視為數據變更作業。 命令新增的 RESTORE Delta Lake 記錄專案包含 dataChange 設定為 true。 如果有下游應用程式,例如 處理 Delta Lake 資料表更新的結構化串流 作業,還原作業所新增的數據變更記錄專案會被視為新的資料更新,而且處理它們可能會導致重複的數據。

例如:

數據表版本 作業 差異記錄更新 數據變更記錄更新中的記錄
0 INSERT AddFile(/path/to/file-1, dataChange = true) (name = Viktor, age = 29, (name = George, age = 55)
1 INSERT AddFile(/path/to/file-2, dataChange = true) (name = George, age = 39)
2 OPTIMIZE AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (沒有記錄,因為優化壓縮不會變更數據表中的數據)
3 RESTORE(版本=1) RemoveFile(/path/to/file-3)、AddFile(/path/to/file-1、dataChange = true)、AddFile(/path/to/file-2、dataChange = true) (name = Viktor, age = 29), (name = George, age = 55), (name = George, age = 39)

在上述範例中 RESTORE ,命令會導致讀取 Delta 數據表第 0 版和 1 時已看到更新。 如果串流查詢正在讀取此數據表,則這些檔案會被視為新加入的數據,並會再次處理。

還原計量

RESTORE 在作業完成之後,會將下列計量報告為單一數據列 DataFrame:

  • table_size_after_restore:還原之後數據表的大小。

  • num_of_files_after_restore:還原之後數據表中的檔案數目。

  • num_removed_files:已從資料表中移除的檔案數目(邏輯刪除)。

  • num_restored_files:由於復原而還原的檔案數目。

  • removed_files_size:從數據表中移除之檔案的大小總計,以位元組為單位。

  • restored_files_size:還原之檔案的大小總計,以位元組為單位。

    還原計量範例

使用 Delta Lake 時間移動的範例

  • 修正使用者 111意外刪除資料表:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • 修正資料表意外不正確的更新:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • 查詢上周新增的新客戶數目。

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

如何在Spark會話中找到最後一次提交的版本?

若要取得目前 SparkSession 在所有線程和所有資料表中寫入的最後一個認可版本號碼,請查詢 SQL 組態 spark.databricks.delta.lastCommitVersionInSession

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python(程式語言)

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

程式語言 Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

如果 尚未進行 SparkSession任何認可,則查詢索引鍵會傳回空值。

注意

如果您在多個線程之間共用相同的 SparkSession,這就像是在多個線程之間共用變數,這可能會導致當配置值同時更新時出現競爭條件(race condition)。