共用方式為


AUTO CDC API:使用管道簡化變更資料擷取

Lakeflow Spark 宣告式管線透過 AUTO CDCAUTO CDC FROM SNAPSHOT API 簡化了變更資料擷取(CDC)。 這些 API 自動化計算來自 CDC 資料流或資料庫快照的類型 1 和類型 2 的緩慢變化維度(SCD)複雜度。 想了解更多這些概念,請參閱 變更資料擷取與快照

備註

API AUTO CDC 取代了 APPLY CHANGES API,語法相同。 APPLY CHANGES API 仍然可用,但 Databricks 建議使用 AUTO CDC API 取而代之。

你使用的 API 取決於變更資料的來源:

  • AUTO CDC:當來源資料庫啟用 CDC 訂閱時,請使用此方法。 AUTO CDC 處理變更資料流(CDF)中的變更。 它同時支援於管線 SQL 與 Python 介面中。
  • AUTO CDC FROM SNAPSHOT:當來源資料庫未啟用 CDC,且僅有快照時,請使用此方法。 此 API 會比較快照以判斷變更,然後進行處理。 它僅支援於 Python 介面中。

這兩個 API 都支援使用 SCD 類型 1 和類型 2 來更新資料表:

  • 使用 SCD Type 1 直接更新紀錄。 更新記錄不會保存歷史。
  • 使用 SCD 類型 2 來保留記錄歷史,無論是所有更新還是指定欄位集合的更新。

Apache Spark 宣告式管線不支援這些 AUTO CDC API。

關於語法及其他參考,請參見 AUTO CDC INTO (pipelines)、create_auto_cdc_flowcreate_auto_cdc_from_snapshot_flow

備註

本頁說明如何根據來源資料的變更更新管線中的資料表。 若要瞭解如何記錄和查詢 Delta 資料表的資料列層級變更資訊,請參閱在 Azure Databricks 上使用 Delta Lake 變更資料摘要

需求

若要使用 CDC API,您的管線必須設定為使用 無伺服器 SDP 或 SDP ProAdvanced版本

AUTO CDC 的運作方式

要使用 AUTO CDC 來執行 CDC 處理,建立串流資料表,然後使用 SQL 中的 AUTO CDC ... INTO 陳述式或 Python 函式 create_auto_cdc_flow() 來指定變更串流的來源、鍵與序列。 關於定序與 SCD 邏輯的說明,請參見 變更資料擷取與快照。 請參考 AUTO CDC 的範例

若從有變更提要的來源初始水合,請先用AUTO CDConce流,然後繼續處理變更提要。 請參考 使用 AUTO CDC 複製外部 RDBMS 資料表

語法細節請參見 AUTO CDC INTO (pipelines)create_auto_cdc_flow

快照生成的自動CDC工作原理

AUTO CDC FROM SNAPSHOT 透過比較順序快照來判斷來源資料的變更。 它僅支援於 Python 管線介面中。 你可以直接從 Delta 表格、雲端儲存檔案或 JDBC 讀取快照。

要使用 AUTO CDC FROM SNAPSHOT 進行 CDC 處理,請建立串流表,然後使用 create_auto_cdc_from_snapshot_flow() 函式指定快照、鍵值及其他參數。 關於兩種資料擷取模式及何時使用,請參見 快照處理模式。 請查看 AUTO CDC FROM SNAPSHOT 範例

語法細節請參見 create_auto_cdc_from_snapshot_flow

使用多欄進行定序

若要依多個欄位排序(例如使用時間戳記和識別碼分辨前後順序),請使用 STRUCT 將它們合併。 API 會先依據第一個欄位排序,若出現平手則考慮第二個欄位,依此類推。

SQL

SEQUENCE BY STRUCT(timestamp_col, id_col)

Python

sequence_by = struct("timestamp_col", "id_col")

AUTO CDC 範例

以下範例展示了利用變更資料來源進行 SCD Type 1 與 Type 2 的處理。 範例資料會建立新的使用者紀錄、刪除使用者記錄,並更新使用者紀錄。 在 SCD Type 1 範例中,最後 UPDATE 的操作會延遲到達,並從目標表中移除,顯示出事件處理順序錯亂。

