使用
本頁會逐步引導您瞭解如何使用管線中的 API ,將 AUTO CDC 資料表從外部關聯式資料庫管理系統 (RDBMS) 複寫至 Azure Databricks。 您將學到:
- 設定來源的一般模式。
- 如何使用
once流程執行現有資料的一次性完整複製。 - 如何使用
change流程擷取持續新變更。
此型樣非常適合建置緩慢變更的維度 (SCD) 表格,或讓目標表格與外部記錄系統保持同步。
開始之前
本指南假設您可以從來源存取下列資料集:
- 雲端儲存中來源資料表的完整快照。 此資料集用於初始載入。
- 連續變更數據流,匯入相同的雲端儲存位置(例如,使用 Debezium、Kafka 或日誌型 CDC)。 此饋送是運行中
AUTO CDC程序的資料輸入。
設定來源檢視
首先,定義兩個來源檢視,以從雲端儲存路徑rdbms_orders填入orders_snapshot_path目標資料表。 兩者都是在雲端儲存中的原始數據上構建的串流視圖。 使用檢視可提供更高的效率,因為在程式中使用 AUTO CDC 之前不需要寫入資料。
- 第一個來源視圖是完整快照 (
full_orders_snapshot) - 第二個是連續變更摘要 (
rdbms_orders_change_feed)。
本指南中的範例使用雲端儲存作為來源,但您可以使用串流資料表支援的任何來源。
full_orders_snapshot()
此步驟會建立一個資料處理管線,該管線中包含一個檢視,該檢視會讀取訂單資料的初始完整快照。
Python
下列 Python 範例:
-
spark.readStream與自動裝載機一起使用 (format("cloudFiles")) - 從由
orders_snapshot_path定義的目錄中讀取 JSON 檔案 - 設定
includeExistingFiles為true確保處理路徑中已存在的歷史資料 - 設定
inferColumnTypes為true自動推斷模式結構 - 傳回所有具有
.select("\*")的欄位
@dp.view()
def full_orders_snapshot():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_snapshot_path)
.select("*")
)
SQL
下列 SQL 範例會將選項傳遞為字串鍵值對的映射。
orders_snapshot_path 應該可作為 SQL 變數使用(例如,使用管線參數定義或手動插值)。
CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
rdbms_orders_change_feed()
此步驟會建立第二個視圖,以讀取增量變更資料(例如,從 CDC 日誌或變更資料表)。 它會 orders_cdc_path 讀取並假設 CDC 樣式的 JSON 檔案會定期放入此路徑。
Python
@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)
SQL
在下列 SQL 範例中, ${orders_cdc_path} 是變數,可在管線設定中設定值,或在程式碼中明確設定變數,以進行內插。
CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));
初始水合(單次流動)
現在已設定來源, AUTO CDC 邏輯會將兩個來源合併到目標串流表格中。 首先,使用一次性 AUTO CDC 流程將 ONCE=TRUE RDBMS 資料表的完整內容複製到串流資料表中。 這會使用歷史資料準備目標資料表,且不會在未來的更新中重播資料。
Python
from pyspark import pipelines as dp
# Step 1: Create the target streaming table
dp.create_streaming_table("rdbms_orders")
# Step 2: Once Flow — Load initial snapshot of full RDBMS table
dp.create_auto_cdc_flow(
flow_name = "initial_load_orders",
once = True, # one-time load
target = "rdbms_orders",
source = "full_orders_snapshot", # e.g., ingested from JDBC into bronze
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;
-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
once流程只會執行一次。 系統會忽略在管線建立之後新增的新 full_orders_snapshot 檔案。
這很重要
在 rdbms_orders 串流資料表上執行完整重新整理會重新執行 once 流程。 如果雲端儲存中的初始快照資料已被移除,則會導致資料遺失。
持續變更饋送(變更流程)
在初始快照載入之後,使用另一個 AUTO CDC 流程從 RDBMS 的 CDC feed 持續匯入變更。 這可讓您的 rdbms_orders 表格保持最新狀態,包括插入、更新和刪除。
Python
from pyspark import pipelines as dp
# Step 3: Change Flow — Ingest ongoing CDC stream from source system
dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)
SQL
-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;
考慮事項
| 回填冪等 |
once只有在目標資料表完全重新整理時,流程才會重新執行。 |
|---|---|
| 多個流程 | 您可以使用多個變更流程來合併更正、延遲到達的資料或替代資料源,但所有變更流程都必須共用結構和索引鍵。 |
| 完整重新整理 | 串流資料表的完整 rdbms_orders 重新整理會重新執行 once 操作流程。 如果初始雲端儲存位置已修剪掉初始快照資料,這可能會導致資料遺失。 |
| 流程執行順序 | 流程執行的順序並不重要。 最終結果是一樣的。 |
其他資源
- Lakeflow Connect 中的完全受控 SQL Server 連接器