Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Lorsque vous traitez de grandes quantités de données, vous avez besoin d’un pipeline qui ne peut traiter que les enregistrements nouveaux et modifiés au lieu de retraiter l’ensemble du jeu de données. C’est ce qu’on appelle l’ETL incrémentiel. Dans Databricks SQL, vous pouvez créer des pipelines ETL incrémentiels à l’aide de tables de streaming et de vues matérialisées, sans écrire de code procédural ou planifier des actualisations manuelles.
Ce tutoriel vous guide dans un modèle courant : suivi des modifications de produit au fil du temps. Vous créez une table source, capturez des événements de modification, créez une table de dimension qui conserve l’historique complet de chaque produit et ajoutez une couche de création de rapports agrégée en haut.
La fonctionnalité clé de ce didacticiel est AUTO CDC. Dans un entrepôt traditionnel, vous écrivez des instructions complexes MERGE INTO pour rapprocher les événements d’insertion, de mise à jour et de suppression dans une table cible. Cette approche est sujette aux erreurs, en particulier lorsque les événements arrivent dans le désordre.
AUTO CDC gère cela pour vous. Vous déclarez la clé métier, la colonne de séquencement et si vous souhaitez que SCD Type 1 (valeur la plus récente uniquement) ou SCD Type 2 (historique complet) et Azure Databricks applique automatiquement la logique de fusion correcte. Pour obtenir une vue d’ensemble de la capture de données modifiées, consultez les API AUTO CDC : Simplifiez la capture de données modifiées avec des pipelines.
À la fin de ce tutoriel, vous aurez :
- Création d'une table source qui suit les modifications avec le flux de données de changement.
- Inspectez les données de modification brutes pour comprendre le flux d’événements CDC.
- Utilisé
AUTO CDCpour générer une table de dimension SCD Type 2 à partir de ces événements. - Traitement des événements de suppression de manière incrémentielle via le pipeline.
- Une vue matérialisée a été créée qui gère de façon incrémentielle un rapport d’agrégation.
- Configuré
SCHEDULE REFRESH EVERY 1 DAYde sorte que les modifications se propagent automatiquement via le pipeline.
Exigences
Pour suivre ce tutoriel, vous devez répondre aux exigences suivantes :
- Espace de travail Azure Databricks avec Unity Catalog activé.
- Un entrepôt SQL (serverless ou pro).
- Disposez de l’autorisation de créer une ressource de calcul ou d’accéder à une ressource de calcul.
- Calcul serverless activé pour votre compte. Consultez Fonctionnalités avec une disponibilité régionale limitée.
Étape 1 : Configurer votre catalogue et votre schéma
Ouvrez l’éditeur Databricks SQL et définissez votre catalogue de travail et votre schéma. Vous devez disposer de l’autorisation pour USE le catalogue et le schéma que vous sélectionnez :
USE CATALOG <your-catalog>;
USE SCHEMA <your-schema>;
Étape 2 : Créer une table source et charger des données
Créez une table products avec le flux de données de changement Delta Lake activé sur Azure Databricks (CDF). CDF est une fonctionnalité Delta Lake qui enregistre chaque insertion, mise à jour et suppression en tant que journal des modifications interrogeable. Il s’agit d’un flux CDC à partir du système source transactionnel, sauf que les modifications sont capturées directement dans la table Delta plutôt qu’à partir d’un journal externe. Vous utilisez CDF ici pour générer les événements de modification que le pipeline en aval consommera.
Créez la table et chargez les enregistrements initiaux :
CREATE OR REPLACE TABLE products ( product_id INT, product_name STRING, category STRING, warehouse STRING ) TBLPROPERTIES (delta.enableChangeDataFeed = true); INSERT INTO products VALUES (1, 'Spoon', 'Cutlery', 'Seattle'), (2, 'Fork', 'Cutlery', 'Portland'), (3, 'Knife', 'Cutlery', 'Denver'), (4, 'Chair', 'Furniture', 'Austin'), (5, 'Table', 'Furniture', 'Chicago'), (6, 'Lamp', 'Lighting', 'Boston'), (7, 'Mug', 'Kitchenware', 'Seattle'), (8, 'Plate', 'Kitchenware', 'Atlanta'), (9, 'Bowl', 'Kitchenware', 'Dallas'), (10, 'Glass', 'Kitchenware', 'Phoenix');Simuler des modifications en amont, notamment de nouveaux produits, un déplacement d’entrepôt et une réaffectation de catégorie :
INSERT INTO products VALUES (11, 'Napkin', 'Dining', 'San Francisco'), (12, 'Coaster', 'Dining', 'New York'); UPDATE products SET warehouse = 'Los Angeles' WHERE product_id = 1; UPDATE products SET category = 'Dining' WHERE product_id = 2;
Étape 3 : Interroger le flux de données modifiées
Avant de construire le pipeline en aval, il est utile d’examiner les événements de changement bruts afin de comprendre ce qui AUTO CDC va traiter. La table_changes() fonction lit le journal CDF et retourne chaque opération capturée avec les colonnes de métadonnées :
SELECT
product_id, product_name, warehouse,
_change_type, _commit_version
FROM table_changes('products', 1)
ORDER BY _commit_version, product_id;
Par exemple, la Cuillère a trois événements : un insert (Seattle), un update_preimage (Seattle) et un update_postimage (Los Angeles).
Notez qu’une seule modification logique (par exemple, le déplacement de la cuillère vers un autre entrepôt) produit plusieurs événements : une préimage et une postimage. Dans un entrepôt traditionnel, vous devez écrire une MERGE instruction pour rapprocher tous ces événements dans une table cible, gérer les insertions, les mises à jour et les suppressions avec une logique distincte et vérifier que les événements sont appliqués dans l’ordre correct. Il s’agit exactement de la complexité qui AUTO CDC élimine à l’étape suivante.
Étape 4 : Générer une dimension SCD Type 2 avec AUTO CDC
Important
AUTO CDC est en version bêta. Nécessite Databricks Runtime 17.3 ou version ultérieure.
Une table de diffusion en continu traite les données de manière incrémentielle. À chaque actualisation, elle lit uniquement les nouvelles lignes depuis la dernière exécution. Il n’est donc pas nécessaire de retraiter le jeu de données complet. Cela permet de s’adapter parfaitement à un volume élevé ou à des sources fréquemment changeantes.
AUTO CDC ajoute le traitement de capture de données modifiées au-dessus d’une table de diffusion en continu. Au lieu d’écrire une instruction MERGE INTO qui gère manuellement les insertions, les mises à jour et les suppressions, vous déclarez la clé métier et la colonne de séquencement et laissez Azure Databricks appliquer la logique appropriée.
AUTO CDC gère également automatiquement les événements hors commande, ce qui est un problème courant lors de l’utilisation MERGE INTO d’événements arrivant à partir de systèmes distribués ou de chargements par lots avec des horodatages qui se chevauchent.
L’instruction suivante crée une table SCD Type 2 qui conserve l’historique de version complet de chaque produit. Chaque version reçoit __START_AT et __END_AT horodatages. C'est une NULL dans __END_AT qui marque la version actuelle.
CREATE OR REFRESH STREAMING TABLE products_history
SCHEDULE REFRESH EVERY 1 DAY
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 2;
-
SCHEDULE REFRESH EVERY 1 DAY: actualise la table selon une planification quotidienne. -
FLOW AUTO CDC: déclare ceci en tant que flux de données CDC. Azure Databricks applique automatiquement la sémantique d’insertion, de mise à jour et de suppression. -
KEYS (product_id): clé d’entreprise. Les événements avec la même clé sont fusionnés dans des lignes versionnées. -
APPLY AS DELETE WHEN _change_type = 'delete': ferme la version actuelle lorsqu’un événement delete arrive. Cela vous permet de définir la condition qui identifie un événement delete. -
SEQUENCE BY _commit_timestamp: établit l’ordre des événements. Gère correctement les arrivées hors commande. -
STORED AS SCD TYPE 2: conserve l’historique complet.AUTO CDCprend en charge les types SCD 1 et SCD Type 2.
Interrogez la table de dimension :
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
- Cuillère : deux versions. Seattle (fermé,
__END_ATdéfini) et Los Angeles (actuel,__END_AT = NULL). - Fourche : deux versions. Catégorie Cutlery (fermée) et catégorie Repas (actuel).
- Napkin et Coaster : une version chacune (nouvellement insérée,
__END_AT = NULL). - Tous les autres produits : une version chacune (
__END_AT = NULL).
Étape 5 : Traiter les opérations de suppression via le pipeline
À présent, simulez deux produits supprimés en les supprimant de la table source :
DELETE FROM products WHERE product_id = 9;
DELETE FROM products WHERE product_id = 10;
Ces événements de suppression sont enregistrés dans le journal CDF, mais la table de diffusion en continu ne les a pas encore vues. Actualisez la table de diffusion en continu pour traiter les nouveaux événements :
REFRESH STREAMING TABLE products_history;
Interrogez la table de dimension pour vérifier que les suppressions ont été appliquées :
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
Le bol et le verre sont maintenant fermés avec __END_AT un ensemble, les marquant comme abandonnés. Tous les autres produits actuels restent inchangés. La table de diffusion en continu a traité uniquement les nouveaux événements de suppression sans retraiter les insertions et les mises à jour de l’actualisation précédente.
Étape 6 : Créer une vue matérialisée agrégée
Maintenant que vous disposez d’une table de dimension qui reste à jour avec les modifications sources, vous pouvez ajouter une couche de création de rapports en haut.
Une vue matérialisée stocke les résultats de requête précalcalisés sous forme de table physique. Contrairement à une vue régulière, qui réexécute la requête chaque fois que vous y lisez, une vue matérialisée conserve les résultats et recompute uniquement les lignes affectées par les modifications en amont sur chaque actualisation. Cela permet de s’adapter parfaitement aux tableaux de bord et aux rapports où les performances des requêtes sont importantes.
CREATE OR REPLACE MATERIALIZED VIEW products_by_category
SCHEDULE REFRESH EVERY 1 DAY
AS
SELECT
category,
COUNT(*) AS active_products
FROM products_history
WHERE __END_AT IS NULL
GROUP BY category;
SCHEDULE REFRESH EVERY 1 DAY signifie que cette vue est actualisée selon une planification quotidienne. Combiné avec la même planification sur la table de diffusion en continu, vous disposez maintenant d’un pipeline en trois étapes où les modifications apportées à la table source se produisent en cascade dans la dimension et dans l’agrégat sur chaque cycle d’actualisation. Il n’existe aucune actualisation manuelle à exécuter.
SELECT * FROM products_by_category ORDER BY active_products DESC;
Étape 7 : Vérifier la cascade de bout en bout
Pour vérifier la cascade de pipeline complète, apportez une modification à la table source :
UPDATE products SET warehouse = 'Seattle' WHERE product_id = 3;
Le couteau passe de Denver à Seattle. Cette modification DML unique déclenche la cascade complète du pipeline, ce qui montre comment les trois étapes fonctionnent ensemble :
-
productsenregistre l’événement de modification via CDF. -
products_historytraite l’événement et ajoute une nouvelle version pour le Couteau. -
products_by_categoryrecompute uniquement la ligne Cutlery affectée.
Vérifier:
SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
WHERE product_id = 3
ORDER BY __START_AT;
SELECT * FROM products_by_category ORDER BY active_products DESC;
Nettoyage
Pour nettoyer les ressources créées par ce didacticiel, utilisez le code SQL suivant :
DROP MATERIALIZED VIEW IF EXISTS products_by_category;
DROP STREAMING TABLE IF EXISTS products_history;
DROP TABLE IF EXISTS products;