以下是這些範例中使用的輸入記錄。 這些資料是透過在 「建立樣本資料 」區塊執行查詢而產生的。

userId 名稱 city 作業 序列編號
124 勞爾 瓦哈卡州 INSERT 1
123 Isabel 蒙特雷 INSERT 1
125 梅賽德斯 蒂華納 INSERT 2
126 莉莉 坎昆 INSERT 2
123 null null 刪除 6
125 梅賽德斯 Guadalajara UPDATE 6
125 梅賽德斯 墨西卡利 UPDATE 5
123 Isabel Chihuahua UPDATE 5

如果你在樣本資料生成查詢中取消註解最後一行,它會插入以下記錄,指定要在sequenceNum=3截斷該資料表(清除該資料表)。

userId 名稱 city 作業 序列編號
null null null 截斷 3

備註

下列所有範例都包含指定DELETETRUNCATE作業的選項,但每個選項都是可選的。

建立範例資料

執行以下語句以建立範例資料集。 此程式碼並非設計用於管線定義的一部分。 從管線的探索資料夾執行,而不是從 transformations 資料夾裡執行。

CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.users_cdf
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The batch at sequenceNum 6 is the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

程序 SCD 類型 1 更新

SCD Type 1 只保留每筆紀錄的最新版本。 以下範例是從上述建立的變更資料串流中讀取,並將變更套用到串流資料表目標上。 開發 Lakeflow Spark 宣告式管線 來執行此程式碼。

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_current")

