Partager via


CRÉER UN FLUX (pipelines)

Utilisez l’instruction CREATE FLOW pour créer des flux ou des remplissages pour les tables d’un pipeline.

Syntaxe

CREATE FLOW flow_name [COMMENT comment] AS
{
  AUTO CDC INTO target_table create_auto_cdc_flow_spec |
  INSERT INTO [ONCE] target_table BY NAME query
}

Paramètres

  • flow_name

    Nom du flux à créer.

  • COMMENTAIRE

    Description facultative du flux.

  • AUTO CDC INTO

    Instruction AUTO CDC ... INTO qui définit le flux, avec un create_auto_cdc_flow_spec. Vous devez inclure une AUTO CDC ... INTO instruction ou une INSERT INTO instruction. Utilisez AUTO CDC ... INTO quand la requête source utilise la sémantique des données modifiées.

    Pour plus d’informations, consultez AUTO CDC INTO (pipelines).

  • target_table

    Table à mettre à jour. Il doit s’agir d’une table de streaming.

  • INSERT DANS

    Définit une requête de table insérée dans la table cible. Si l’option ONCE n’est pas fournie, la requête doit être une requête de diffusion en continu . Utilisez le mot clé STREAM pour utiliser la sémantique de diffusion en continu pour lire à partir de la source. Si la lecture détecte une modification ou une suppression concernant un enregistrement existant, une erreur est générée. Il est plus sûr de lire depuis des sources statiques ou d’ajout uniquement. Pour ingérer des données ayant des validations de modification, vous pouvez utiliser Python et l’option SkipChangeCommits permettant de gérer les erreurs.

    INSERT INTO s’exclue mutuellement avec AUTO CDC ... INTO. Utiliser AUTO CDC ... INTO lorsque les données sources incluent des fonctionnalités de capture de données modifiées (CDC). Utilisez INSERT INTO quand la source ne le fait pas.

    Pour plus d’informations sur la diffusion en continu des données, consultez Transformer des données avec des pipelines.

  • ONCE

    Si vous le souhaitez, définissez le flux comme un flux à usage unique, tel qu’un remblai. L'utilisation de ONCE change le flux de deux manières :

    • La source query ou create_auto_cdc_flow_spec n’est pas une table de flux.
    • Le flux est exécuté une fois par défaut. Si le pipeline est mis à jour avec une actualisation complète, le ONCE flux s’exécute à nouveau pour recréer les données.

Examples

-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;

-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users BY NAME
SELECT * FROM stream(raw_data.users);

-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users BY NAME
SELECT * FROM user_backfill_table;

-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;

-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;