Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Pomocí příkazu CREATE FLOW vytvořte toky nebo backfilly pro tabulky v kanálu.
Syntaxe
CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}
Parametry
flow_name
Název toku, který chcete vytvořit.
KOMENTÁŘ
Volitelný popis toku.
-
Výrok
AUTO CDC ... INTO, který definuje tok, screate_auto_cdc_flow_spec. Musíte buď zahrnoutAUTO CDC ... INTOpříkaz, nebo příkazINSERT INTO. PoužíváAUTO CDC ... INTOse, když zdrojový dotaz používá sémantiku změn dat.Další informace najdete v tématu AUTO CDC INTO (pipelines).
target_table
Tabulka, která se má aktualizovat. Musí se jednat o streamovací tabulku.
INSERT DO
Definuje dotaz tabulky, který se vloží do cílové tabulky. Pokud tato
ONCEmožnost není zadána, musí být dotaz streamovaným dotazem. Pomocí klíčového slova STREAM můžete ke čtení ze zdroje použít sémantiku streamování. Pokud čtení narazí na změnu nebo odstranění existujícího záznamu, vyvolá se chyba. Je nejbezpečnější číst ze statických nebo doplňovacích zdrojů. K ingestci dat, která mají commity změn, můžete použít Python a možnostSkipChangeCommitspro zpracování chyb.INSERT INTOse vzájemně vylučuje sAUTO CDC ... INTO. Používá seAUTO CDC ... INTO, když zdrojová data obsahují funkci pro zachytávání dat změn (CDC). PoužijteINSERT INTO, pokud zdroj ne.Další informace o streamovaných datech najdete v tématu Transformace dat pomocí kanálů.
JEDNOU
Volitelně můžete tok definovat jako jednorázový tok, například jako backfill. Použití
ONCEzmění tok dvěma způsoby:- Zdroj
querynebocreate_auto_cdc_flow_specnení streamovací tabulka. - Proud je ve výchozím nastavení spuštěn jednou. Pokud je piplina aktualizována kompletním obnovením,
ONCEtok se znovu spustí, aby znovu vytvořil data.
- Zdroj
Examples
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;
-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users BY NAME
SELECT * FROM stream(raw_data.users);
-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users BY NAME
SELECT * FROM user_backfill_table;
-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;
-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;