Partage via


CREATE STREAMING TABLE ... FLOW AUTO CDC

S’applique à :case cochée oui Databricks SQL

Important

Cette fonctionnalité est en version bêta. Nécessite Databricks Runtime 17.3 et versions ultérieures.

Utilisez la FLOW AUTO CDC clause avec CREATE STREAMING TABLE pour traiter les enregistrements de capture de données modifiées (CDC) d’une source dans une table de diffusion en continu.

Auparavant, l’instruction MERGE INTO était couramment utilisée pour traiter les enregistrements CDC sur Azure Databricks. Toutefois, MERGE INTO il peut produire des résultats incorrects en raison d’enregistrements hors séquence ou nécessite une logique complexe pour réorganiser les enregistrements.

AUTO CDC simplifie la capture de données modifiées en gérant automatiquement les enregistrements hors commande. Vous spécifiez des clés pour identifier les enregistrements, une colonne de séquence pour l’ordre et si vous souhaitez stocker les résultats en tant que SCD type 1 (mises à jour directes) ou SCD type 2 (suivi de l’historique).

Syntaxe

CREATE OR REFRESH STREAMING TABLE table_name
FLOW AUTO CDC
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]

Le comportement par défaut pour INSERT et UPDATE les événements consiste à mettre à jour les événements CDC à partir de la source : mettez à jour les lignes de la table cible qui correspondent aux clés spécifiées ou insérez une nouvelle ligne lorsqu’un enregistrement correspondant n’existe pas dans la table cible. DELETE La gestion des événements peut être spécifiée par la condition APPLY AS DELETE WHEN.

Paramètres

  • source

    Source des données. La source doit être une source de diffusion en continu. Utilisez le mot clé STREAM pour utiliser la sémantique de streaming pour lire à partir de la source. Si la lecture détecte une modification ou une suppression concernant un enregistrement existant, une erreur est générée. Il est plus sûr de lire depuis des sources statiques ou d’ajout uniquement.

    Pour plus d’informations sur la diffusion en continu des données, consultez Transformer des données avec des pipelines.

  • KEYS

    Colonne ou combinaison de colonnes qui identifient de manière unique une ligne dans les données sources. Les valeurs de ces colonnes sont utilisées pour identifier les événements CDC qui s’appliquent à des enregistrements spécifiques dans la table cible.

    Pour définir une combinaison de colonnes, utilisez une liste séparée par des virgules de colonnes.

    Cette clause est requise.

  • IGNORE NULL UPDATES

    Permet d’ingérer des mises à jour contenant un sous-ensemble des colonnes cibles. Lorsqu’un événement CDC correspond à une ligne existante et IGNORE NULL UPDATES est spécifié, les colonnes avec une null valeur conservent leurs valeurs existantes dans la cible. Cela s’applique également aux colonnes imbriquées avec une null valeur.

    Cette clause est facultative.

    La valeur par défaut consiste à remplacer les colonnes existantes avec des valeurs null.

  • APPLY AS DELETE WHEN

    Spécifie quand un événement CDC doit être traité comme une DELETE insertion/mise à jour plutôt qu'une.

    Pour les sources SCD de type 2, pour traiter les données hors séquence, la ligne supprimée est temporairement conservée en tant que marqueur de suppression dans la table Delta sous-jacente, et une vue est créée dans le metastore pour filtrer ces marqueurs de suppression. L’intervalle de rétention peut être configuré avec la pipelines.cdc.tombstoneGCThresholdInSecondspropriété de table.

    Cette clause est facultative.

  • APPLY AS TRUNCATE WHEN

    Spécifie quand un événement CDC doit être traité comme une table TRUNCATEcomplète. Étant donné que cette clause supprime toutes les données de la table cible, elle doit être utilisée uniquement pour des cas d'utilisation spécifiques nécessitant cette fonctionnalité.

    La APPLY AS TRUNCATE WHEN clause est prise en charge uniquement pour le type SCD 1. Le type SCD 2 ne prend pas en charge l’opération de troncation.

    Cette clause est facultative.

  • SEQUENCE BY

    Nom de colonne spécifiant l’ordre logique des événements CDC dans les données sources. Le traitement du pipeline utilise ce séquencement pour gérer les événements de modification qui arrivent hors de commande.

    Si plusieurs colonnes sont nécessaires pour l'ordonnancement, utilisez une expression STRUCT : elle ordonnera d'abord par le premier champ de struct, puis par le deuxième champ s'il y a égalité, et ainsi de suite.

    Les colonnes spécifiées doivent être des types de données triables.

    Cette clause est requise.

  • COLUMNS

    Spécifie un sous-ensemble de colonnes à inclure dans la table cible. Vous pouvez :

    • Spécifiez la liste complète des colonnes à inclure : COLUMNS (userId, name, city).
    • Spécifiez une liste de colonnes à exclure : COLUMNS * EXCEPT (operation, sequenceNum)

    Cette clause est facultative.

    La valeur par défaut consiste à inclure toutes les colonnes de la table cible lorsque la COLUMNS clause n’est pas spécifiée.

  • STORED AS

    Indique s’il faut stocker des enregistrements en tant que type SCD 1 ou SCD type 2.

    Cette clause est facultative.

    La valeur par défaut est la méthode SCD de type 1.

  • TRACK HISTORY ON

    Spécifie un sous-ensemble de colonnes de sortie pour générer des enregistrements d’historique lorsqu’il existe des modifications apportées à ces colonnes spécifiées. Vous pouvez :

    • Spécifiez la liste complète des colonnes à suivre : COLUMNS (userId, name, city).
    • Spécifiez une liste de colonnes à exclure du suivi : COLUMNS * EXCEPT (operation, sequenceNum)

    Cette clause est facultative. La valeur par défaut consiste à suivre l’historique de toutes les colonnes de sortie lorsqu’il existe des modifications, équivalentes à TRACK HISTORY ON *.

Exemples

-- SCD type 1: apply CDC changes with direct updates (no history)
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  SEQUENCE BY sequenceNum
  STORED AS SCD TYPE 1;

-- SCD type 2: retain a history of changes, with delete handling
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2;

-- SCD type 2 with history tracking on specific columns
> CREATE OR REFRESH STREAMING TABLE target
  TBLPROPERTIES(pipelines.channel = "PREVIEW")
  FLOW AUTO CDC
  FROM stream(cdc_data.users)
  KEYS (userId)
  APPLY AS DELETE WHEN operation = "DELETE"
  SEQUENCE BY sequenceNum
  COLUMNS * EXCEPT (operation, sequenceNum)
  STORED AS SCD TYPE 2
  TRACK HISTORY ON * EXCEPT (city);