共用方式為


CREATE FLOW (管線)

使用陳述 CREATE FLOW 式為管線中的資料表建立流程或回填。

語法

CREATE FLOW flow_name [COMMENT comment] AS
{
  AUTO CDC INTO target_table create_auto_cdc_flow_spec |
  INSERT INTO [ONCE] target_table BY NAME query
}

參數

  • flow_name

    要建立的流程名稱。

  • 評論

    流程的可選說明。

  • 自動 CDC 啟用

    一個AUTO CDC ... INTO陳述式用來定義流程,並具有create_auto_cdc_flow_spec。 您必須包含 AUTO CDC ... INTO 陳述式,或是 INSERT INTO 陳述式。 當來源查詢使用變更資料語意時使用 AUTO CDC ... INTO

    如需詳細資訊,請參閱 AUTO CDC INTO (管線)。

  • target_table

    要更新的資料表。 這必須是串流資料表。

  • INSERT 轉換為

    定義插入至目標資料表的資料庫查詢。 如果未提供選項,則 ONCE 查詢必須是 串流 查詢。 使用 STREAM 關鍵詞來使用串流語意從來源讀取。 如果讀取遇到現有記錄的變更或刪除,則會拋出錯誤。 閱讀靜態或只能添加的來源是最安全的。 若要匯入包含變更提交的數據,您可以使用 Python 和 SkipChangeCommits 選項來處理錯誤。

    INSERT INTOAUTO CDC ... INTO互斥。 當來源資料包含變更資料擷取 (CDC) 功能時使用 AUTO CDC ... INTO 。 當來源沒有時使用 INSERT INTO

    如需串流資料的詳細資訊,請參閱 用管線轉換資料

  • ONCE

    選擇性地將流程定義為一次性流程,例如回填。 使用 ONCE 將以兩種方式改變流程:

    • 來源 querycreate_auto_cdc_flow_spec 不是串流資料表。
    • 預設情況下,流程會執行一次。 如果管線已更新並進行完整重新整理,則 ONCE 流程將再次執行以重新建立資料。

範例

-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;

-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);

-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;

-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;

-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;