Delen via


CREATE FLOW (pijplijnen)

Gebruik de CREATE FLOW instructie om gegevensstromen of backfills te maken voor tabellen in een pijplijn.

Syntaxis

CREATE FLOW flow_name [COMMENT comment] AS
{
  AUTO CDC INTO target_table create_auto_cdc_flow_spec |
  INSERT INTO [ONCE] target_table BY NAME query
}

Parameterwaarden

  • flow_name

    De naam van de werkstroom die moet worden aangemaakt.

  • COMMENTAAR

    Een optionele beschrijving voor de stroom.

  • AUTO CDC INTO

    Een AUTO CDC ... INTO instructie die de flow definieert, met een create_auto_cdc_flow_spec. Je moet een AUTO CDC ... INTO verklaring of een INSERT INTO verklaring opnemen. Gebruik AUTO CDC ... INTO wanneer de bronquery gegevenssemantiek voor wijzigingen hanteert.

    Zie AUTO CDC INTO (pijplijnen) voor meer informatie.

  • target_table

    De tabel die moet worden bijgewerkt. Dit moet een Streaming tabel zijn.

  • INSERT IN

    Hiermee definieert u een query voor een tabel die wordt ingevoegd in de doeltabel. Als de ONCE optie niet is opgegeven, moet de query een streamingquery zijn. Gebruik het trefwoord STREAM om streaming-semantiek te gebruiken om uit de bron te lezen. Als de leesbewerking een wijziging of verwijdering van een bestaande record tegenkomt, wordt er een fout gegenereerd. Het is het veiligst om te lezen uit statische of alleen bij te voegen bronnen. Als u gegevens wilt opnemen die wijzigingen doorvoeren, kunt u Python en de SkipChangeCommits optie gebruiken om fouten te verwerken.

    INSERT INTO is wederzijds exclusief met AUTO CDC ... INTO. Gebruik AUTO CDC ... INTO wanneer de brongegevens de CDC-functionaliteit (Change Data Capture) bevatten. Gebruik INSERT INTO deze functie wanneer de bron dat niet doet.

    Zie Gegevens transformeren met pijplijnenvoor meer informatie over het streamen van gegevens.

  • EENMAAL

    U kunt de stroom desgewenst definiëren als een eenmalige stroom, zoals een backfill. Door ONCE te gebruiken verandert de stroom op twee manieren:

    • De bron query of create_auto_cdc_flow_spec is geen streamingtabel.
    • De stroom wordt standaard één keer uitgevoerd. Als de pijplijn wordt bijgewerkt met een volledig refresh, zal de ONCE stroom opnieuw draaien om de gegevens te reconstrueren.

Voorbeelden

-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;

-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users
SELECT * FROM stream(raw_data.users);

-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users
SELECT * FROM user_backfill_table;

-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;

-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;