Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Ce tutoriel explique comment créer et déployer un pipeline ETL (extraire, transformer et charger) pour l’orchestration des données à l’aide de pipelines déclaratifs Spark Lakeflow et du chargeur automatique. Un pipeline ETL implémente les étapes permettant de lire des données à partir de systèmes sources, de transformer ces données en fonction des exigences, telles que les vérifications de qualité des données et la déduplication des enregistrements, et d’écrire les données dans un système cible, comme un entrepôt de données ou un lac de données.
Dans ce tutoriel, vous allez utiliser des pipelines et du chargeur automatique pour :
- Ingérer des données sources brutes dans une table cible.
- Transformez les données sources brutes et écrivez les données transformées en deux vues matérialisées cibles.
- Interrogez les données transformées.
- Automatisez le pipeline ETL avec une tâche Databricks.
Pour plus d’informations sur les pipelines et le chargeur automatique, consultez Pipelines déclaratifs Spark Lakeflow et Qu’est-ce que le chargeur automatique ?
Spécifications
Pour suivre ce tutoriel, vous devez répondre aux exigences suivantes :
- Connectez-vous à un espace de travail Azure Databricks.
- Assurez-vous que le catalogue Unity est activé pour votre espace de travail.
- Le calcul serverless est activé pour votre compte. Les pipelines déclaratifs Serverless Lakeflow Spark ne sont pas disponibles dans toutes les régions de l’espace de travail. Consultez Fonctionnalités avec une disponibilité régionale limitée pour les régions disponibles.
- Disposez de l’autorisation de créer une ressource de calcul ou d’accéder à une ressource de calcul.
- Disposez des autorisations nécessaires pour créer un schéma dans un catalogue. Les autorisations nécessaires sont
ALL PRIVILEGESouUSE CATALOGetCREATE SCHEMA. - Disposez des autorisations nécessaires pour créer un volume dans un schéma existant. Les autorisations nécessaires sont
ALL PRIVILEGESouUSE SCHEMAetCREATE VOLUME.
À propos du jeu de données
Le jeu de données utilisé dans cet exemple est un sous-ensemble du jeu de données Million Song, une collection de caractéristiques et de métadonnées pour des morceaux de musique contemporains. Ce jeu de données est disponible dans les exemples de jeux de données compris dans votre espace de travail Azure Databricks.
Étape 1 : Créer un pipeline
Tout d’abord, créez un pipeline en définissant les jeux de données dans les fichiers (appelés code source) à l’aide de la syntaxe du pipeline. Chaque fichier de code source ne peut contenir qu’une seule langue, mais vous pouvez ajouter plusieurs fichiers spécifiques à la langue dans le pipeline. Pour en savoir plus, consultez Pipelines déclaratifs Spark Lakeflow
Ce tutoriel utilise le calcul serverless et le catalogue Unity. Pour toutes les options de configuration qui ne sont pas spécifiées, utilisez les paramètres par défaut. Si le calcul serverless n’est pas activé ou pris en charge dans votre espace de travail, vous pouvez suivre le didacticiel comme écrit à l’aide des paramètres de calcul par défaut.
Pour créer un pipeline, procédez comme suit :
- Dans votre espace de travail, cliquez sur
Nouveautés de la barre latérale, puis sélectionnez Pipeline ETL.
- Donnez un nom unique à votre pipeline.
- Juste en dessous du nom, sélectionnez le catalogue et le schéma par défaut pour les données que vous générez. Vous pouvez spécifier d’autres destinations dans vos transformations, mais ce didacticiel utilise ces valeurs par défaut. Vous devez disposer d’autorisations pour le catalogue et le schéma que vous créez. Consultez Spécifications.
- Pour ce tutoriel, sélectionnez Démarrer avec un fichier vide.
- Dans le chemin du dossier, spécifiez un emplacement pour vos fichiers sources ou acceptez la valeur par défaut (votre dossier utilisateur).
- Choisissez Python ou SQL comme langage pour votre premier fichier source (un pipeline peut combiner et faire correspondre des langages, mais chaque fichier doit se trouver dans une seule langue).
- Cliquez sur Sélectionner.
L’éditeur de pipeline s’affiche pour le nouveau pipeline. Un fichier source vide pour votre langue est créé, prêt pour votre première transformation.
Étape 2 : Développer votre logique de pipeline
Dans cette étape, vous allez utiliser l’Éditeur de pipelines Lakeflow pour développer et valider du code source pour le pipeline de manière interactive.
Le code utilise le chargeur automatique pour l’ingestion de données incrémentielles. Auto Loader détecte et traite automatiquement les nouveaux fichiers à mesure qu’ils arrivent dans le stockage d’objets cloud. Pour plus d’informations, consultez Qu’est-ce que le chargeur automatique ?
Un fichier de code source vide est automatiquement créé et configuré pour le pipeline. Le fichier est créé dans le dossier transformations de votre pipeline. Par défaut, tous les fichiers *.py et *.sql dans le dossier transformations font partie de la source de votre pipeline.
Copiez et collez le code suivant dans votre fichier source. Veillez à utiliser la langue que vous avez sélectionnée pour le fichier à l’étape 1.
Python
# Import modules from pyspark import pipelines as dp from pyspark.sql.functions import * from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField # Define the path to the source data file_path = f"/databricks-datasets/songs/data-001/" # Define a streaming table to ingest data from a volume schema = StructType( [ StructField("artist_id", StringType(), True), StructField("artist_lat", DoubleType(), True), StructField("artist_long", DoubleType(), True), StructField("artist_location", StringType(), True), StructField("artist_name", StringType(), True), StructField("duration", DoubleType(), True), StructField("end_of_fade_in", DoubleType(), True), StructField("key", IntegerType(), True), StructField("key_confidence", DoubleType(), True), StructField("loudness", DoubleType(), True), StructField("release", StringType(), True), StructField("song_hotnes", DoubleType(), True), StructField("song_id", StringType(), True), StructField("start_of_fade_out", DoubleType(), True), StructField("tempo", DoubleType(), True), StructField("time_signature", DoubleType(), True), StructField("time_signature_confidence", DoubleType(), True), StructField("title", StringType(), True), StructField("year", IntegerType(), True), StructField("partial_sequence", IntegerType(), True) ] ) @dp.table( comment="Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." ) def songs_raw(): return (spark.readStream .format("cloudFiles") .schema(schema) .option("cloudFiles.format", "csv") .option("sep","\t") .load(file_path)) # Define a materialized view that validates data and renames a column @dp.materialized_view( comment="Million Song Dataset with data cleaned and prepared for analysis." ) @dp.expect("valid_artist_name", "artist_name IS NOT NULL") @dp.expect("valid_title", "song_title IS NOT NULL") @dp.expect("valid_duration", "duration > 0") def songs_prepared(): return ( spark.read.table("songs_raw") .withColumnRenamed("title", "song_title") .select("artist_id", "artist_name", "duration", "release", "tempo", "time_signature", "song_title", "year") ) # Define a materialized view that has a filtered, aggregated, and sorted view of the data @dp.materialized_view( comment="A table summarizing counts of songs released by the artists who released the most songs each year." ) def top_artists_by_year(): return ( spark.read.table("songs_prepared") .filter(expr("year > 0")) .groupBy("artist_name", "year") .count().withColumnRenamed("count", "total_number_of_songs") .sort(desc("total_number_of_songs"), desc("year")) )SQL
-- Define a streaming table to ingest data from a volume CREATE OR REFRESH STREAMING TABLE songs_raw COMMENT "Raw data from a subset of the Million Song Dataset; a collection of features and metadata for contemporary music tracks." AS SELECT * FROM STREAM read_files( '/databricks-datasets/songs/data-001/part*', format => "csv", header => "false", delimiter => "\t", schema => """ artist_id STRING, artist_lat DOUBLE, artist_long DOUBLE, artist_location STRING, artist_name STRING, duration DOUBLE, end_of_fade_in DOUBLE, key INT, key_confidence DOUBLE, loudness DOUBLE, release STRING, song_hotnes DOUBLE, song_id STRING, start_of_fade_out DOUBLE, tempo DOUBLE, time_signature INT, time_signature_confidence DOUBLE, title STRING, year INT, partial_sequence STRING """, schemaEvolutionMode => "none"); -- Define a materialized view that validates data and renames a column CREATE OR REFRESH MATERIALIZED VIEW songs_prepared( CONSTRAINT valid_artist_name EXPECT (artist_name IS NOT NULL), CONSTRAINT valid_title EXPECT (song_title IS NOT NULL), CONSTRAINT valid_duration EXPECT (duration > 0) ) COMMENT "Million Song Dataset with data cleaned and prepared for analysis." AS SELECT artist_id, artist_name, duration, release, tempo, time_signature, title AS song_title, year FROM songs_raw; -- Define a materialized view that has a filtered, aggregated, and sorted view of the data CREATE OR REFRESH MATERIALIZED VIEW top_artists_by_year COMMENT "A table summarizing counts of songs released by the artists each year, who released the most songs." AS SELECT artist_name, year, COUNT(*) AS total_number_of_songs FROM songs_prepared WHERE year > 0 GROUP BY artist_name, year ORDER BY total_number_of_songs DESC, year DESC;Cette source inclut du code pour trois requêtes. Vous pouvez également placer ces requêtes dans des fichiers distincts pour organiser les fichiers et coder la façon dont vous préférez.
Cliquez sur
Exécutez le fichier ou Exécutez le pipeline pour lancer une mise à jour du pipeline connecté. Avec un seul fichier source dans votre pipeline, ceux-ci sont fonctionnellement équivalents.
Une fois la mise à jour terminée, l’éditeur est mis à jour avec des informations sur votre pipeline.
- Le graphique de pipeline (DAG), dans la barre latérale à droite de votre code, affiche trois tables,
songs_rawetsongs_preparedtop_artists_by_year. - Un résumé de la mise à jour s’affiche en haut de l'explorateur des actifs du pipeline.
- Les détails des tables qui ont été générées sont affichés dans le volet inférieur, et vous pouvez parcourir les données des tables en sélectionnant une.
Cela inclut les données brutes et nettoyées, ainsi que quelques analyses simples pour trouver les meilleurs artistes par année. À l’étape suivante, vous créez des requêtes ad hoc pour une analyse plus poussée dans un fichier distinct dans votre pipeline.
Étape 3 : Explorer les jeux de données créés par votre pipeline
Dans cette étape, vous effectuez des requêtes ad hoc sur les données traitées dans le pipeline ETL pour analyser les données de chanson dans l’éditeur SQL Databricks. Ces requêtes utilisent les enregistrements préparés créés à l’étape précédente.
Tout d’abord, exécutez une requête qui recherche les artistes qui ont sorti le plus de chansons chaque année depuis 1990.
Dans la barre latérale du navigateur des ressources de pipeline, cliquez sur
Ajouter puis Exploration.
Entrez un nom et sélectionnez SQL pour le fichier d’exploration. Un notebook SQL est créé dans un nouveau
explorationsdossier. Les fichiers duexplorationsdossier ne sont pas exécutés dans le cadre d’une mise à jour de pipeline par défaut. Le notebook SQL contient des cellules que vous pouvez exécuter ensemble ou séparément.Pour créer une table d’artistes qui publient le plus de chansons chaque année après 1990, entrez le code suivant dans le nouveau fichier SQL (s’il existe un exemple de code dans le fichier, remplacez-le). Étant donné que ce notebook ne fait pas partie du pipeline, il n’utilise pas le catalogue et le schéma par défaut. Remplacez
<catalog>.<schema>par le catalogue et le schéma que vous avez utilisés comme valeurs par défaut pour le pipeline :-- Which artists released the most songs each year in 1990 or later? SELECT artist_name, total_number_of_songs, year -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.top_artists_by_year WHERE year >= 1990 ORDER BY total_number_of_songs DESC, year DESC;Cliquez sur l’icône
ou appuyez
Shift + Enterpour exécuter cette requête.
À présent, exécutez une autre requête qui recherche des chansons avec un rythme de 4/4 et un tempo dansable.
Ajoutez le code suivant à la cellule suivante dans le même fichier. Là encore, remplacez le
<catalog>.<schema>avec les valeurs par défaut de catalogue et de schéma que vous avez utilisées pour le pipeline :-- Find songs with a 4/4 beat and danceable tempo SELECT artist_name, song_title, tempo -- replace with the catalog/schema you are using: FROM <catalog>.<schema>.songs_prepared WHERE time_signature = 4 AND tempo between 100 and 140;Cliquez sur l’icône
ou appuyez
Shift + Enterpour exécuter cette requête.
Étape 4 : Créer un job pour exécuter le pipeline
Ensuite, créez un flux de travail pour automatiser les étapes d’ingestion, de traitement et d’analyse des données à l’aide d’un travail Databricks qui s’exécute selon une planification.
- En haut de l’éditeur, choisissez le bouton Planifier .
- Si la boîte de dialogue Planifications s’affiche, choisissez Ajouter une planification.
- La boîte de dialogue Nouvelle planification s’ouvre, où vous pouvez créer un travail pour exécuter votre pipeline selon une planification.
- Si vous le souhaitez, donnez un nom au travail.
- Par défaut, la planification est définie pour s’exécuter une fois par jour. Vous pouvez accepter ce défaut ou définir votre propre horaire. Le choix d’Advanced vous donne la possibilité de définir une heure spécifique à laquelle le travail s’exécutera. La sélection d’autres options vous permet de créer des notifications lors de l’exécution du travail.
- Sélectionnez Créer pour appliquer les modifications et créer le travail.
Maintenant, la tâche s'exécutera quotidiennement pour maintenir votre pipeline à jour. Vous pouvez choisir de nouveau planifier pour afficher la liste des planifications. Vous pouvez gérer les planifications de votre pipeline à partir de cette boîte de dialogue, notamment l’ajout, la modification ou la suppression de planifications.
Cliquez sur le nom de la planification (ou de la tâche) pour accéder à la page de la tâche dans la liste Tâches & pipelines. À partir de là, vous pouvez afficher des détails sur les exécutions de travaux, y compris l’historique des exécutions, ou exécuter immédiatement le travail avec le bouton Exécuter maintenant .
Pour plus d’informations sur l’exécution des tâches, consultez Suivi et observabilité des tâches Lakeflow.
En savoir plus
- Pour en savoir plus sur les pipelines de traitement des données, consultez Pipelines déclaratifs Spark Lakeflow
- Pour en savoir plus sur les Notebooks Databricks, consultez les Notebooks Databricks.
- Pour en savoir plus sur les travaux Lakeflow, consultez Qu’est-ce que les travaux ?
- Pour en savoir plus sur Delta Lake, consultez Qu’est-ce que Delta Lake dans Azure Databricks ?