Note
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de changer d’annuaire.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de changer d’annuaire.
Utilisez l'instruction AUTO CDC ... INTO pour créer un flux qui exploite la fonctionnalité CDC (capture de données modifiées) des pipelines déclaratifs Lakeflow Spark. Cette instruction lit les modifications d’une source CDC et les applique à une cible de diffusion en continu.
- Pour en savoir plus sur la capture de données modifiées, consultez Qu’est-ce que la capture de données modifiées (CDC) ?.
- Pour plus d’informations sur l’utilisation
AUTO CDC, consultez les API AUTO CDC : Simplify change data capture with pipelines. - Pour plus d’informations sur
CREATE FLOW, consultez CREATE FLOW (pipelines).
Syntaxe
CREATE OR REFRESH STREAMING TABLE table_name;
CREATE FLOW flow_name AS AUTO CDC INTO table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Vous définissez des contraintes de qualité des données pour la cible à l’aide de la même CONSTRAINT clause que d’autres requêtes de pipeline. Voir Gérer la qualité des données avec les attentes de la chaîne de traitement.
Le comportement par défaut pour INSERT et UPDATE les événements consiste à mettre à jour les événements CDC à partir de la source : mettez à jour toutes les lignes de la table cible qui correspondent aux clés spécifiées ou insère une nouvelle ligne lorsqu’un enregistrement correspondant n’existe pas dans la table cible.
DELETE La gestion des événements peut être spécifiée par la condition APPLY AS DELETE WHEN.
Important
Vous devez déclarer une table de diffusion en continu cible dans laquelle appliquer les modifications. Vous pouvez éventuellement spécifier le schéma de votre table cible. Pour les tables SCD type 2, lorsque vous spécifiez le schéma de la table cible, vous devez également inclure les colonnes __START_AT et __END_AT avec le même type de données que le champ sequence_by.
Consultez les API AUTO CDC : Simplifiez la capture de données modifiées avec des pipelines.
Paramètres
flow_nameNom du flux à créer.
sourceSource des données. La source doit être une source de diffusion en continu . Utilisez le mot clé STREAM pour utiliser la sémantique de diffusion en continu pour lire à partir de la source. Si la lecture détecte une modification ou une suppression concernant un enregistrement existant, une erreur est générée. Il est plus sûr de lire depuis des sources statiques ou d’ajout uniquement. Pour ingérer des données ayant des validations de modification, vous pouvez utiliser Python et l’option
SkipChangeCommitspermettant de gérer les erreurs.Pour plus d’informations sur la diffusion en continu des données, consultez Transformer des données avec des pipelines.
KEYSColonne ou combinaison de colonnes qui identifient de manière unique une ligne dans les données sources. Les valeurs de ces colonnes sont utilisées pour identifier les événements CDC qui s’appliquent à des enregistrements spécifiques dans la table cible.
Pour définir une combinaison de colonnes, utilisez une liste séparée par des virgules de colonnes.
Cette clause est obligatoire.
IGNORE NULL UPDATESPermet d’ingérer des mises à jour contenant un sous-ensemble des colonnes cibles. Lorsqu’un événement CDC correspond à une ligne existante et que IGNORE NULL UPDATES est spécifié, les colonnes ayant une valeur
nullconservent leurs valeurs existantes dans la cible. Cela s’applique également aux colonnes imbriquées avec unenullvaleur.Cette clause est facultative.
La valeur par défaut consiste à remplacer les colonnes existantes avec des valeurs
null.APPLY AS DELETE WHENSpécifie quand un événement CDC doit être traité comme une
DELETEinsertion/mise à jour plutôt qu'une.Pour les sources SCD de type 2, pour traiter les données hors séquence, la ligne supprimée est temporairement conservée en tant que marqueur de suppression dans la table Delta sous-jacente, et une vue est créée dans le metastore pour filtrer ces marqueurs de suppression. L’intervalle de rétention peut être configuré avec la
pipelines.cdc.tombstoneGCThresholdInSecondspropriété de table.Cette clause est facultative.
APPLY AS TRUNCATE WHENSpécifie quand un événement CDC doit être traité comme une table
TRUNCATEcomplète. Étant donné que cette clause supprime toutes les données de la table cible, elle doit être utilisée uniquement pour des cas d'utilisation spécifiques nécessitant cette fonctionnalité.La
APPLY AS TRUNCATE WHENclause est prise en charge uniquement pour le type SCD 1. Le type SCD 2 ne prend pas en charge l’opération de troncation.Cette clause est facultative.
SEQUENCE BYNom de colonne spécifiant l’ordre logique des événements CDC dans les données sources. Le traitement du pipeline utilise ce séquencement pour gérer les événements de modification qui arrivent hors de commande.
Si plusieurs colonnes sont nécessaires pour l'ordonnancement, utilisez une expression
STRUCT: elle ordonnera d'abord par le premier champ de struct, puis par le deuxième champ s'il y a égalité, et ainsi de suite.Les colonnes spécifiées doivent être des types de données triables.
Cette clause est requise.
COLUMNSSpécifie un sous-ensemble de colonnes à inclure dans la table cible. Vous pouvez :
- Spécifiez la liste complète des colonnes à inclure :
COLUMNS (userId, name, city). - Spécifiez une liste de colonnes à exclure :
COLUMNS * EXCEPT (operation, sequenceNum)
Cette clause est facultative.
La valeur par défaut consiste à inclure toutes les colonnes de la table cible lorsque la
COLUMNSclause n’est pas spécifiée.- Spécifiez la liste complète des colonnes à inclure :
STORED ASIndique s’il faut stocker des enregistrements en tant que type SCD 1 ou SCD type 2.
Cette clause est facultative.
La valeur par défaut est la méthode SCD de type 1.
TRACK HISTORY ONSpécifie un sous-ensemble de colonnes de sortie pour générer des enregistrements d’historique lorsqu’il existe des modifications apportées à ces colonnes spécifiées. Vous pouvez :
- Spécifiez la liste complète des colonnes à suivre :
COLUMNS (userId, name, city). - Spécifiez une liste de colonnes à exclure du suivi :
COLUMNS * EXCEPT (operation, sequenceNum)
Cette clause est facultative. La valeur par défaut consiste à suivre l’historique de toutes les colonnes de sortie lorsqu’il existe des modifications, équivalentes à
TRACK HISTORY ON *.- Spécifiez la liste complète des colonnes à suivre :
Examples
-- Create a streaming table, then use AUTO CDC to populate it:
CREATE OR REFRESH STREAMING TABLE target;
CREATE FLOW flow
AS AUTO CDC INTO
target
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);