使用語句 AUTO CDC ... INTO 來建立一個流程,該流程利用 Lakeflow Spark 宣告式管線中的變更資料擷取(CDC)功能。 此陳述式會從 CDC 來源讀取變更,並將其套用至串流目標。
- 若要瞭解 CDC,請參閱 什麼是變更資料擷取 (CDC)?。
- 如需使用
AUTO CDC的詳細資訊,請參閱 AUTO CDC API:使用管線簡化變更資料擷取。 - 如需
CREATE FLOW的詳細資訊,請參閱 CREATE FLOW(管線)。
語法
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
您可以使用與其他管線查詢相同的 CONSTRAINT 子句來定義目標的資料品質限制。 請參閱 使用管線期望來管理資料品質。
INSERT 和 UPDATE 事件的預設行為是從來源進行 CDC 事件的更新或插入:如果目標資料表中有符合指定索引鍵的資料列,則進行更新;若否,則插入新資料列。 使用DELETE條件可以指定APPLY AS DELETE WHEN事件的處理。
這很重要
您必須指定要套用變更的目標串流資料表。 您可以選擇性地指定目標資料表的結構描述。 對於 SCD 類型 2 表格,在指定目標表格的結構時,您必須包含 __START_AT 和 __END_AT 欄位,且這些欄位的資料類型必須與 sequence_by 欄位相同。
請參閱 AUTOTO CDC API:使用管線簡化變更資料擷取。
參數
flow_name要建立的流程名稱。
source資料的來源。 來源必須是 串流 來源。 使用 STREAM 關鍵詞來使用串流語意從來源讀取。 如果讀取遇到現有記錄的變更或刪除,則會拋出錯誤。 閱讀靜態或只能添加的來源是最安全的。 若要匯入包含變更提交的數據,您可以使用 Python 和
SkipChangeCommits選項來處理錯誤。如需串流資料的詳細資訊,請參閱 用管線轉換資料。
KEYS唯一定義來源資料中資料列的欄或欄組合。 這些直欄中的值可用來識別哪些 CDC 事件套用至目標表格中的特定記錄。
若要定義直欄組合,請使用逗號分隔的直欄清單。
本條款是必需的。
IGNORE NULL UPDATES允許匯入包含目標欄位子集的更新。 當 CDC 事件符合現有資料列,且指定 IGNORE NULL UPDATES 時,具有值
null的資料行會保留其在目標中的現有值。 這也適用於具有null這個值的巢狀資料行。此子句是選用的。
預設值是以
null值覆蓋現有資料行。APPLY AS DELETE WHEN指定何時應將 CDC 事件視為
DELETE而不是更新插入。針對 SCD 類型 2 來源,若要處理亂序資料,已刪除的資料列會暫時保留為基礎的 Delta 資料表中的墓碑標記,並在中繼存放區中建立一個檢視,以篩選掉這些墓碑標記。 保留間隔可以使用
pipelines.cdc.tombstoneGCThresholdInSeconds來配置。此子句是選用的。
APPLY AS TRUNCATE WHEN指定何時應將 CDC 事件視為完整表格
TRUNCATE。 由於此子句會觸發目標資料表的完整截斷,因此它應該僅用於需要此功能的特定使用案例。APPLY AS TRUNCATE WHEN此子句僅支援 SCD 類型 1。 SCD 類型 2 不支援截斷作業。此子句是選用的。
SEQUENCE BY指定來源資料中 CDC 事件邏輯順序的欄位名稱。 管線處理會使用此排序來處理無序抵達的變更事件。
如果排序需要多個資料行,請使用
STRUCT運算式:它會先依第一個結構欄位排序,如果有平局,則依第二個欄位排序,依此類推。指定的資料行必須是可排序的資料類型。
此條款是必需的。
COLUMNS指定要包含在目標表格中的欄位子集。 您可以選擇以下其中一項:
- 指定要包含的欄的完整清單:
COLUMNS (userId, name, city)。 - 指定要排除的資料行清單:
COLUMNS * EXCEPT (operation, sequenceNum)
此子句是選用的。
預設是當未指定
COLUMNS子句時,會併入目標表格中的所有欄。- 指定要包含的欄的完整清單:
STORED AS是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。
此子句是選用的。
預設值為 SCD 類型 1。
TRACK HISTORY ON指定輸出直欄的子集,以便在那些指定的直欄發生任何變更時產生歷程記錄。 您可以選擇以下其中一項:
- 指定要追蹤的欄位的完整清單:
COLUMNS (userId, name, city)。 - 指定要從追蹤中排除的資料行清單:
COLUMNS * EXCEPT (operation, sequenceNum)
此子句是選用的。 預設值是在有任何變更時會追蹤所有輸出欄位的歷程記錄,這相當於
TRACK HISTORY ON *。- 指定要追蹤的欄位的完整清單:
範例
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);