共用方式為


自動 CDC 輸入(管道)

使用語句 AUTO CDC ... INTO 來建立一個流程,該流程利用 Lakeflow Spark 宣告式管線中的變更資料擷取(CDC)功能。 此陳述式會從 CDC 來源讀取變更,並將其套用至串流目標。

語法

CREATE OR REFRESH STREAMING TABLE table_name;

CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

您可以使用與其他管線查詢相同的 CONSTRAINT 子句來定義目標的資料品質限制。 請參閱 使用管線期望來管理資料品質

INSERTUPDATE 事件的預設行為是從來源進行 CDC 事件的更新或插入:如果目標資料表中有符合指定索引鍵的資料列,則進行更新;若否,則插入新資料列。 使用DELETE條件可以指定APPLY AS DELETE WHEN事件的處理。

這很重要

您必須指定要套用變更的目標串流資料表。 您可以選擇性地指定目標資料表的結構描述。 對於 SCD 類型 2 表格,在指定目標表格的結構時,您必須包含 __START_AT__END_AT 欄位,且這些欄位的資料類型必須與 sequence_by 欄位相同。

請參閱 AUTOTO CDC API:使用管線簡化變更資料擷取

參數

  • flow_name

    要建立的流程名稱。

  • source

    資料的來源。 來源必須是 串流 來源。 使用 STREAM 關鍵詞來使用串流語意從來源讀取。 如果讀取遇到現有記錄的變更或刪除,則會拋出錯誤。 閱讀靜態或只能添加的來源是最安全的。 若要匯入包含變更提交的數據,您可以使用 Python 和 SkipChangeCommits 選項來處理錯誤。

    如需串流資料的詳細資訊,請參閱 用管線轉換資料

  • KEYS

    唯一定義來源資料中資料列的欄或欄組合。 這些直欄中的值可用來識別哪些 CDC 事件套用至目標表格中的特定記錄。

    若要定義直欄組合,請使用逗號分隔的直欄清單。

    本條款是必需的。

  • IGNORE NULL UPDATES

    允許匯入包含目標欄位子集的更新。 當 CDC 事件符合現有資料列,且指定 IGNORE NULL UPDATES 時,具有值 null 的資料行會保留其在目標中的現有值。 這也適用於具有 null 這個值的巢狀資料行。

    此子句是選用的。

    預設值是以null值覆蓋現有資料行。

  • APPLY AS DELETE WHEN

    指定何時應將 CDC 事件視為 DELETE 而不是更新插入。

    針對 SCD 類型 2 來源,若要處理亂序資料,已刪除的資料列會暫時保留為基礎的 Delta 資料表中的墓碑標記,並在中繼存放區中建立一個檢視,以篩選掉這些墓碑標記。 保留間隔可以使用pipelines.cdc.tombstoneGCThresholdInSeconds配置。

    此子句是選用的。

  • APPLY AS TRUNCATE WHEN

    指定何時應將 CDC 事件視為完整表格 TRUNCATE。 由於此子句會觸發目標資料表的完整截斷,因此它應該僅用於需要此功能的特定使用案例。

    APPLY AS TRUNCATE WHEN此子句僅支援 SCD 類型 1。 SCD 類型 2 不支援截斷作業。

    此子句是選用的。

  • SEQUENCE BY

    指定來源資料中 CDC 事件邏輯順序的欄位名稱。 管線處理會使用此排序來處理無序抵達的變更事件。

    如果排序需要多個資料行,請使用 STRUCT 運算式:它會先依第一個結構欄位排序,如果有平局,則依第二個欄位排序,依此類推。

    指定的資料行必須是可排序的資料類型。

    此條款是必需的。

  • COLUMNS

    指定要包含在目標表格中的欄位子集。 您可以選擇以下其中一項:

    • 指定要包含的欄的完整清單: COLUMNS (userId, name, city)
    • 指定要排除的資料行清單: COLUMNS * EXCEPT (operation, sequenceNum)

    此子句是選用的。

    預設是當未指定 COLUMNS 子句時,會併入目標表格中的所有欄。

  • STORED AS

    是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。

    此子句是選用的。

    預設值為 SCD 類型 1。

  • TRACK HISTORY ON

    指定輸出直欄的子集,以便在那些指定的直欄發生任何變更時產生歷程記錄。 您可以選擇以下其中一項:

    • 指定要追蹤的欄位的完整清單: COLUMNS (userId, name, city)
    • 指定要從追蹤中排除的資料行清單: COLUMNS * EXCEPT (operation, sequenceNum)

    此子句是選用的。 預設值是在有任何變更時會追蹤所有輸出欄位的歷程記錄,這相當於 TRACK HISTORY ON *

範例

-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;

CREATE FLOW flow
AS AUTO CDC INTO
  target
FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);