共用方式為


使用 AUTO CDC 複製外部 RDBMS 資料表

本頁會逐步引導您瞭解如何使用管線中的 API ,將 AUTO CDC 資料表從外部關聯式資料庫管理系統 (RDBMS) 複寫至 Azure Databricks。 您將學到:

  • 設定來源的一般模式。
  • 如何使用 once 流程執行現有資料的一次性完整複製。
  • 如何使用 change 流程擷取持續新變更。

此型樣非常適合建置緩慢變更的維度 (SCD) 表格,或讓目標表格與外部記錄系統保持同步。

開始之前

本指南假設您可以從來源存取下列資料集:

  • 雲端儲存中來源資料表的完整快照。 此資料集用於初始載入。
  • 連續變更數據流,匯入相同的雲端儲存位置(例如,使用 Debezium、Kafka 或日誌型 CDC)。 此饋送是運行中 AUTO CDC 程序的資料輸入。

設定來源檢視

首先,定義兩個來源檢視,以從雲端儲存路徑rdbms_orders填入orders_snapshot_path目標資料表。 兩者都是在雲端儲存中的原始數據上構建的串流視圖。 使用檢視可提供更高的效率,因為在程式中使用 AUTO CDC 之前不需要寫入資料。

  • 第一個來源視圖是完整快照 (full_orders_snapshot
  • 第二個是連續變更摘要 (rdbms_orders_change_feed)。

本指南中的範例使用雲端儲存作為來源,但您可以使用串流資料表支援的任何來源。

full_orders_snapshot()

此步驟會建立一個資料處理管線,該管線中包含一個檢視,該檢視會讀取訂單資料的初始完整快照。

Python

下列 Python 範例:

  • spark.readStream與自動裝載機一起使用 (format("cloudFiles")
  • 從由 orders_snapshot_path 定義的目錄中讀取 JSON 檔案
  • 設定 includeExistingFilestrue 確保處理路徑中已存在的歷史資料
  • 設定 inferColumnTypestrue 自動推斷模式結構
  • 傳回所有具有 .select("\*") 的欄位
@dp.view()
def full_orders_snapshot():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.includeExistingFiles", "true")
        .option("cloudFiles.inferColumnTypes", "true")
        .load(orders_snapshot_path)
        .select("*")
    )

SQL

下列 SQL 範例會將選項傳遞為字串鍵值對的映射。 orders_snapshot_path 應該可作為 SQL 變數使用(例如,使用管線參數定義或手動插值)。

CREATE OR REFRESH VIEW full_orders_snapshot
AS SELECT *
FROM STREAM read_files("${orders_snapshot_path}", "json", map(
  "cloudFiles.includeExistingFiles", "true",
  "cloudFiles.inferColumnTypes", "true"
));

rdbms_orders_change_feed()

此步驟會建立第二個視圖,以讀取增量變更資料(例如,從 CDC 日誌或變更資料表)。 它會 orders_cdc_path 讀取並假設 CDC 樣式的 JSON 檔案會定期放入此路徑。

Python

@dp.view()
def rdbms_orders_change_feed():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.includeExistingFiles", "true")
.option("cloudFiles.inferColumnTypes", "true")
.load(orders_cdc_path)

SQL

在下列 SQL 範例中, ${orders_cdc_path} 是變數,可在管線設定中設定值,或在程式碼中明確設定變數,以進行內插。

CREATE OR REFRESH VIEW rdbms_orders_change_feed
AS SELECT *
FROM STREAM read_files("${orders_cdc_path}", "json", map(
"cloudFiles.includeExistingFiles", "true",
"cloudFiles.inferColumnTypes", "true"
));

初始水合(單次流動)

現在已設定來源, AUTO CDC 邏輯會將兩個來源合併到目標串流表格中。 首先,使用一次性 AUTO CDC 流程將 ONCE=TRUE RDBMS 資料表的完整內容複製到串流資料表中。 這會使用歷史資料準備目標資料表,且不會在未來的更新中重播資料。

Python

from pyspark import pipelines as dp

# Step 1: Create the target streaming table

dp.create_streaming_table("rdbms_orders")

# Step 2: Once Flow — Load initial snapshot of full RDBMS table

dp.create_auto_cdc_flow(
  flow_name = "initial_load_orders",
  once = True,  # one-time load
  target = "rdbms_orders",
  source = "full_orders_snapshot",  # e.g., ingested from JDBC into bronze
  keys = ["order_id"],
  sequence_by = "timestamp",
  stored_as_scd_type = "1"
)

SQL


-- Step 1: Create the target streaming table
CREATE OR REFRESH STREAMING TABLE rdbms_orders;

-- Step 2: Once Flow for initial snapshot
CREATE FLOW rdbms_orders_hydrate
AS AUTO CDC ONCE INTO rdbms_orders
FROM stream(full_orders_snapshot)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

once流程只會執行一次。 系統會忽略在管線建立之後新增的新 full_orders_snapshot 檔案。

這很重要

rdbms_orders 串流資料表上執行完整重新整理會重新執行 once 流程。 如果雲端儲存中的初始快照資料已被移除,則會導致資料遺失。

持續變更饋送(變更流程)

在初始快照載入之後,使用另一個 AUTO CDC 流程從 RDBMS 的 CDC feed 持續匯入變更。 這可讓您的 rdbms_orders 表格保持最新狀態,包括插入、更新和刪除。

Python

from pyspark import pipelines as dp

# Step 3: Change Flow — Ingest ongoing CDC stream from source system

dp.create_auto_cdc_flow(
flow_name = "orders_incremental_cdc",
target = "rdbms_orders",
source = "rdbms_orders_change_feed", # e.g., ingested from Kafka or Debezium
keys = ["order_id"],
sequence_by = "timestamp",
stored_as_scd_type = "1"
)

SQL

-- Step 3: Continuous CDC ingestion
CREATE FLOW rdbms_orders_continuous
AS AUTO CDC INTO rdbms_orders
FROM stream(rdbms_orders_change_feed)
KEYS (order_id)
SEQUENCE BY timestamp
STORED AS SCD TYPE 1;

考慮事項

回填冪等 once只有在目標資料表完全重新整理時,流程才會重新執行。
多個流程 您可以使用多個變更流程來合併更正、延遲到達的資料或替代資料源,但所有變更流程都必須共用結構和索引鍵。
完整重新整理 串流資料表的完整 rdbms_orders 重新整理會重新執行 once 操作流程。 如果初始雲端儲存位置已修剪掉初始快照資料,這可能會導致資料遺失。
流程執行順序 流程執行的順序並不重要。 最終結果是一樣的。

其他資源