dp.create_auto_cdc_flow(
  target = "users_current",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

CREATE OR REFRESH STREAMING TABLE users_current;

CREATE FLOW apply_cdc AS AUTO CDC INTO
  users_current
FROM
  stream(main.cdc_tutorial.users_cdf)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

執行 SCD 類型 1 範例後,目標資料表包含以下記錄:

userId 名稱 city
124 勞爾 瓦哈卡州
125 梅賽德斯 Guadalajara
126 莉莉 坎昆

用戶 123(Isabel)已被刪除,未出現。 使用者 125(Mercedes)只顯示最新的城市(瓜達拉哈拉),因為 SCD Type 1 會覆蓋先前的值。 之前 UPDATE 的 at sequenceNum=5 被刪除,因為後來有更新 at sequenceNum=6

在執行範例並確保 TRUNCATE 記錄未被註解後,該表在 sequenceNum=3 會被清除。 這表示記錄 124126 不在表格中,最終目標表格僅包含以下記錄:

userId 名稱 city
125 梅賽德斯 Guadalajara

處理 SCD 型 2 更新

SCD 類型 2 透過為每個版本的記錄__START_AT__END_AT建立新列,並以欄位表示每個版本的啟用時間,從而保留完整的變更歷史。

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
  target = "users_history",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

CREATE OR REFRESH STREAMING TABLE users_history;

CREATE FLOW apply_cdc AS AUTO CDC INTO
  users_history
FROM
  stream(main.cdc_tutorial.users_cdf)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

執行 SCD Type 2 範例後,目標資料表包含以下記錄:

userId 名稱 city __START_AT __END_AT
123 Isabel 蒙特雷 1 5
123 Isabel Chihuahua 5 6
124 勞爾 瓦哈卡州 1 null
125 梅賽德斯 蒂華納 2 5
125 梅賽德斯 墨西卡利 5 6
125 梅賽德斯 Guadalajara 6 null
126 莉莉 坎昆 2 null

這張桌子保存了完整的歷史。 使用者 123 有兩個版本(刪除時止於序列 6)。 用戶 125 有三個版本顯示城市變更。 目前相關 __END_AT = null 紀錄仍在運作中。

用 SCD 類型 2 追蹤欄位子集

預設情況下,SCD Type 2 會在任何欄位值變更時建立新版本。 你可以指定一部分欄位要追蹤,這樣其他欄位的變更會即時更新當前版本,而不是產生新的歷史紀錄。

以下範例排除該 city 欄位的歷史追蹤:

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import col, expr

@dp.view
def users():
  return spark.readStream.table("main.cdc_tutorial.users_cdf")

dp.create_streaming_table("users_history")

dp.create_auto_cdc_flow(
  target = "users_history",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

CREATE OR REFRESH STREAMING TABLE users_history;

CREATE FLOW apply_cdc AS AUTO CDC INTO
  users_history
FROM
  stream(main.cdc_tutorial.users_cdf)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

因為 city 變更不會被追蹤,城市更新會覆蓋目前的列,而不是建立新版本。 目標表包含以下紀錄:

userId 名稱 city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 勞爾 瓦哈卡州 1 null
125 梅賽德斯 Guadalajara 2 null
126 莉莉 坎昆 2 null

快照中的自動CDC範例

以下章節提供使用 AUTO CDC FROM SNAPSHOT 將快照處理至 SCD 類型 1 或類型 2 目標資料表的範例。 關於何時使用此 API 的背景,請參閱 變更資料擷取與快照

範例:使用管線匯入時間的程序快照

當快照定期且有序地到達時,請使用此方法,且你可以依賴管線執行時間戳來進行版本管理。 每次管線更新時都會匯入一個新的快照。

你可以從多種來源類型讀取快照,包括 Delta 表格、雲端儲存檔案和 JDBC 連線。

步驟 1:建立範例資料

建立一個包含快照資料的資料表。 請從你的管線資料夾中的筆記本或 Databricks SQL explorations 執行以下程式碼:

CREATE SCHEMA IF NOT EXISTS main.cdc_tutorial;

CREATE TABLE main.cdc_tutorial.snapshot (
  userId INT,
  city STRING
);

INSERT INTO main.cdc_tutorial.snapshot VALUES
  (1, 'Oaxaca'),
  (2, 'Monterrey'),
  (3, 'Tijuana');

步驟 2:從快照中運行自動 CDC

開發 Lakeflow Spark 宣告式管線 以執行此步驟的程式碼。

選擇快照視圖的來源類型(樣本建立程式碼會產生 Delta 表格):

選項A:從Delta表讀取
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
  return spark.read.table("main.cdc_tutorial.snapshot")
選項B:從雲端儲存讀取資料
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
  return spark.read.format("csv").option("header", True).load("<snapshot-path>")
選項 C:從 JDBC 讀取(僅限經典運算)
from pyspark import pipelines as dp

@dp.view(name="source")
def source():
  return (spark.read
    .format("jdbc")
    .option("url", "<jdbc-url>")
    .option("dbtable", "<table-name>")
    .option("user", "<username>")
    .option("password", "<password>")
    .load()
  )

所有選項,寫入到目標

接著加入目標表,並設定流程:

dp.create_streaming_table("target")

dp.create_auto_cdc_from_snapshot_flow(
  target = "target",
  source = "source",
  keys = ["userId"],
  stored_as_scd_type = 2
)

在第一次管線執行後,所有紀錄都會以活動列的形式插入:

userId city __START_AT __END_AT
1 瓦哈卡州 0 null
2 蒙特雷 0 null
3 蒂華納 0 null

備註

若要改用 SCD Type 1 並只保留當前狀態,請設 stored_as_scd_type=1。 此時,目標資料表不包含 __START_AT__END_AT 欄位。

步驟 3:模擬新的快照並重執行

更新來源資料表以模擬新的快照到達(從你的 pipeline 資料夾中的 explorations 筆記本或 SQL 檔案執行此程式碼):

TRUNCATE TABLE main.cdc_tutorial.snapshot;

INSERT INTO main.cdc_tutorial.snapshot VALUES
  (2, 'Carmel'),
  (3, 'Los Angeles'),
  (4, 'Death Valley'),
  (6, 'Kings Canyon');

再次執行資料處理流程。 AUTO CDC FROM SNAPSHOT 將新的快照與先前的快照比較,發現使用者 1 已被刪除,使用者 2 和 3 已更新,使用者 4 和 6 已插入。 這會產生變更資料流,並用 AUTO CDC 來建立輸出表。

在第二次使用 SCD Type 2 執行後,目標表包含以下記錄:

userId city __START_AT __END_AT
1 瓦哈卡州 0 1
2 蒙特雷 0 1
2 卡梅爾 1 null
3 蒂華納 0 1
3 洛杉磯 1 null
4 死亡谷 1 null
6 國王峽谷 1 null

使用者 1 被終止(刪除)。 使用者 2 和 3 各自有兩個版本顯示他們的城市變更。 使用者 4 和 6 是新加入的。

在第二次使用 SCD Type 1 後,目標表僅顯示當前狀態:

userId city
2 卡梅爾
3 洛杉磯
4 死亡谷
6 國王峽谷

範例:使用版本函數進行程序快照

當你需要明確控制快照排序時,請使用此方法。 例如,當多個快照同時抵達或快照順序錯亂時,可以使用此方法。 你寫一個函式,指定下一個要處理的快照及其版本號。 API 依版本依序從低到高處理快照:

  • 如果有多個快照在儲存中,則會依序處理。
  • 若快照抵達順序錯亂(例如在 snapshot_4 之後 snapshot_3 到達),則會跳過該快照。
  • 若無新的快照,函式會回傳 None ,且不會進行處理。

步驟 1:準備快照檔案

建立包含快照資料的 CSV 檔案,並將其加入磁碟區或雲端儲存空間。 檔案名稱依時間順序排列(例如, snapshot_1.csvsnapshot_2.csv)。

每個檔案都應包含userIdcity的欄位。 例如:

snapshot_1.csv

userId city
1 瓦哈卡州
2 蒙特雷
3 蒂華納

snapshot_2.csv

userId city
2 卡梅爾
3 洛杉磯
4 死亡谷

步驟 2:從快照執行 AUTO CDC 並使用版本函式

建立一個新的筆記本,並貼上以下管線程式碼。 接著 開發 Lakeflow Spark 宣告式管線

from pyspark import pipelines as dp
from typing import Optional, Tuple
from pyspark.sql import DataFrame

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Optional[Tuple[DataFrame, int]]:
  snapshot_dir = "/Volumes/main/cdc_tutorial/snapshots/" # or the location you created the sample data

  files = dbutils.fs.ls(snapshot_dir)
  snapshot_files = [f.name for f in files if f.name.startswith("snapshot_") and f.name.endswith(".csv")]

  snapshot_versions = []
  for filename in snapshot_files:
    try:
      version = int(filename.replace("snapshot_", "").replace(".csv", ""))
      snapshot_versions.append(version)
    except ValueError:
      continue

  snapshot_versions.sort()

  if latest_snapshot_version is None:
    if snapshot_versions:
      next_version = snapshot_versions[0]
    else:
      return None
  else:
    next_versions = [v for v in snapshot_versions if v > latest_snapshot_version]
    if next_versions:
      next_version = next_versions[0]
    else:
      return None

  snapshot_path = f"{snapshot_dir}snapshot_{next_version}.csv"
  df = spark.read.format("csv").option("header", True).load(snapshot_path)
  return (df, next_version)


dp.create_streaming_table("main.cdc_tutorial.target_versioned")

dp.create_auto_cdc_from_snapshot_flow(
  target = "main.cdc_tutorial.target_versioned",
  source = next_snapshot_and_version,
  keys = ["userId"],
  stored_as_scd_type = 2
)

備註

若要改用 SCD Type 1,請設定 stored_as_scd_type=1

處理 snapshot_1.csv後,目標資料表包含以下記錄:

userId city __START_AT __END_AT
1 瓦哈卡州 1 null
2 蒙特雷 1 null
3 蒂華納 1 null

處理 snapshot_2.csv後,目標資料表包含以下記錄:

userId city __START_AT __END_AT
1 瓦哈卡州 1 2
2 蒙特雷 1 2
2 卡梅爾 2 null
3 蒂華納 1 2
3 洛杉磯 2 null
4 死亡谷 2 null

備註

請記得,對於 SCD Type 1,表格看起來和最近的快照完全一樣。 差別在於下游查詢可以只處理變更記錄。

步驟 3:新增快照

在儲存位置新增一個 CSV 檔案,並包含修改後的資料(例如:更改城市值、增加新的資料列或刪除現有的資料列)。 接著再跑一次管線來處理新的快照。

局限性

  • 序列欄位必須是可排序的資料型態。 NULL 不支援排序值。
  • AUTO CDC FROM SNAPSHOT 僅支援於 Python 管線介面;SQL 介面不被支援。

其他資源