共用方式為


CREATE STREAMING TABLE ...流動自動疾病控制中心

適用於:勾選「是」 Databricks SQL

這很重要

這項功能位於 測試版 (Beta) 中。 需要 Databricks 執行環境 17.3 及以上版本。

使用 FLOW AUTO CDC 子句 與 CREATE STREAMING TABLE 來將變更資料擷取(CDC)記錄從來源處理到串流資料表。

先前,此 MERGE INTO 陳述式通常用於處理 Azure Databricks 上的 CDC 記錄。 不過,可能會 MERGE INTO 因為記錄順序不正確而產生不正確的結果,或需要複雜的邏輯來重新排序記錄。

AUTO CDC 透過自動處理亂序紀錄,簡化 CDC。 你指定用來識別記錄的鍵、一個用於排序的序列欄位,以及是將結果儲存為 SCD 類型 1(直接更新)還是 SCD 類型 2(歷史追蹤)。

語法

CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

UPDATE事件的預設行為INSERT是從來源上行 CDC 事件:更新目標資料表中與指定鍵數相符的列,或在目標資料表中不存在匹配紀錄時插入新列。 使用DELETE條件可以指定APPLY AS DELETE WHEN事件的處理。

參數

  • source

    資料的來源。 來源必須是串流來源。 使用STREAM關鍵詞,來使用串流語意從來源讀取。 如果讀取遇到現有記錄的變更或刪除,則會拋出錯誤。 閱讀靜態或只能添加的來源是最安全的。

    如需串流資料的詳細資訊,請參閱 用管線轉換資料

  • KEYS

    唯一定義來源資料中資料列的欄或欄組合。 這些直欄中的值可用來識別哪些 CDC 事件套用至目標表格中的特定記錄。

    若要定義直欄組合,請使用逗號分隔的直欄清單。

    此條款是必需的。

  • IGNORE NULL UPDATES

    允許匯入包含目標欄位子集的更新。 當 CDC 事件與現有列匹配且 IGNORE NULL UPDATES 被指定時,具有 值的 null 欄位會保留其在目標中原本的值。 這也適用於具有 null 這個值的巢狀資料行。

    此子句是選用的。

    預設值是以null值覆蓋現有資料行。

  • APPLY AS DELETE WHEN

    指定何時應將 CDC 事件視為 DELETE 而不是更新插入。

    針對 SCD 類型 2 來源,若要處理亂序資料,已刪除的資料列會暫時保留為基礎的 Delta 資料表中的墓碑標記,並在中繼存放區中建立一個檢視,以篩選掉這些墓碑標記。 保留間隔可以使用pipelines.cdc.tombstoneGCThresholdInSeconds配置。

    此子句是選用的。

  • APPLY AS TRUNCATE WHEN

    指定何時應將 CDC 事件視為完整表格 TRUNCATE。 由於此子句會觸發目標資料表的完整截斷,因此它應該僅用於需要此功能的特定使用案例。

    APPLY AS TRUNCATE WHEN此子句僅支援 SCD 類型 1。 SCD 類型 2 不支援截斷作業。

    此子句是選用的。

  • SEQUENCE BY

    指定來源資料中 CDC 事件邏輯順序的欄位名稱。 管線處理會使用此排序來處理無序抵達的變更事件。

    如果排序需要多個資料行,請使用 STRUCT 運算式:它會先依第一個結構欄位排序,如果有平局,則依第二個欄位排序,依此類推。

    指定的資料行必須是可排序的資料類型。

    此條款是必需的。

  • COLUMNS

    指定要包含在目標表格中的欄位子集。 您可以選擇以下其中一項:

    • 指定要包含的欄的完整清單: COLUMNS (userId, name, city)
    • 指定要排除的資料行清單: COLUMNS * EXCEPT (operation, sequenceNum)

    此子句是選用的。

    預設是當未指定 COLUMNS 子句時,會併入目標表格中的所有欄。

  • STORED AS

    是否要將記錄儲存為 SCD 類型 1 或 SCD 類型 2。

    此子句是選用的。

    預設值為 SCD 類型 1。

  • TRACK HISTORY ON

    指定輸出直欄的子集,以便在那些指定的直欄發生任何變更時產生歷程記錄。 您可以選擇以下其中一項:

    • 指定要追蹤的欄位的完整清單: COLUMNS (userId, name, city)
    • 指定要從追蹤中排除的資料行清單: COLUMNS * EXCEPT (operation, sequenceNum)

    此子句是選用的。 預設值是在有任何變更時會追蹤所有輸出欄位的歷程記錄,這相當於 TRACK HISTORY ON *

Examples

-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  SEQUENCE BY sequenceNum
  STORED AS SCD TYPE 1;

-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2;

-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);