使用 AUTO CDC ... INTO
語句來建立利用 Lakeflow 宣告式管線的變更數據擷取(CDC)功能的流程。 此語句的功能是讀取 CDC 來源的變更,並將其應用到串流目標。
- 若要瞭解 CDC,請參閱 什麼是異動數據擷取 (CDC)?。
- 如需使用
AUTO CDC
的詳細資訊,請參閱 AUTO CDC API:使用 Lakeflow 宣告式管線簡化異動數據擷取。 - 如需查看更多
CREATE FLOW
的詳細資訊,請參閱 CREATE FLOW(Lakeflow 宣告性管線)。
語法
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
您可以使用與其他 Lakeflow 宣告式管線查詢相同的 CONSTRAINT
子句來定義目標的數據質量條件約束。 請參閱 使用管線期望來管理資料品質。
INSERT
和 UPDATE
事件的預設行為是從來源 插入或更新 CDC 事件:更新目標資料表中符合指定索引鍵的任何資料列,或在目標資料表中不存在比對記錄時插入新資料列。 您可以使用 DELETE
條件來指定 APPLY AS DELETE WHEN
事件的處理。
這很重要
您必須宣告目標串流資料表,才能將變更套用至 。 您可以選擇性地指定目標資料表的架構。 針對 SCD 類型 2 資料表,在指定目標資料表的架構時,您也必須包含 __START_AT
和 __END_AT
資料行,其資料類型須與 sequence_by
欄位相同。
請參閱 AUTO CDC API:使用 Lakeflow 宣告式管線簡化異動數據擷取。
參數
flow_name
要建立之流程的名稱。
source
數據的來源。 來源必須是 串流 來源。 使用 STREAM 關鍵詞來使用串流語意從來源讀取。 如果讀取遇到現有記錄的變更或刪除,則會拋出錯誤。 閱讀靜態或只能添加的來源是最安全的。 若要匯入包含變更提交的數據,您可以使用 Python 和
SkipChangeCommits
選項來處理錯誤。如需串流資料的詳細資訊,請參閱 用管線轉換資料。
KEYS
能唯一識別來源資料中某一資料列的資料欄或資料欄組合。 這些數據行中的值可用來識別哪些 CDC 事件會套用至目標數據表中的特定記錄。
若要定義資料行的組合,請使用以逗號分隔的資料行清單。
這個子句是必要的。
IGNORE NULL UPDATES
允許擷取包含目標欄位子集的更新。 當 CDC 事件符合現有的數據列,並指定 IGNORE NULL UPDATES 時,具有
null
值的數據行會在目標中保留其現有值。 這也適用於具有null
值的巢狀數據行。這個子句是選擇性的。
預設值是使用
null
值覆寫現有的數據行。APPLY AS DELETE WHEN
指定何時應將 CDC 事件視為
DELETE
,而非視為 upsert。針對 SCD 類型 2 的來源,為了處理順序錯亂的資料,已刪除的資料列會暫時保留為基礎 Delta 資料表中的墓碑記錄,並在中繼資料庫中建立一個檢視來過濾掉這些墓碑記錄。 您可以使用
pipelines.cdc.tombstoneGCThresholdInSeconds
來設定保留間隔。這個子句是選擇性的。
APPLY AS TRUNCATE WHEN
指定應將 CDC 事件視為完整資料表
TRUNCATE
的時機。 因為這個子句會觸發目標數據表的完整截斷,所以應該只用於需要這項功能的特定使用案例。只有 SCD 類型 1 才支援
APPLY AS TRUNCATE WHEN
子句。 SCD 類型 2 不支援截斷操作。這個子句是選擇性的。
SEQUENCE BY
指定源數據中 CDC 事件邏輯順序的數據行名稱。 Lakeflow 宣告式管線會使用此排序來處理無序抵達的變更事件。
如果需要多個數據行進行排序,請使用表達式:它會先依第一個
STRUCT
結構欄位排序,然後在有系結時依第二個字段排序,依此排序。指定的數據行必須是可排序的數據類型。
此條款是必需的。
COLUMNS
指定要包含在目標數據表中的數據行子集。 您可以選擇以下其中一項:
- 指定要包含之欄位的完整清單:
COLUMNS (userId, name, city)
。 - 指定要排除的欄位清單:
COLUMNS * EXCEPT (operation, sequenceNum)
這個子句是選擇性的。
預設情況下,如果未指定
COLUMNS
子句,將在目標表中包含所有列。- 指定要包含之欄位的完整清單:
STORED AS
是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。
這個子句是選擇性的。
預設值為 SCD 類型 1。
TRACK HISTORY ON
指定輸出資料行的子集,以在這些指定的資料行有任何變更時生成歷史記錄。 您可以選擇以下其中一項:
- 指定要追蹤的欄位的完整清單:
COLUMNS (userId, name, city)
。 - 指定要從追蹤中排除的數據行清單:
COLUMNS * EXCEPT (operation, sequenceNum)
這個子句是選擇性的。 當有任何變更時,預設操作是追蹤所有輸出資料列的歷史記錄,相當於
TRACK HISTORY ON *
。- 指定要追蹤的欄位的完整清單:
範例
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);