Remarque
L’accès à cette page requiert une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page requiert une autorisation. Vous pouvez essayer de modifier des répertoires.
S’applique à :
Databricks SQL
Important
Cette fonctionnalité est en version bêta. Nécessite Databricks Runtime 17.3 et versions ultérieures.
Utilisez la FLOW AUTO CDC clause avec CREATE STREAMING TABLE pour traiter les enregistrements de capture de données modifiées (CDC) d’une source dans une table de diffusion en continu.
Auparavant, l’instruction MERGE INTO était couramment utilisée pour traiter les enregistrements CDC sur Azure Databricks. Toutefois, MERGE INTO il peut produire des résultats incorrects en raison d’enregistrements hors séquence ou nécessite une logique complexe pour réorganiser les enregistrements.
AUTO CDC simplifie la capture de données modifiées en gérant automatiquement les enregistrements hors commande. Vous spécifiez des clés pour identifier les enregistrements, une colonne de séquence pour l’ordre et si vous souhaitez stocker les résultats en tant que SCD type 1 (mises à jour directes) ou SCD type 2 (suivi de l’historique).
Syntaxe
CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
Le comportement par défaut pour INSERT et UPDATE les événements consiste à mettre à jour les événements CDC à partir de la source : mettez à jour les lignes de la table cible qui correspondent aux clés spécifiées ou insérez une nouvelle ligne lorsqu’un enregistrement correspondant n’existe pas dans la table cible.
DELETE La gestion des événements peut être spécifiée par la condition APPLY AS DELETE WHEN.
Paramètres
sourceSource des données. La source doit être une source de diffusion en continu. Utilisez le mot clé
STREAMpour utiliser la sémantique de streaming pour lire à partir de la source. Si la lecture détecte une modification ou une suppression concernant un enregistrement existant, une erreur est générée. Il est plus sûr de lire depuis des sources statiques ou d’ajout uniquement.Pour plus d’informations sur la diffusion en continu des données, consultez Transformer des données avec des pipelines.
KEYSColonne ou combinaison de colonnes qui identifient de manière unique une ligne dans les données sources. Les valeurs de ces colonnes sont utilisées pour identifier les événements CDC qui s’appliquent à des enregistrements spécifiques dans la table cible.
Pour définir une combinaison de colonnes, utilisez une liste séparée par des virgules de colonnes.
Cette clause est requise.
IGNORE NULL UPDATESPermet d’ingérer des mises à jour contenant un sous-ensemble des colonnes cibles. Lorsqu’un événement CDC correspond à une ligne existante et
IGNORE NULL UPDATESest spécifié, les colonnes avec unenullvaleur conservent leurs valeurs existantes dans la cible. Cela s’applique également aux colonnes imbriquées avec unenullvaleur.Cette clause est facultative.
La valeur par défaut consiste à remplacer les colonnes existantes avec des valeurs
null.APPLY AS DELETE WHENSpécifie quand un événement CDC doit être traité comme une
DELETEinsertion/mise à jour plutôt qu'une.Pour les sources SCD de type 2, pour traiter les données hors séquence, la ligne supprimée est temporairement conservée en tant que marqueur de suppression dans la table Delta sous-jacente, et une vue est créée dans le metastore pour filtrer ces marqueurs de suppression. L’intervalle de rétention peut être configuré avec la
pipelines.cdc.tombstoneGCThresholdInSecondspropriété de table.Cette clause est facultative.
APPLY AS TRUNCATE WHENSpécifie quand un événement CDC doit être traité comme une table
TRUNCATEcomplète. Étant donné que cette clause supprime toutes les données de la table cible, elle doit être utilisée uniquement pour des cas d'utilisation spécifiques nécessitant cette fonctionnalité.La
APPLY AS TRUNCATE WHENclause est prise en charge uniquement pour le type SCD 1. Le type SCD 2 ne prend pas en charge l’opération de troncation.Cette clause est facultative.
SEQUENCE BYNom de colonne spécifiant l’ordre logique des événements CDC dans les données sources. Le traitement du pipeline utilise ce séquencement pour gérer les événements de modification qui arrivent hors de commande.
Si plusieurs colonnes sont nécessaires pour l'ordonnancement, utilisez une expression
STRUCT: elle ordonnera d'abord par le premier champ de struct, puis par le deuxième champ s'il y a égalité, et ainsi de suite.Les colonnes spécifiées doivent être des types de données triables.
Cette clause est requise.
COLUMNSSpécifie un sous-ensemble de colonnes à inclure dans la table cible. Vous pouvez :
- Spécifiez la liste complète des colonnes à inclure :
COLUMNS (userId, name, city). - Spécifiez une liste de colonnes à exclure :
COLUMNS * EXCEPT (operation, sequenceNum)
Cette clause est facultative.
La valeur par défaut consiste à inclure toutes les colonnes de la table cible lorsque la
COLUMNSclause n’est pas spécifiée.- Spécifiez la liste complète des colonnes à inclure :
STORED ASIndique s’il faut stocker des enregistrements en tant que type SCD 1 ou SCD type 2.
Cette clause est facultative.
La valeur par défaut est la méthode SCD de type 1.
TRACK HISTORY ONSpécifie un sous-ensemble de colonnes de sortie pour générer des enregistrements d’historique lorsqu’il existe des modifications apportées à ces colonnes spécifiées. Vous pouvez :
- Spécifiez la liste complète des colonnes à suivre :
COLUMNS (userId, name, city). - Spécifiez une liste de colonnes à exclure du suivi :
COLUMNS * EXCEPT (operation, sequenceNum)
Cette clause est facultative. La valeur par défaut consiste à suivre l’historique de toutes les colonnes de sortie lorsqu’il existe des modifications, équivalentes à
TRACK HISTORY ON *.- Spécifiez la liste complète des colonnes à suivre :
Exemples
-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
SEQUENCE BY sequenceNum
STORED AS SCD TYPE 1;
-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2;
-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
TBLPROPERTIES(pipelines.channel = "PREVIEW")
FLOW AUTO CDC
FROM stream(cdc_data.users)
KEYS (userId)
APPLY AS DELETE WHEN operation = "DELETE"
SEQUENCE BY sequenceNum
COLUMNS * EXCEPT (operation, sequenceNum)
STORED AS SCD TYPE 2
TRACK HISTORY ON * EXCEPT (city);