SKAPA FLÖDE (pipelines)

Använd -instruktionen CREATE FLOW för att skapa flöden eller återfyllnad för tabeller i en pipeline.

Syntax

CREATE FLOW flow_name [COMMENT comment] AS
{
  AUTO CDC INTO target_table create_auto_cdc_flow_spec |
  INSERT INTO [ONCE] target_table BY NAME query
}

Parameterar

  • flow_name

    Namnet på flödet som ska skapas.

  • KOMMENTAR

    En valfri beskrivning av flödet.

  • AUTO CDC INTO

    En AUTO CDC ... INTO -instruktion som definierar flödet med en create_auto_cdc_flow_spec. Du måste antingen inkludera en AUTO CDC ... INTO -instruktion eller en INSERT INTO -instruktion. Använd AUTO CDC ... INTO när källfrågan använder ändra datasemantik.

    Mer information finns i AUTO CDC INTO (pipelines).

  • target_table

    Tabellen som ska uppdateras. Detta måste vara en strömningstabell.

  • INSERT IN

    Definierar en tabellfråga som infogas i måltabellen. Om alternativet ONCE inte anges måste frågan vara en direktuppspelningsfråga . Använd stream-nyckelordet för att använda strömmande semantik för att läsa från källan. Om läsningen påträffar en ändring eller borttagning av en befintlig post utlöses ett fel. Det är säkrast att läsa från statiska eller endast tilläggskällor. Om du vill mata in data som har ändringsincheckningar kan du använda Python och SkipChangeCommits alternativet för att hantera fel.

    INSERT INTO kan inte kombineras med AUTO CDC ... INTO. Använd AUTO CDC ... INTO när källdata innehåller CDC-funktionalitet (change data capture). Använd INSERT INTO när källan inte gör det.

    Mer information om strömmande data finns i Transformera data med pipelines.

  • En gång

    Du kan också definiera flödet som ett engångsflöde, till exempel en återfyllnad. Om du använder ONCE ändras flödet på två sätt:

    • Källan query eller create_auto_cdc_flow_spec är inte en strömmande tabell.
    • Flödet körs en gång som standard. Om en pipelinen uppdateras med en fullständig uppdatering, kommer ONCE-flödet att köras igen för att återskapa data.

Examples

-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;

-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);

-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;

-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;

-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;