Notes
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Découvrez comment créer et déployer un pipeline ETL (extraire, transformer et charger) pour l’orchestration des données à l’aide de Pipelines déclaratifs Lakeflow et du chargeur automatique. Un pipeline ETL implémente les étapes permettant de lire des données à partir de systèmes sources, de transformer ces données en fonction des exigences, telles que les vérifications de qualité des données et la déduplication des enregistrements, et d’écrire les données dans un système cible, comme un entrepôt de données ou un lac de données.
Dans ce tutoriel, vous allez utiliser les pipelines déclaratifs Lakeflow et le chargeur automatique pour :
- Ingérer des données sources brutes dans une table cible.
- Transformez les données sources brutes et écrivez les données transformées en deux vues matérialisées cibles.
- Interrogez les données transformées.
- Automatisez le pipeline ETL avec une tâche Databricks.
Pour plus d’informations sur les pipelines déclaratifs Lakeflow et le chargeur automatique, consultez Pipelines déclaratifs Lakeflow et Qu’est-ce que le chargeur automatique ?
Spécifications
Pour suivre ce tutoriel, vous devez répondre aux exigences suivantes :
- Connectez-vous à un espace de travail Azure Databricks.
- Assurez-vous que le catalogue Unity est activé pour votre espace de travail.
- Le calcul serverless est activé pour votre compte. Les pipelines déclaratifs de Serverless Lakeflow ne sont pas disponibles dans certaines régions de l’espace de travail. Consultez Fonctionnalités avec une disponibilité régionale limitée pour les régions disponibles.
- Disposez de l’autorisation de créer une ressource de calcul ou d’accéder à une ressource de calcul.
- Disposez des autorisations nécessaires pour créer un schéma dans un catalogue. Les autorisations nécessaires sont
ALL PRIVILEGES
ouUSE CATALOG
etCREATE SCHEMA
. - Disposez des autorisations nécessaires pour créer un volume dans un schéma existant. Les autorisations nécessaires sont
ALL PRIVILEGES
ouUSE SCHEMA
etCREATE VOLUME
.
À propos du jeu de données
Le jeu de données utilisé dans cet exemple est un sous-ensemble du jeu de données Million Song, une collection de caractéristiques et de métadonnées pour des morceaux de musique contemporains. Ce jeu de données est disponible dans les exemples de jeux de données compris dans votre espace de travail Azure Databricks.
Étape 1 : Créer un pipeline
Tout d’abord, vous allez créer un pipeline ETL dans Lakeflow Declarative Pipelines. Lakeflow Declarative Pipelines crée des pipelines en résolvant les dépendances définies dans les notebooks ou les fichiers (appelés code source) en utilisant la syntaxe de Lakeflow Declarative Pipelines. Chaque fichier de code source ne peut contenir qu’une seule langue, mais vous pouvez ajouter plusieurs notebooks ou fichiers spécifiques à la langue dans le pipeline. Pour en savoir plus, consultez Pipelines Déclaratifs Lakeflow
Important
Laissez le champ de code source vide pour créer et configurer automatiquement un bloc-notes pour la création de code source.
Ce tutoriel utilise le calcul serverless et le catalogue Unity. Pour toutes les options de configuration qui ne sont pas spécifiées, utilisez les paramètres par défaut. Si le calcul serverless n’est pas activé ou pris en charge dans votre espace de travail, vous pouvez suivre le didacticiel comme écrit à l’aide des paramètres de calcul par défaut. Si vous utilisez les paramètres de calcul par défaut, vous devez sélectionner manuellement Le catalogue Unity sous Options de stockage dans la section Destination de l’interface utilisateur de création d’un pipeline .
Pour créer un nouveau pipeline ETL dans les pipelines déclaratifs de Lakeflow, procédez comme suit :
- Dans votre espace de travail, cliquez sur
Travaux & Pipelines dans la barre latérale.
- Sous Nouveau, cliquez sur Pipeline ETL.
- Dans Pipeline name, saisissez un nom de pipeline unique.
- Cochez la case Serverless .
- Dans Destination, pour configurer un emplacement de catalogue Unity où les tables sont publiées, sélectionnez un catalogue existant et écrivez un nouveau nom dans Schéma pour créer un schéma dans votre catalogue.
- Cliquez sur Créer.
L’interface utilisateur des pipelines s’affiche pour le nouveau pipeline.
Étape 2 : Développer un pipeline
Important
Les notebooks ne peuvent contenir qu’un seul langage de programmation. Ne mélangez pas le code Python et SQL dans les notebooks de code source de pipeline.
Dans cette étape, vous allez utiliser Databricks Notebooks pour développer et valider du code source pour les pipelines déclaratifs Lakeflow de manière interactive.
Le code utilise le chargeur automatique pour l’ingestion de données incrémentielles. Auto Loader détecte et traite automatiquement les nouveaux fichiers à mesure qu’ils arrivent dans le stockage d’objets cloud. Pour plus d’informations, consultez Qu’est-ce que le chargeur automatique ?
Un bloc-notes de code source vide est automatiquement créé et configuré pour le pipeline. Le bloc-notes est créé dans un nouveau répertoire dans votre répertoire utilisateur. Le nom du nouveau répertoire et du nouveau fichier correspondent au nom de votre pipeline. Par exemple : /Users/someone@example.com/my_pipeline/my_pipeline
.
Lors du développement d’un pipeline, vous pouvez choisir Python ou SQL. Des exemples sont inclus pour les deux langues. En fonction de votre choix de langue, vérifiez que vous sélectionnez la langue du bloc-notes par défaut. Pour en savoir plus sur la prise en charge des notebooks dans le développement de code pour les pipelines déclaratifs Lakeflow, consultez Développer et déboguer des pipelines ETL avec un notebook dans les pipelines déclaratifs Lakeflow.
Un lien permettant d’accéder à ce bloc-notes se trouve sous le champ Code source dans le panneau détails du pipeline . Cliquez sur le lien pour ouvrir le bloc-notes avant de passer à l’étape suivante.
Cliquez sur Se connecter en haut à droite pour ouvrir le menu de configuration du calcul.
Pointez sur le nom du pipeline que vous avez créé à l’étape 1.
Cliquez sur Connecter.
En regard du titre de votre bloc-notes en haut, sélectionnez le langage par défaut du bloc-notes (Python ou SQL).
Copiez et collez le code suivant dans une cellule du bloc-notes.
Python
# Import modules import dlt from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dlt.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .option("inferSchema", True) .load(file_path)) # Define a materialized view that validates data and renames a column @dlt.table( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dlt.expect("valid_artist_name", "artist_name IS NOT NULL") @dlt.expect("valid_title", "song_title IS NOT NULL") @dlt.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dlt.table( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )
SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw ( artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING, value STRING ) COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/'); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC
Cliquez sur Démarrer pour démarrer une mise à jour du pipeline connecté.
Étape 3 : Interroger les données transformées
Dans cette étape, vous allez interroger les données traitées dans le pipeline ETL pour analyser les données de la chanson. Ces requêtes utilisent les enregistrements préparés créés à l’étape précédente.
Tout d’abord, exécutez une requête qui recherche les artistes qui ont sorti le plus de chansons chaque année depuis 1990.
Dans la barre latérale, cliquez sur
Éditeur SQL.
Cliquez sur
de nouvel onglet, puis sélectionnez Créer une nouvelle requête dans le menu.
Entrez les informations suivantes :
-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC
Remplacez
<catalog>
et<schema>
par le nom du catalogue et du schéma dans lequel se trouve la table. Par exemple :data_pipelines.songs_data.top_artists_by_year
.Cliquez sur Exécuter la sélection.
À présent, exécutez une autre requête qui recherche des chansons avec un rythme de 4/4 et un tempo dansable.
Cliquez sur la nouvelle icône d’appui
, puis sélectionnez Créer une requête dans le menu.
Entrez le code suivant :
-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;
Remplacez
<catalog>
et<schema>
par le nom du catalogue et du schéma dans lequel se trouve la table. Par exemple :data_pipelines.songs_data.songs_prepared
.Cliquez sur Exécuter la sélection.
Étape 4 : Créer un travail pour exécuter le pipeline
Ensuite, créez un flux de travail pour automatiser les étapes d’ingestion, de traitement et d’analyse des données à l’aide d’un travail Databricks.
- Dans votre espace de travail, cliquez sur
Travaux & Pipelines dans la barre latérale.
- Sous Nouveau, cliquez sur Travail.
- Dans la zone de titre de la tâche, remplacez la date et l’heure< du nouveau > travail par le nom de votre travail. Par exemple :
Songs workflow
. - Dans Nom de la tâche, entrez un nom pour la première tâche, par exemple,
ETL_songs_data
. - Dans Type, sélectionnez Pipeline.
- Dans Pipeline, sélectionnez le pipeline que vous avez créé à l’étape 1.
- Cliquez sur Créer.
- Pour exécuter le flux de travail, cliquez sur Exécuter maintenant. Pour afficher les détails de l’exécution, cliquez sur l’onglet Exécutions . Cliquez sur la tâche pour afficher les détails de l’exécution de la tâche.
- Pour afficher les résultats lorsque le flux de travail est terminé, cliquez sur Accéder à la dernière exécution réussie ou à l’heure de début de l’exécution du travail. La page Sortie s’affiche et montre les résultats de la requête.
Pour plus d’informations sur l’exécution des tâches, consultez Suivi et observabilité des tâches Lakeflow.
Étape 5 : Planifier le travail du pipeline
Pour exécuter le pipeline ETL selon une planification, procédez comme suit :
- Accédez à l’interface utilisateur Travaux &Pipelines dans le même espace de travail Azure Databricks que le travail.
- Si vous le souhaitez, sélectionnez les travaux et les filtres appartenant à moi .
- Dans la colonne Nom, cliquez sur le nom d’un travail. Le volet latéral affiche les Détails du travail.
- Cliquez sur Ajouter un déclencheur dans le volet Planifications et déclencheurs , puis sélectionnez Planifié dans le type de déclencheur.
- Indiquez la période, l’heure de début et le fuseau horaire.
- Cliquez sur Enregistrer.
En savoir plus
- Pour en savoir plus sur les pipelines de traitement des données avec des pipelines déclaratifs Lakeflow, consultez Pipelines déclaratifs Lakeflow
- Pour en savoir plus sur les notebooks Databricks, consultez Présentation des notebooks Databricks.
- Pour en savoir plus sur les travaux Lakeflow, consultez Qu’est-ce que les travaux ?
- Pour en savoir plus sur Delta Lake, consultez Qu’est-ce que Delta Lake dans Azure Databricks ?