在 Azure Databricks 上使用 Delta Lake 變更數據摘要
注意
- 本文說明如何使用變更數據摘要功能來記錄和查詢差異數據表的數據列層級變更資訊。 若要瞭解如何根據源數據中的變更來更新 Delta Live Tables 管線中的數據表,請參閱 APPLY CHANGES API:簡化差異實時數據表中的異動數據擷取。
變更數據摘要可讓 Azure Databricks 追蹤差異數據表版本之間的數據列層級變更。 在 Delta 資料表上啟用時,執行時間會記錄 寫入數據表中所有資料的變更事件 。 這包括數據列數據以及元數據,指出指定的數據列是否已插入、刪除或更新。
您可以使用Spark SQL、Apache Spark DataFrame 和結構化串流,在批次查詢中讀取變更事件。
重要
變更數據摘要會與數據表歷程記錄一起運作,以提供變更資訊。 因為複製 Delta 數據表會建立個別的歷程記錄,因此複製數據表上的變更數據摘要與原始數據表的記錄不符。
使用案例
預設不會啟用變更數據摘要。 當您啟用變更數據摘要時,下列使用案例應該會驅動。
- Silver 和 Gold 數據表:只處理初始
MERGE
、UPDATE
或DELETE
作業之後的數據列層級變更,以加速和簡化 ETL 和 ELT 作業,以改善 Delta Lake 效能。 - 具體化檢視:建立最新的匯總信息檢視,以用於 BI 和分析,而不需要重新處理完整的基礎表,而是只更新變更已通過的地方。
- 傳輸變更:將變更數據摘要傳送至下游系統,例如 Kafka 或 RDBMS,以累加方式處理數據管線的後續階段。
- 稽核記錄數據表:以 Delta 數據表的形式擷取變更數據摘要,可提供永久記憶體和有效率的查詢功能,以查看一段時間的所有變更,包括何時發生刪除,以及進行哪些更新。
啟用變更數據摘要
您必須使用下列其中一種方法,明確啟用變更資料摘要選項:
新增資料表:在 命令中
CREATE TABLE
設定資料表屬性delta.enableChangeDataFeed = true
。CREATE TABLE student (id INT, name STRING, age INT) TBLPROPERTIES (delta.enableChangeDataFeed = true)
現有資料表:在 命令中
ALTER TABLE
設定資料表屬性delta.enableChangeDataFeed = true
。ALTER TABLE myDeltaTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
所有新的資料表:
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
重要
只會記錄啟用變更數據摘要之後所做的變更;不會擷取數據表的過去變更。
變更數據記憶體
Azure Databricks 會記錄資料表目錄下資料夾中的、 DELETE
和 MERGE
作業_change_data
變更資料UPDATE
。 某些作業,例如僅插入作業和完整分割區刪除,不會在目錄中產生數據 _change_data
,因為 Azure Databricks 可以直接從事務歷史記錄中有效地計算變更數據摘要。
資料夾中的 _change_data
檔案會遵循數據表的保留原則。 因此,如果您執行 VACUUM 命令,也會刪除變更資料摘要數據。
讀取批次查詢中的變更
您可以提供開始和結束的版本或時間戳。 查詢中包含開始和結束版本和時間戳。 若要讀取從特定起始版本到 最新 版數據表的變更,請只指定起始版本或時間戳。
您可以將版本指定為整數,並以 格式 yyyy-MM-dd[ HH:mm:ss[.SSS]]
指定時間戳做為字串。
如果您提供的版本較低或時間戳早於記錄變更事件的版本,也就是啟用變更數據摘要時,就會擲回錯誤,指出變更數據摘要未啟用。
SQL
-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)
-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2021-05-21 12:00:00')
-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)
-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2021-04-21 06:45:46' , '2021-05-21 12:00:00')
-- path based tables
SELECT * FROM table_changes_by_path('\path', '2021-04-21 05:45:46')
Python
# version as ints or longs
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.option("endingVersion", 10) \
.table("myDeltaTable")
# timestamps as formatted timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.option("endingTimestamp", '2021-05-21 12:00:00') \
.table("myDeltaTable")
# providing only the startingVersion/timestamp
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# path based tables
spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-04-21 05:45:46') \
.load("pathToMyDeltaTable")
Scala
// version as ints or longs
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 10)
.table("myDeltaTable")
// timestamps as formatted timestamp
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.option("endingTimestamp", "2021-05-21 12:00:00")
.table("myDeltaTable")
// providing only the startingVersion/timestamp
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// path based tables
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2021-04-21 05:45:46")
.load("pathToMyDeltaTable")
讀取串流查詢中的變更
Python
# providing a starting version
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 0) \
.table("myDeltaTable")
# providing a starting timestamp
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", "2021-04-21 05:35:43") \
.load("/pathToMyDeltaTable")
# not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.table("myDeltaTable")
Scala
// providing a starting version
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("myDeltaTable")
// providing a starting timestamp
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", "2021-04-21 05:35:43")
.load("/pathToMyDeltaTable")
// not providing a starting version/timestamp will result in the latest snapshot being fetched first
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.table("myDeltaTable")
若要在讀取資料表時取得變更資料,請將 選項 readChangeFeed
設定為 true
。
或 startingVersion
startingTimestamp
是選擇性的,如果未提供數據流,則會在串流時傳回數據表的最新快照集, INSERT
而未來變更為變更數據。
讀取變更數據時也支援速率限制等maxFilesPerTrigger
maxBytesPerTrigger
excludeRegex
選項。
注意
啟動快照集版本以外的版本,速率限制可以是不可部分完成的。 也就是說,整個認可版本會受到速率限制,否則會傳回整個認可。
根據預設,如果使用者傳入的版本或時間戳超過數據表上最後一個認可,就會擲回錯誤 timestampGreaterThanLatestCommit
。 在 Databricks Runtime 11.3 LTS 和更新版本中,如果使用者將下列 true
設定設為 ,變更數據摘要可以處理超出範圍的版本案例:
set spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled = true;
如果您提供的開始版本大於數據表的最後一個認可,或比數據表上最後一次認可還新的開始時間戳,則啟用上述設定時,會傳回空的讀取結果。
如果您提供大於數據表上最後一次認可的結束版本,或比數據表上最後一次認可還新的結束時間戳,則在批次讀取模式中啟用上述設定時,會傳回開始版本與最後一個認可之間的所有變更。
變更數據摘要的架構為何?
當您從數據表的變更數據摘要讀取時,會使用最新數據表版本的架構。
注意
大部分的架構變更和演進作業都完全受到支援。 已啟用數據行對應的數據表不支援所有使用案例,並示範不同的行為。 請參閱 變更已啟用數據行對應之數據表的數據摘要限制。
除了 Delta 資料表架構中的數據行之外,變更數據摘要還包含可識別變更事件類型的元數據行:
資料行名稱 | 類型 | 值 |
---|---|---|
_change_type |
String | insert 、 、 update_preimage 、 delete update_postimage (1) |
_commit_version |
Long | 包含變更的 Delta 記錄檔或數據表版本。 |
_commit_timestamp |
時間戳記 | 建立認可時相關聯的時間戳。 |
(1)preimage
是更新前的值, postimage
是更新之後的值。
注意
如果架構包含與這些新增數據行名稱相同的數據行,則您無法在數據表上啟用變更數據摘要。 在嘗試啟用變更數據摘要之前,重新命名數據表中的數據行,以解決此衝突。
變更已啟用數據行對應之數據表的數據摘要限制
在 Delta 資料表上啟用資料行對應時,您可以卸載或重新命名數據表中的數據行,而不需重寫現有數據的數據檔。 啟用數據行對應之後,變更數據摘要在執行非加總架構變更之後有限制,例如重新命名或卸除數據行、變更數據類型或可 Null 性變更。
重要
- 您無法讀取使用批次語意發生非加總架構變更的交易或範圍變更數據摘要。
- 在 Databricks Runtime 12.2 LTS 和以下的數據表中,已啟用數據行對應且發生非加總架構變更的數據表不支援變更數據摘要上的串流讀取。 請參閱使用資料行對應和結構描述變更進行串流。
- 在 Databricks Runtime 11.3 LTS 中,您無法讀取已啟用數據行重新命名或卸除之數據行對應之數據表的變更數據摘要。
在 Databricks Runtime 12.2 LTS 和更新版本中,您可以針對已啟用數據行對應且發生非加總架構變更的數據表,對變更數據摘要執行批次讀取。 讀取作業會使用查詢中指定的數據表結束版本架構,而不是使用最新版數據表的架構。 如果指定的版本範圍跨越非加總架構變更,查詢仍會失敗。
常見問題集 (FAQ)
啟用變更數據摘要的額外負荷為何?
沒有任何重大影響。 變更數據記錄會在查詢執行程式期間以一行方式產生,而且通常小於重寫檔案的大小總計。
變更記錄的保留原則為何?
變更記錄會遵循與過期數據表版本相同的保留原則,如果記錄超出指定的保留期限,則會透過 VACUUM 加以清除。
變更數據摘要中的新記錄何時可供使用?
變更數據會與 Delta Lake 交易一起認可,而且會在數據表中提供新數據的同時變成可用。
筆記本範例:使用差異變更數據摘要傳播變更
此筆記本示範如何將對絕對疫苗接種數量的銀表所做的變更傳播到疫苗接種率的黃金表。