Capture simplifiée des changements de données avec l’API APPLY CHANGES dans Delta Live Tables

Delta Live Tables simplifie la capture de changements de données (CDC) avec l’API APPLY CHANGES. Auparavant, l’instruction MERGE INTO était couramment utilisée pour traiter les enregistrements CDC sur Azure Databricks. Toutefois, MERGE INTO peut produire des résultats incorrects en raison d’enregistrements hors séquence ou requiert une logique complexe pour réorganiser les enregistrements.

En gérant automatiquement les enregistrements hors séquence, l’API APPLY CHANGES dans Delta Live Tables garantit le traitement correct des enregistrements CDC et supprime la nécessité de développer une logique complexe pour gérer les enregistrements hors séquence.

L’API APPLY CHANGES est prise en charge dans les interfaces SQL et Python Delta Live Tables, notamment la prise en charge de la mise à jour des tables avec le type 1 et le type 2 SCD :

  • Utilisez la méthode SCD de type 1 pour mettre à jour les enregistrements directement. L’historique n’est pas conservé pour les enregistrements mis à jour.
  • Utilisez SCD type 2 pour conserver un historique des enregistrements, soit sur toutes les mises à jour, soit sur les mises à jour d’un ensemble de colonnes spécifié.

Pour connaître la syntaxe et d’autres références, consultez :

Remarque

Cet article explique comment mettre à jour les tables dans votre pipeline Delta Live Tables en fonction des modifications apportées aux données sources. Pour savoir comment enregistrer et interroger des informations sur les changements au niveau des lignes pour les tables Delta, consultez Utiliser le flux des changements de données Delta Lake sur Azure Databricks.

Comment CDC est-elle implémentée avec Delta Live Tables ?

Vous devez spécifier une colonne sur laquelle séquencer des enregistrements dans les données sources, que Delta Live Tables interprète comme une représentation monotone croissante de l’ordre correct des données sources. Delta Live Tables gère automatiquement les données qui arrivent dans le désordre. Pour les modifications de type SCD 2, les tables dynamiques Delta propagent les valeurs de séquencement appropriées aux colonnes __START_AT et __END_AT de la table cible. Il devrait y avoir une mise à jour distincte par clé pour chaque valeur de séquencement, et les valeurs de séquencement NULL ne sont pas prises en charge.

Pour effectuer le traitement CDC avec Delta Live Tables, créez d’abord une table de diffusion, puis utilisez une instruction APPLY CHANGES INTO pour spécifier la source, les clés et le séquencement du flux de modification. Pour créer la table de diffusion cible, utilisez l’instruction CREATE OR REFRESH STREAMING TABLE dans SQL ou la fonction create_streaming_table() dans Python. Pour créer l’instruction définissant le traitement CDC, utilisez l’instruction APPLY CHANGES dans SQL ou la fonction apply_changes() dans Python. Pour plus d’informations sur la syntaxe, consultez Capture de données modifiées avec SQL dans les tables dynamiques Delta ou Capture de données modifiées avec Python dans Delta Live Tables.

Quels objets de données sont utilisés pour le traitement CDC Delta Live Tables ?

Lorsque vous déclarez la table cible dans le metastore Hive, deux structures de données sont créées :

  • Une vue utilisant le nom attribué à la table cible.
  • Une table de stockage interne utilisée par Delta Live Tables pour gérer le traitement CDC. Cette table est nommée en rajoutant __apply_changes_storage_ au début du nom de la table cible.

Par exemple, si vous déclarez une table cible nommée dlt_cdc_target, vous verrez une vue nommée dlt_cdc_target et une table nommée __apply_changes_storage_dlt_cdc_target dans le metastore. La création d’une vue permet à Delta Live Tables de filtrer les informations supplémentaires (par exemple, les objets tombstone et les versions) requises pour gérer les données désordonnées. Pour afficher les données traitées, interrogez la vue cible. Étant donné que le schéma de la table __apply_changes_storage_ peut changer pour prendre en charge les fonctionnalités ou améliorations futures, vous ne devez pas interroger la table pour une utilisation en production. Si vous ajoutez manuellement des données à la table, les enregistrements sont supposés venir avant d’autres modifications, car les colonnes de version sont manquantes.

Si un pipeline publie quelque chose dans le catalogue Unity, les tables de stockage internes ne sont pas accessibles aux utilisateurs.

Obtenir des données sur les enregistrements traités par une requête CDC Delta Live Tables

Les métriques suivantes sont capturées par les requêtes apply changes :

  • num_upserted_rows : nombre de lignes de sortie mises à jour/insérées dans le jeu de données.
  • num_deleted_rows : nombre de lignes de sortie existantes supprimées du jeu de données pendant une mise à jour.

La métrique num_output_rows, sortie pour les flux non CDC, n’est pas capturée pour les requêtes apply changes.

Limites

La cible de la requête APPLY CHANGES INTO ou de la fonction apply_changes ne peut pas être utilisée comme source pour une table de diffusion en continu. Une table qui lit la cible d’une requête APPLY CHANGES INTO ou d’une fonction apply_changes doit être une vue matérialisée.

SCD type 1 et SCD type 2 sur Azure Databricks

