共用方式為


create_auto_cdc_from_snapshot_flow

這很重要

此功能為 公開預覽版。

create_auto_cdc_from_snapshot_flow 函式會建立流程,使用 Lakeflow Spark 宣告式管線變更資料擷取 (CDC) 功能來處理資料庫快照集的來源資料。 請參閱 API AUTO CDC FROM SNAPSHOT 如何實現 CDC。

備註

此函數取代了先前的函數 apply_changes_from_snapshot()。 這兩個函數具有相同的簽章。 Databricks 建議更新以使用新名稱。

這很重要

您必須具備一個目標串流資料表才能進行此操作。

若要建立所需的目標表格,您可以使用 create_streaming_table() 函數。

語法

from pyspark import pipelines as dp

dp.create_auto_cdc_from_snapshot_flow(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

備註

對於 AUTO CDC FROM SNAPSHOT 處理,預設行為是在目標中不存在具有相同索引鍵的相符記錄時插入新列。 如果相符記錄確實存在,則只有在列中的任何值已變更時才會更新該記錄。 目標中存在索引鍵但來源中不再存在的資料列會被刪除。

若要深入瞭解使用快照集進行 CDC 處理,請參閱 自動 CDC API:使用管線簡化變更資料擷取。 如需使用此功能 create_auto_cdc_from_snapshot_flow() 的範例,請參閱 定期快照擷取歷史快照擷取 範例。

參數

參數 類型 Description
target str 必須的。 要更新的資料表名稱。 您可以在執行函數之前使用create_auto_cdc_from_snapshot_flow()函數來建立目標表。
source strlambda function 必須的。 要定期快照的資料表或檢視的名稱,或是一個 Python lambda 函數,該函數會傳回要處理的快照資料框和快照版本。 請參閱 實作 source 引數
keys list 必須的。 唯一定義來源資料中資料列的欄或欄組合。 這可用來識別哪些 CDC 事件套用至目標資料表中的特定記錄。 您可以指定下列其中一項:
  • 字串清單: ["userId", "orderId"]
  • Spark SQL col() 函數清單: [col("userId"), col("orderId"]。 函數的 col() 引數不能包含限定詞。 例如,您可以使用 col(userId),但不能使用 col(source.userId)
stored_as_scd_type strint 是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。 設為1 以用於 SCD 類型 1,或設為2 以用於 SCD 類型 2。 預設值為 SCD 類型 1。
track_history_column_listtrack_history_except_column_list list 目標表中被追蹤歷史的輸出欄位子集。 用 track_history_column_list 來指定要追蹤的資料行的完整清單。 用 track_history_except_column_list 來指定要從追蹤中排除的欄。 您可以將任一值宣告為字串清單或 Spark SQL col() 函數:
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

函數的 col() 引數不能包含限定詞。 例如,您可以使用 col(userId),但不能使用 col(source.userId)。 當未傳遞 track_history_column_listtrack_history_except_column_list 參數給函數時,預設為包含目標表中的所有資料行。

實作 source 引數

函數create_auto_cdc_from_snapshot_flow()包含引數source。 為了處理歷史快照, source 引數應該是一個 Python lambda 函數,該函數會向函數 create_auto_cdc_from_snapshot_flow() 傳回兩個值:包含要處理的快照資料的 Python DataFrame 和快照版本。

以下是 lambda 函數的簽名:

lambda Any => Optional[(DataFrame, Any)]
  • lambda 函數的引數是最新處理的快照版本。
  • lambda 函數的傳回值是 None 或兩個值的元組:元組的第一個值是包含要處理的快照的 DataFrame。 元組的第二個值是快照版本,代表快照的邏輯順序。

實作和呼叫 lambda 函數的範例:

def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

create_auto_cdc_from_snapshot_flow(
  # ...
  source = next_snapshot_and_version,
  # ...
)

Lakeflow Spark 宣告式管線執行階段會在每次觸發包含 create_auto_cdc_from_snapshot_flow() 函式的管線時執行下列步驟:

  1. 執行 next_snapshot_and_version 函式以載入下一個快照集 DataFrame 和對應的快照版本。
  2. 如果沒有傳回 DataFrame,則會終止執行,並將管線更新標示為完成。
  3. 偵測新快照中的變更,並以累加方式將它們套用至目標資料表。
  4. 返回步驟 #1 以載入下一個快照及其版本。