Note
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de changer d’annuaire.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de changer d’annuaire.
Utilisez l’instruction CREATE FLOW pour créer des flux ou des remplissages pour les tables d’un pipeline.
Syntaxe
CREATE FLOW flow_name [COMMENT comment] AS
{
AUTO CDC INTO target_table create_auto_cdc_flow_spec |
INSERT INTO [ONCE] target_table BY NAME query
}
Paramètres
flow_name
Nom du flux à créer.
COMMENTAIRE
Description facultative du flux.
-
Instruction
AUTO CDC ... INTOqui définit le flux, avec uncreate_auto_cdc_flow_spec. Vous devez inclure uneAUTO CDC ... INTOinstruction ou uneINSERT INTOinstruction. UtilisezAUTO CDC ... INTOquand la requête source utilise la sémantique des données modifiées.Pour plus d’informations, consultez AUTO CDC INTO (pipelines).
target_table
Table à mettre à jour. Il doit s’agir d’une table de streaming.
INSERT DANS
Définit une requête de table insérée dans la table cible. Si l’option
ONCEn’est pas fournie, la requête doit être une requête de diffusion en continu . Utilisez le mot clé STREAM pour utiliser la sémantique de diffusion en continu pour lire à partir de la source. Si la lecture détecte une modification ou une suppression concernant un enregistrement existant, une erreur est générée. Il est plus sûr de lire depuis des sources statiques ou d’ajout uniquement. Pour ingérer des données ayant des validations de modification, vous pouvez utiliser Python et l’optionSkipChangeCommitspermettant de gérer les erreurs.INSERT INTOs’exclue mutuellement avecAUTO CDC ... INTO. UtiliserAUTO CDC ... INTOlorsque les données sources incluent des fonctionnalités de capture de données modifiées (CDC). UtilisezINSERT INTOquand la source ne le fait pas.Pour plus d’informations sur la diffusion en continu des données, consultez Transformer des données avec des pipelines.
ONCE
Si vous le souhaitez, définissez le flux comme un flux à usage unique, tel qu’un remblai. L'utilisation de
ONCEchange le flux de deux manières :- La source
queryoucreate_auto_cdc_flow_specn’est pas une table de flux. - Le flux est exécuté une fois par défaut. Si le pipeline est mis à jour avec une actualisation complète, le
ONCEflux s’exécute à nouveau pour recréer les données.
- La source
Examples
-- EXAMPLE 1:
-- Create a streaming table, and add two flows that append data to it:
CREATE OR REFRESH STREAMING TABLE users;
-- first flow into target_table:
CREATE FLOW users_flow AS
INSERT INTO users BY NAME
SELECT * FROM stream(raw_data.users);
-- second flow into target_table:
CREATE FLOW backfill_users AS
INSERT INTO ONCE users BY NAME
SELECT * FROM user_backfill_table;
-- EXAMPLE 2:
-- Create a streaming table, and add a flow that applies CDC changes to it:
CREATE OR REFRESH STREAMING TABLE admins_cdc_target_table;
-- first flow into target_table:
CREATE FLOW admin_cdc_flow AS
AUTO CDC INTO admins_cdc_target_table
FROM stream(cdc_data.admins)
KEYS (userId)
APPLY AS DELETE WHEN
operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;