Not
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Använd -instruktionen CREATE FLOW för att skapa flöden eller återfyllnad för tabeller i en pipeline.
Syntax
CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}
Parameterar
flow_name
Namnet på flödet som ska skapas.
KOMMENTAR
En valfri beskrivning av flödet.
-
En
AUTO CDC ... INTO-instruktion som definierar flödet med encreate_auto_cdc_flow_spec. Du måste antingen inkludera enAUTO CDC ... INTO-instruktion eller enINSERT INTO-instruktion. AnvändAUTO CDC ... INTOnär källfrågan använder ändra datasemantik.Mer information finns i AUTO CDC INTO (pipelines).
target_table
Tabellen som ska uppdateras. Detta måste vara en strömningstabell.
INSERT IN
Definierar en tabellfråga som infogas i måltabellen. Om alternativet
ONCEinte anges måste frågan vara en direktuppspelningsfråga . Använd stream-nyckelordet för att använda strömmande semantik för att läsa från källan. Om läsningen påträffar en ändring eller borttagning av en befintlig post utlöses ett fel. Det är säkrast att läsa från statiska eller endast tilläggskällor. Om du vill mata in data som har ändringsincheckningar kan du använda Python ochSkipChangeCommitsalternativet för att hantera fel.INSERT INTOkan inte kombineras medAUTO CDC ... INTO. AnvändAUTO CDC ... INTOnär källdata innehåller CDC-funktionalitet (change data capture). AnvändINSERT INTOnär källan inte gör det.Mer information om strömmande data finns i Transformera data med pipelines.
En gång
Du kan också definiera flödet som ett engångsflöde, till exempel en återfyllnad. Om du använder
ONCEändras flödet på två sätt:- Källan
queryellercreate_auto_cdc_flow_specär inte en strömmande tabell. - Flödet körs en gång som standard. Om en pipelinen uppdateras med en fullständig uppdatering, kommer
ONCE-flödet att köras igen för att återskapa data.
- Källan
Examples
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;
-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);
-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;
-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;
-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;