使用陳述 CREATE FLOW 式為管線中的資料表建立流程或回填。
語法
CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}
參數
flow_name
要建立的流程名稱。
評論
流程的可選說明。
-
一個
AUTO CDC ... INTO陳述式用來定義流程,並具有create_auto_cdc_flow_spec。 您必須包含AUTO CDC ... INTO陳述式,或是INSERT INTO陳述式。 當來源查詢使用變更資料語意時使用AUTO CDC ... INTO。如需詳細資訊,請參閱 AUTO CDC INTO (管線)。
target_table
要更新的資料表。 這必須是串流資料表。
INSERT 轉換為
定義插入至目標資料表的資料庫查詢。 如果未提供選項,則
ONCE查詢必須是 串流 查詢。 使用 STREAM 關鍵詞來使用串流語意從來源讀取。 如果讀取遇到現有記錄的變更或刪除,則會拋出錯誤。 閱讀靜態或只能添加的來源是最安全的。 若要匯入包含變更提交的數據,您可以使用 Python 和SkipChangeCommits選項來處理錯誤。INSERT INTO與AUTO CDC ... INTO互斥。 當來源資料包含變更資料擷取 (CDC) 功能時使用AUTO CDC ... INTO。 當來源沒有時使用INSERT INTO。如需串流資料的詳細資訊,請參閱 用管線轉換資料。
ONCE
選擇性地將流程定義為一次性流程,例如回填。 使用
ONCE將以兩種方式改變流程:- 來源
query或create_auto_cdc_flow_spec不是串流資料表。 - 預設情況下,流程會執行一次。 如果管線已更新並進行完整重新整理,則
ONCE流程將再次執行以重新建立資料。
- 來源
範例
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;
-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);
-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;
-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;
-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;