Lakeflow Spark 宣告式管線透過 AUTO CDC 和 AUTO CDC FROM SNAPSHOT API 簡化了變更資料擷取(CDC)。 這些 API 自動化計算來自 CDC 資料流或資料庫快照的類型 1 和類型 2 的緩慢變化維度(SCD)複雜度。 想了解更多這些概念,請參閱 變更資料擷取與快照。
備註
API AUTO CDC 取代了 APPLY CHANGES API,語法相同。
APPLY CHANGES API 仍然可用,但 Databricks 建議使用 AUTO CDC API 取而代之。
你使用的 API 取決於變更資料的來源:
-
AUTO CDC:當來源資料庫啟用 CDC 訂閱時,請使用此方法。AUTO CDC處理變更資料流(CDF)中的變更。 它同時支援於管線 SQL 與 Python 介面中。 -
AUTO CDC FROM SNAPSHOT:當來源資料庫未啟用 CDC,且僅有快照時,請使用此方法。 此 API 會比較快照以判斷變更,然後進行處理。 它僅支援於 Python 介面中。
這兩個 API 都支援使用 SCD 類型 1 和類型 2 來更新資料表:
- 使用 SCD Type 1 直接更新紀錄。 更新記錄不會保存歷史。
- 使用 SCD 類型 2 來保留記錄歷史,無論是所有更新還是指定欄位集合的更新。
Apache Spark 宣告式管線不支援這些 AUTO CDC API。
關於語法及其他參考,請參見 AUTO CDC INTO (pipelines)、create_auto_cdc_flow 和 create_auto_cdc_from_snapshot_flow。
備註
本頁說明如何根據來源資料的變更更新管線中的資料表。 若要瞭解如何記錄和查詢 Delta 資料表的資料列層級變更資訊,請參閱在 Azure Databricks 上使用 Delta Lake 變更資料摘要。
需求
若要使用 CDC API,您的管線必須設定為使用 無伺服器 SDP 或 SDP Pro 或 Advanced版本。
AUTO CDC 的運作方式
要使用 AUTO CDC 來執行 CDC 處理,建立串流資料表,然後使用 SQL 中的 AUTO CDC ... INTO 陳述式或 Python 函式 create_auto_cdc_flow() 來指定變更串流的來源、鍵與序列。 關於定序與 SCD 邏輯的說明,請參見 變更資料擷取與快照。 請參考 AUTO CDC 的範例。
若從有變更提要的來源初始水合,請先用AUTO CDConce流,然後繼續處理變更提要。 請參考 使用 AUTO CDC 複製外部 RDBMS 資料表。
語法細節請參見 AUTO CDC INTO (pipelines) 或 create_auto_cdc_flow。
快照生成的自動CDC工作原理
AUTO CDC FROM SNAPSHOT 透過比較順序快照來判斷來源資料的變更。 它僅支援於 Python 管線介面中。 你可以直接從 Delta 表格、雲端儲存檔案或 JDBC 讀取快照。
要使用 AUTO CDC FROM SNAPSHOT 進行 CDC 處理,請建立串流表,然後使用 create_auto_cdc_from_snapshot_flow() 函式指定快照、鍵值及其他參數。 關於兩種資料擷取模式及何時使用,請參見 快照處理模式。 請查看 AUTO CDC FROM SNAPSHOT 範例。
語法細節請參見 create_auto_cdc_from_snapshot_flow。
使用多欄進行定序
若要依多個欄位排序(例如使用時間戳記和識別碼分辨前後順序),請使用 STRUCT 將它們合併。 API 會先依據第一個欄位排序,若出現平手則考慮第二個欄位,依此類推。
SQL
SEQUENCE BY STRUCT(timestamp_col, id_col)
Python
sequence_by = struct("timestamp_col", "id_col")
AUTO CDC 範例
以下範例展示了利用變更資料來源進行 SCD Type 1 與 Type 2 的處理。 範例資料會建立新的使用者紀錄、刪除使用者記錄,並更新使用者紀錄。 在 SCD Type 1 範例中,最後 UPDATE 的操作會延遲到達,並從目標表中移除,顯示出事件處理順序錯亂。
以下是這些範例中使用的輸入記錄。 這些資料是透過在 「建立樣本資料 」區塊執行查詢而產生的。
| userId | 名稱 | city | 作業 | 序列編號 |
|---|---|---|---|---|
| 124 | 勞爾 | 瓦哈卡州 | INSERT | 1 |
| 123 | Isabel | 蒙特雷 | INSERT | 1 |
| 125 | 梅賽德斯 | 蒂華納 | INSERT | 2 |
| 126 | 莉莉 | 坎昆 | INSERT | 2 |
| 123 | null | null | 刪除 | 6 |
| 125 | 梅賽德斯 | Guadalajara | UPDATE | 6 |
| 125 | 梅賽德斯 | 墨西卡利 | UPDATE | 5 |
| 123 | Isabel | Chihuahua | UPDATE | 5 |
如果你在樣本資料生成查詢中取消註解最後一行,它會插入以下記錄,指定要在sequenceNum=3截斷該資料表(清除該資料表)。
| userId | 名稱 | city | 作業 | 序列編號 |
|---|---|---|---|---|
| null | null | null | 截斷 | 3 |
備註
下列所有範例都包含指定DELETE和TRUNCATE作業的選項,但每個選項都是可選的。
建立範例資料
執行以下語句以建立範例資料集。 此程式碼並非設計用於管線定義的一部分。 從管線的探索資料夾執行,而不是從 transformations 資料夾裡執行。
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
col1 AS userId,
col2 AS name,
col3 AS city,
col4 AS operation,
col5 AS sequenceNum
FROM (
VALUES
-- Initial load.
(124, "Raul", "Oaxaca", "INSERT", 1),
(123, "Isabel", "Monterrey", "INSERT", 1),
-- New users.
(125, "Mercedes", "Tijuana", "INSERT", 2),
(126, "Lily", "Cancun", "INSERT", 2),
-- Isabel is removed from the system and Mercedes moved to Guadalajara.
(123, null, null, "DELETE", 6),
(125, "Mercedes", "Guadalajara", "UPDATE", 6),
-- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
(125, "Mercedes", "Mexicali", "UPDATE", 5),
(123, "Isabel", "Chihuahua", "UPDATE", 5)
-- Uncomment to test TRUNCATE.
-- ,(null, null, null, "TRUNCATE", 3)
);
程序 SCD 類型 1 更新
SCD Type 1 只保留每筆紀錄的最新版本。 以下範例是從上述建立的變更資料串流中讀取,並將變更套用到串流資料表目標上。 開發 Lakeflow Spark 宣告式管線 來執行此程式碼。
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_current")
dp.create_auto_cdc_flow(
target = "users_current",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
apply_as_truncates = expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = 1
)
SQL
CREATE OR REFRESH STREAMING TABLE users_current;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_current
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
APPLY AS TRUNCATE WHEN
operation = "TRUNCATE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 1;
執行 SCD 類型 1 範例後,目標資料表包含以下記錄:
| userId | 名稱 | city |
|---|---|---|
| 124 | 勞爾 | 瓦哈卡州 |
| 125 | 梅賽德斯 | Guadalajara |
| 126 | 莉莉 | 坎昆 |
用戶 123(Isabel)已被刪除,未出現。 使用者 125(Mercedes)只顯示最新的城市(瓜達拉哈拉),因為 SCD Type 1 會覆蓋先前的值。 之前 UPDATE 的 at sequenceNum=5 被刪除,因為後來有更新 at sequenceNum=6 。
在執行範例並確保 TRUNCATE 記錄未被註解後,該表在 sequenceNum=3 會被清除。 這表示記錄 124 和 126 不在表格中,最終目標表格僅包含以下記錄:
| userId | 名稱 | city |
|---|---|---|
| 125 | 梅賽德斯 | Guadalajara |
處理 SCD 型 2 更新
SCD 類型 2 透過為每個版本的記錄__START_AT__END_AT建立新列,並以欄位表示每個版本的啟用時間,從而保留完整的變更歷史。
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2"
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2;
執行 SCD Type 2 範例後,目標資料表包含以下記錄:
| userId | 名稱 | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | 蒙特雷 | 1 | 5 |
| 123 | Isabel | Chihuahua | 5 | 6 |
| 124 | 勞爾 | 瓦哈卡州 | 1 | null |
| 125 | 梅賽德斯 | 蒂華納 | 2 | 5 |
| 125 | 梅賽德斯 | 墨西卡利 | 5 | 6 |
| 125 | 梅賽德斯 | Guadalajara | 6 | null |
| 126 | 莉莉 | 坎昆 | 2 | null |
這張桌子保存了完整的歷史。 使用者 123 有兩個版本(刪除時止於序列 6)。 用戶 125 有三個版本顯示城市變更。 目前相關 __END_AT = null 紀錄仍在運作中。
用 SCD 類型 2 追蹤欄位子集
預設情況下,SCD Type 2 會在任何欄位值變更時建立新版本。 你可以指定一部分欄位要追蹤,這樣其他欄位的變更會即時更新當前版本,而不是產生新的歷史紀錄。
以下範例排除該 city 欄位的歷史追蹤:
Python
from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr
@dp.view
def users():
return spark.readStream.table("main.cdc_tutorial.users_cdf")
dp.create_streaming_table("users_history")
dp.create_auto_cdc_flow(
target = "users_history",
source = "users",
keys = ["userId"],
sequence_by = col("sequenceNum"),
apply_as_deletes = expr("operation = 'DELETE'"),
except_column_list = ["operation", "sequenceNum"],
stored_as_scd_type = "2",
track_history_except_column_list = ["city"]
)
SQL
CREATE OR REFRESH STREAMING TABLE users_history;
CREATE FLOW apply_cdc AS AUTO CDC INTO
users_history
FROM
stream(main.cdc_tutorial.users_cdf)
KEYS
(userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY
sequenceNum
COLUMNS * EXCEPT
(operation, sequenceNum)
STORED AS
SCD TYPE 2
TRACK HISTORY ON * EXCEPT
(city)
因為 city 變更不會被追蹤,城市更新會覆蓋目前的列,而不是建立新版本。 目標表包含以下紀錄:
| userId | 名稱 | city | __START_AT | __END_AT |
|---|---|---|---|---|
| 123 | Isabel | Chihuahua | 1 | 6 |
| 124 | 勞爾 | 瓦哈卡州 | 1 | null |
| 125 | 梅賽德斯 | Guadalajara | 2 | null |
| 126 | 莉莉 | 坎昆 | 2 | null |
快照中的自動CDC範例
以下章節提供使用 AUTO CDC FROM SNAPSHOT 將快照處理至 SCD 類型 1 或類型 2 目標資料表的範例。 關於何時使用此 API 的背景,請參閱 變更資料擷取與快照。
範例:使用管線匯入時間的程序快照
當快照定期且有序地到達時,請使用此方法,且你可以依賴管線執行時間戳來進行版本管理。 每次管線更新時都會匯入一個新的快照。
你可以從多種來源類型讀取快照,包括 Delta 表格、雲端儲存檔案和 JDBC 連線。
步驟 1:建立範例資料
建立一個包含快照資料的資料表。 請從你的管線資料夾中的筆記本或 Databricks SQL explorations 執行以下程式碼:
CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;
CREATE TABLE main.cdc_tutorial.snapshot (
userId INT,
city STRING
);
INSERT INTO main.cdc_tutorial.snapshot VALUES
(1, 'Oaxaca'),
(2, 'Monterrey'),
(3, 'Tijuana');
步驟 2:從快照中運行自動 CDC
開發 Lakeflow Spark 宣告式管線 以執行此步驟的程式碼。
選擇快照視圖的來源類型(樣本建立程式碼會產生 Delta 表格):
選項A:從Delta表讀取
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.table("main.cdc_tutorial.snapshot")
選項B:從雲端儲存讀取資料
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return spark.read.format("csv").option("header", True).load("<snapshot-path>")
選項 C:從 JDBC 讀取(僅限經典運算)
from pyspark import pipelines as dp
@dp.view(name="source")
def source():
return (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
所有選項,寫入到目標
接著加入目標表,並設定流程:
dp.create_streaming_table("target")
dp.create_auto_cdc_from_snapshot_flow(
target = "target",
source = "source",
keys = ["userId"],
stored_as_scd_type = 2
)
在第一次管線執行後,所有紀錄都會以活動列的形式插入:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | 瓦哈卡州 | 0 | null |
| 2 | 蒙特雷 | 0 | null |
| 3 | 蒂華納 | 0 | null |
備註
若要改用 SCD Type 1 並只保留當前狀態,請設 stored_as_scd_type=1。 此時,目標資料表不包含 __START_AT 和 __END_AT 欄位。
步驟 3:模擬新的快照並重執行
更新來源資料表以模擬新的快照到達(從你的 pipeline 資料夾中的 explorations 筆記本或 SQL 檔案執行此程式碼):
TRUNCATE TABLE main.cdc_tutorial.snapshot;
INSERT INTO main.cdc_tutorial.snapshot VALUES
(2, 'Carmel'),
(3, 'Los Angeles'),
(4, 'Death Valley'),
(6, 'Kings Canyon');
再次執行資料處理流程。
AUTO CDC FROM SNAPSHOT 將新的快照與先前的快照比較,發現使用者 1 已被刪除,使用者 2 和 3 已更新,使用者 4 和 6 已插入。 這會產生變更資料流,並用 AUTO CDC 來建立輸出表。
在第二次使用 SCD Type 2 執行後,目標表包含以下記錄:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | 瓦哈卡州 | 0 | 1 |
| 2 | 蒙特雷 | 0 | 1 |
| 2 | 卡梅爾 | 1 | null |
| 3 | 蒂華納 | 0 | 1 |
| 3 | 洛杉磯 | 1 | null |
| 4 | 死亡谷 | 1 | null |
| 6 | 國王峽谷 | 1 | null |
使用者 1 被終止(刪除)。 使用者 2 和 3 各自有兩個版本顯示他們的城市變更。 使用者 4 和 6 是新加入的。
在第二次使用 SCD Type 1 後,目標表僅顯示當前狀態:
| userId | city |
|---|---|
| 2 | 卡梅爾 |
| 3 | 洛杉磯 |
| 4 | 死亡谷 |
| 6 | 國王峽谷 |
範例:使用版本函數進行程序快照
當你需要明確控制快照排序時,請使用此方法。 例如,當多個快照同時抵達或快照順序錯亂時,可以使用此方法。 你寫一個函式,指定下一個要處理的快照及其版本號。 API 依版本依序從低到高處理快照:
- 如果有多個快照在儲存中,則會依序處理。
- 若快照抵達順序錯亂(例如在
snapshot_4之後snapshot_3到達),則會跳過該快照。 - 若無新的快照,函式會回傳
None,且不會進行處理。
步驟 1:準備快照檔案
建立包含快照資料的 CSV 檔案,並將其加入磁碟區或雲端儲存空間。 檔案名稱依時間順序排列(例如, snapshot_1.csv, snapshot_2.csv)。
每個檔案都應包含userId和city的欄位。 例如:
snapshot_1.csv:
| userId | city |
|---|---|
| 1 | 瓦哈卡州 |
| 2 | 蒙特雷 |
| 3 | 蒂華納 |
snapshot_2.csv:
| userId | city |
|---|---|
| 2 | 卡梅爾 |
| 3 | 洛杉磯 |
| 4 | 死亡谷 |
步驟 2:從快照執行 AUTO CDC 並使用版本函式
建立一個新的筆記本,並貼上以下管線程式碼。 接著 開發 Lakeflow Spark 宣告式管線。
from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data
files = dbutils.fs.ls(snapshot_dir)
snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]
snapshot_versions = []
for filename in snapshot_files:
try:
version = int(filename.replace("snapshot_", "").replace(".csv", ""))
snapshot_versions.append(version)
except ValueError:
continue
snapshot_versions.sort()
if latest_snapshot_version is None:
if snapshot_versions:
next_version = snapshot_versions[0]
else:
return None
else:
next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
if next_versions:
next_version = next_versions[0]
else:
return None
snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
df = spark.read.format("csv").option("header", True).load(snapshot_path)
return (df, next_version)
dp.create_streaming_table("main.cdc_tutorial.target_versioned")
dp.create_auto_cdc_from_snapshot_flow(
target = "main.cdc_tutorial.target_versioned",
source = next_snapshot_and_version,
keys = ["userId"],
stored_as_scd_type = 2
)
備註
若要改用 SCD Type 1,請設定 stored_as_scd_type=1。
處理 snapshot_1.csv後,目標資料表包含以下記錄:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | 瓦哈卡州 | 1 | null |
| 2 | 蒙特雷 | 1 | null |
| 3 | 蒂華納 | 1 | null |
處理 snapshot_2.csv後,目標資料表包含以下記錄:
| userId | city | __START_AT | __END_AT |
|---|---|---|---|
| 1 | 瓦哈卡州 | 1 | 2 |
| 2 | 蒙特雷 | 1 | 2 |
| 2 | 卡梅爾 | 2 | null |
| 3 | 蒂華納 | 1 | 2 |
| 3 | 洛杉磯 | 2 | null |
| 4 | 死亡谷 | 2 | null |
備註
請記得,對於 SCD Type 1,表格看起來和最近的快照完全一樣。 差別在於下游查詢可以只處理變更記錄。
步驟 3:新增快照
在儲存位置新增一個 CSV 檔案,並包含修改後的資料(例如:更改城市值、增加新的資料列或刪除現有的資料列)。 接著再跑一次管線來處理新的快照。
局限性
- 序列欄位必須是可排序的資料型態。
NULL不支援排序值。 -
AUTO CDC FROM SNAPSHOT僅支援於 Python 管線介面;SQL 介面不被支援。
其他資源
- 變更資料擷取與快照:了解CDC的概念、快照及SCD類型。
-
透過以下方式
AUTO CDC複製外部關聯式資料庫管理系統表格:學習如何使用once流程進行初始數據準備,然後繼續處理修改。 - 進階的自動CDC主題:了解針對自動CDC目標的變更操作、讀取變更資料流及處理指標。
- 教學課程:使用變更資料擷取建置 ETL 管線