Les sections suivantes illustrent les requêtes SCD Delta Live Tables de type 1 et de type 2 qui mettent à jour les tables cibles en fonction des événements sources qui :

  1. Créent des enregistrements utilisateur.
  2. Suppriment un enregistrement utilisateur.
  3. Mettent à jour les enregistrements utilisateur. Dans la méthode SCD de type 1, les dernières opérations UPDATE arrivent en retard et sont supprimées de la table cible, montrant ainsi la gestion des événements qui se produisent dans le désordre.

Les exemples suivants supposent que vous connaissez bien la configuration et la mise à jour des pipelines Delta Live Tables. Consulter Tutoriel : Exécuter votre premier pipeline Delta Live Tables.

Pour exécuter ces exemples, vous devez commencer par créer un exemple de jeu de données. Consulter Générer des données de test.

Voici les enregistrements d’entrée de ces exemples :

userId name city opération sequenceNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lily Cancun INSERT 2
123 null null Suppression 6
125 Mercedes Guadalajara UPDATE 6
125 Mercedes Mexicali UPDATE 5
123 Isabel Chihuahua UPDATE 5

Si vous supprimez les marques de commentaire de la ligne finale dans les exemples de données, l’enregistrement suivant qui spécifie où les enregistrements doivent être tronqués sera inséré :

userId name city opération sequenceNum
null null null TRUNCATE 3

Remarque

Tous les exemples suivants incluent des options permettant de spécifier les opérations DELETE et TRUNCATE , mais chacune d’elles est facultative.

Traiter les mises à jour de SCD de type 1

L’exemple de code suivant illustre le traitement des mises à jour de SCD de type 1 :

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Après exécution de l’exemple SCD de type 1, la table cible contient les enregistrements suivants :

userId name city
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lily Cancun

Après avoir exécuté l’exemple du type SCD 1 avec l’enregistrement TRUNCATE supplémentaire, les enregistrements 124 et 126 sont tronqués en raison de l’opération TRUNCATE sur sequenceNum=3, et la table cible contient l’enregistrement suivant :

userId name city
125 Mercedes Guadalajara

Traiter les mises à jour de SCD de type 2

L’exemple de code suivant illustre le traitement des mises à jour de SCD de type 2 :

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Après exécution de l’exemple SCD de type 2, la table cible contient les enregistrements suivants :

userId name city __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 null
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 null
126 Lily Cancun 2 null

Une requête de type 2 SCD peut aussi indiquer un sous-ensemble de colonnes de sortie à suivre pour l’historique dans la table cible. Les modifications apportées à d’autres colonnes sont mises à jour au lieu de générer de nouveaux enregistrements d’historique. L’exemple suivant illustre l’exclusion de la colonne city du suivi :

L’exemple suivant illustre l’utilisation de l’historique de suivi avec le SCD de type 2 :

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.format("delta").table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Après exécution de cet exemple sans l’enregistrement TRUNCATE supplémentaire, la table cible contient les enregistrements suivants :

userId name city __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 null
125 Mercedes Guadalajara 2 null
126 Lily Cancun 2 null

Générer des données de test

Le code ci-dessous est fourni pour générer un exemple de jeu de données à utiliser dans les exemples de requêtes présents dans ce tutoriel. En supposant que vous disposez des informations d’identification appropriées pour créer un schéma et créer une table, vous pouvez exécuter ces instructions avec un notebook ou Databricks SQL. Le code suivant n’est pas destiné à être exécuté dans le cadre d’un pipeline Delta Live Tables :

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Ajouter, modifier ou supprimer des données dans une table de streaming

Si votre pipeline publie des tables sur le catalogue Unity, vous pouvez utiliser des instructions de langage de manipulation de données (DML), notamment les instructions d’insertion, de mise à jour, de suppression et de fusion, pour modifier les tables de diffusion cible créées par des instructions APPLY CHANGES INTO.

Remarque

  • Les instructions DML qui modifient le schéma de table d’une table de streaming ne sont pas prises en charge. Assurez-vous que vos instructions DML ne tentent pas de faire évoluer le schéma de table.
  • Les instructions DML qui mettent à jour une table de streaming ne peuvent être exécutées que dans un cluster Unity Catalog partagé ou un entrepôt SQL à l’aide de Databricks Runtime 13.1 et versions ultérieures.
  • Étant donné que le streaming nécessite des sources de données en ajout uniquement, si votre traitement nécessite le streaming à partir d'une table source de streaming avec des modifications (par exemple, via des instructions DML), définissez l'indicateur skipChangeCommits lors de la lecture de la table de streaming source. Lorsque skipChangeCommits est défini, les transactions qui suppriment ou modifient des enregistrements sur la table source sont ignorées. Si votre traitement ne nécessite pas de table de diffusion, vous pouvez utiliser une vue matérialisée (qui n’a pas la restriction d’ajout uniquement) comme table cible.

Étant donné que Delta Live Tables utilise une colonne SEQUENCE BY spécifiée et propage les valeurs de séquencement appropriées aux colonnes __START_AT et __END_AT de la table cible (pour le SCD de type 2), vous devez vous assurer que les instructions DML utilisent des valeurs valides pour ces colonnes afin de conserver l’ordre approprié des enregistrements. Consulter Comment CDC est-elle implémentée avec Delta Live Tables ?.

Pour plus d’informations sur l’utilisation d’instructions DML avec des tables de diffusion, consultez Ajouter, modifier ou supprimer des données dans une table de diffusion.

L’exemple suivant insère un enregistrement actif avec une séquence de début de 5 :

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);