這很重要
此功能為 公開預覽版。
此 create_auto_cdc_from_snapshot_flow 函式會建立流程,使用 Lakeflow Spark 宣告式管線變更資料擷取 (CDC) 功能來處理資料庫快照集的來源資料。 請參閱 API AUTO CDC FROM SNAPSHOT 如何實現 CDC。。
備註
此函數取代了先前的函數 apply_changes_from_snapshot()。 這兩個函數具有相同的簽章。 Databricks 建議更新以使用新名稱。
語法
from pyspark import pipelines as dp
dp.create_auto_cdc_from_snapshot_flow(
target = "<target-table>",
source = Any,
keys = ["key1", "key2", "keyN"],
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None
)
備註
對於 AUTO CDC FROM SNAPSHOT 處理,預設行為是在目標中不存在具有相同索引鍵的相符記錄時插入新列。 如果相符記錄確實存在,則只有在列中的任何值已變更時才會更新該記錄。 目標中存在索引鍵但來源中不再存在的資料列會被刪除。
若要深入瞭解使用快照集進行 CDC 處理,請參閱 自動 CDC API:使用管線簡化變更資料擷取。 如需使用此功能 create_auto_cdc_from_snapshot_flow() 的範例,請參閱 定期快照擷取 和 歷史快照擷取 範例。
參數
| 參數 | 類型 | Description |
|---|---|---|
target |
str |
必須的。 要更新的資料表名稱。 您可以在執行函數之前使用create_auto_cdc_from_snapshot_flow()函數來建立目標表。 |
source |
str 或 lambda function |
必須的。 要定期快照的資料表或檢視的名稱,或是一個 Python lambda 函數,該函數會傳回要處理的快照資料框和快照版本。 請參閱 實作 source 引數。 |
keys |
list |
必須的。 唯一定義來源資料中資料列的欄或欄組合。 這可用來識別哪些 CDC 事件套用至目標資料表中的特定記錄。 您可以指定下列其中一項:
|
stored_as_scd_type |
str 或 int |
是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。 設為1 以用於 SCD 類型 1,或設為2 以用於 SCD 類型 2。 預設值為 SCD 類型 1。 |
track_history_column_list 或 track_history_except_column_list |
list |
目標表中被追蹤歷史的輸出欄位子集。 用 track_history_column_list 來指定要追蹤的資料行的完整清單。 用 track_history_except_column_list 來指定要從追蹤中排除的欄。 您可以將任一值宣告為字串清單或 Spark SQL col() 函數:
函數的 col() 引數不能包含限定詞。 例如,您可以使用 col(userId),但不能使用 col(source.userId)。 當未傳遞 track_history_column_list 或 track_history_except_column_list 參數給函數時,預設為包含目標表中的所有資料行。 |
實作 source 引數
函數create_auto_cdc_from_snapshot_flow()包含引數source。 為了處理歷史快照, source 引數應該是一個 Python lambda 函數,該函數會向函數 create_auto_cdc_from_snapshot_flow() 傳回兩個值:包含要處理的快照資料的 Python DataFrame 和快照版本。
以下是 lambda 函數的簽名:
lambda Any => Optional[(DataFrame, Any)]
- lambda 函數的引數是最新處理的快照版本。
- lambda 函數的傳回值是
None或兩個值的元組:元組的第一個值是包含要處理的快照的 DataFrame。 元組的第二個值是快照版本,代表快照的邏輯順序。
實作和呼叫 lambda 函數的範例:
def next_snapshot_and_version(latest_snapshot_version: Optional[int]) -> Tuple[Optional[int], DataFrame]:
if latest_snapshot_version is None:
return (spark.read.load("filename.csv"), 1)
else:
return None
create_auto_cdc_from_snapshot_flow(
# ...
source = next_snapshot_and_version,
# ...
)
Lakeflow Spark 宣告式管線執行階段會在每次觸發包含 create_auto_cdc_from_snapshot_flow() 函式的管線時執行下列步驟:
- 執行
next_snapshot_and_version函式以載入下一個快照集 DataFrame 和對應的快照版本。 - 如果沒有傳回 DataFrame,則會終止執行,並將管線更新標示為完成。
- 偵測新快照中的變更,並以累加方式將它們套用至目標資料表。
- 返回步驟 #1 以載入下一個快照及其版本。