Partager via


Tutoriel : Créer votre premier pipeline à l’aide de l’éditeur de pipelines Lakeflow

Découvrez comment créer un pipeline à l’aide de Pipelines déclaratifs Spark Lakeflow (SDP) pour l’orchestration des données et le chargeur automatique. Ce didacticiel étend l’exemple de pipeline en nettoyant les données et en créant une requête pour rechercher les 100 premiers utilisateurs.

Dans ce tutoriel, vous allez apprendre à utiliser l’éditeur de pipelines Lakeflow pour :

  • Créez un pipeline avec la structure de dossiers par défaut et commencez par un ensemble d’exemples de fichiers.
  • Définissez des contraintes de qualité des données à l’aide des attentes.
  • Utilisez les fonctionnalités de l’éditeur pour étendre le pipeline avec une nouvelle transformation pour effectuer une analyse sur vos données.

Spécifications

Avant de commencer ce tutoriel, vous devez :

  • Connectez-vous à un espace de travail Azure Databricks.
  • Activez le catalogue Unity pour votre espace de travail.
  • Pour votre espace de travail, l'éditeur de pipelines Lakeflow doit être activé et vous devez être inscrit. Consultez L’activation de l’éditeur de pipelines Lakeflow et la surveillance mise à jour.
  • Disposez de l’autorisation de créer une ressource de calcul ou d’accéder à une ressource de calcul.
  • Disposez des autorisations nécessaires pour créer un schéma dans un catalogue. Les autorisations nécessaires sont ALL PRIVILEGES ou USE CATALOG et CREATE SCHEMA.

Étape 1 : Créer un pipeline

Dans cette étape, vous créez un pipeline à l’aide de la structure de dossiers et des exemples de code par défaut. Les exemples de code référencent la table users dans la source de données d'exemple wanderbricks.

  1. Dans votre espace de travail Azure Databricks, cliquez sur l’icône Plus.Nouveau, puis l’icône pipeline.Pipeline ETL. L’éditeur de pipeline s’ouvre, dans la page créer un pipeline.

  2. Cliquez sur l’en-tête pour donner un nom à votre pipeline.

  3. Juste en dessous du nom, choisissez le catalogue et le schéma par défaut pour vos tables de sortie. Celles-ci sont utilisées lorsque vous ne spécifiez pas de catalogue et de schéma dans vos définitions de pipeline.

  4. Sous Étape suivante de votre pipeline, cliquez sur l’icône Schéma.Commencez par un exemple de code dans l’icône SQL ou Schéma.Commencez par l’exemple de code en Python, en fonction de vos préférences de langage. Cela modifie la langue par défaut de votre exemple de code, mais vous pouvez ajouter du code dans l’autre langue ultérieurement. Cela crée une structure de dossiers par défaut avec un exemple de code pour commencer.

  5. Vous pouvez afficher l’exemple de code dans le navigateur de ressources de pipeline sur le côté gauche de l’espace de travail. Sous transformations, se trouvent deux fichiers qui génèrent chacun un jeu de données de pipeline. Sous explorations, il y a un bloc-notes contenant du code pour vous aider à afficher la sortie de votre pipeline. Cliquer sur un fichier vous permet d’afficher et de modifier le code dans l’éditeur.

    Les jeux de données de sortie n’ont pas encore été créés et le graphique pipeline sur le côté droit de l’écran est vide.

  6. Pour exécuter le code du pipeline (le code dans le transformations dossier), cliquez sur Exécuter le pipeline dans la partie supérieure droite de l’écran.

    Une fois l’exécution terminée, la partie inférieure de l’espace de travail affiche les deux nouvelles tables créées, sample_users_<pipeline-name> et sample_aggregation_<pipeline-name>. Vous pouvez également voir que le graphique de pipeline sur le côté droit de l’espace de travail affiche désormais les deux tables, y compris que sample_users est la source pour sample_aggregation.

Étape 2 : Appliquer des vérifications de qualité des données

Dans cette étape, vous ajoutez un contrôle de qualité des données à la sample_users table. Vous utilisez des contraintes de pipeline afin de limiter les données. Dans ce cas, vous supprimez tous les enregistrements utilisateur qui n’ont pas d’adresse e-mail valide et affichez la table nettoyée en tant que users_cleaned.

  1. Dans le navigateur des ressources du pipeline, cliquez sur icône Plus, puis sélectionnez Transformation.

  2. Dans la boîte de dialogue Créer un fichier de transformation , effectuez les sélections suivantes :

    • Choisissez Python ou SQL pour le langage. Cela ne doit pas correspondre à votre sélection précédente.
    • Donnez un nom au fichier. Dans ce cas, choisissez users_cleaned.
    • Pour le chemin de destination, conservez la valeur par défaut.
    • Pour le type de jeu de données, laissez-le en tant que Aucun sélectionné ou choisissez Affichage matérialisé. Si vous sélectionnez l’affichage Matérialisé, il génère un exemple de code pour vous.
  3. Dans votre nouveau fichier de code, modifiez le code pour qu’il corresponde à ce qui suit (utilisez SQL ou Python, en fonction de votre sélection sur l’écran précédent). Remplacez <pipeline-name> par le nom complet de votre sample_users table.

    SQL

    -- Drop all rows that do not have an email address
    
    CREATE MATERIALIZED VIEW users_cleaned
    (
      CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
    ) AS
    SELECT *
    FROM sample_users_<pipeline-name>;
    

    Python

    from pyspark import pipelines as dp
    
    # Drop all rows that do not have an email address
    
    @dp.table
    @dp.expect_or_drop("no null emails", "email IS NOT NULL")
    def users_cleaned():
        return (
            spark.read.table("sample_users_<pipeline_name>")
        )
    
  4. Cliquez sur Exécuter le pipeline pour mettre à jour le pipeline. Il doit maintenant avoir trois tables.

Étape 3 : Analyser les principaux utilisateurs

Obtenez ensuite les 100 premiers utilisateurs en fonction du nombre de réservations qu'ils ont créées. Joignez la table wanderbricks.bookings à la vue matérialisée users_cleaned.

  1. Dans le navigateur des ressources du pipeline, cliquez sur icône Plus, puis sélectionnez Transformation.

  2. Dans la boîte de dialogue Créer un fichier de transformation , effectuez les sélections suivantes :

    • Choisissez Python ou SQL pour le langage. Cela ne doit pas correspondre à vos sélections précédentes.
    • Donnez un nom au fichier. Dans ce cas, choisissez users_and_bookings.
    • Pour le chemin de destination, conservez la valeur par défaut.
    • Pour le type de jeu de données, laissez-le comme Aucun sélectionné.
  3. Dans votre nouveau fichier de code, modifiez le code pour qu’il corresponde à ce qui suit (utilisez SQL ou Python, en fonction de votre sélection sur l’écran précédent).

    SQL

    -- Get the top 100 users by number of bookings
    
    CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
    SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
    FROM users_cleaned u
    JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
    GROUP BY u.name
    ORDER BY booking_count DESC
    LIMIT 100;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import col, count, desc
    
    # Get the top 100 users by number of bookings
    
    @dp.table
    def users_and_bookings():
        return (
            spark.read.table("users_cleaned")
            .join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
            .groupBy(col("name"))
            .agg(count("booking_id").alias("booking_count"))
            .orderBy(desc("booking_count"))
            .limit(100)
        )
    
  4. Cliquez sur Exécuter le pipeline pour mettre à jour les jeux de données. Une fois l’exécution terminée, vous pouvez voir dans pipeline Graph qu’il existe quatre tables, y compris la nouvelle users_and_bookings table.

    Graphique de pipeline montrant quatre tables dans le pipeline

Étapes suivantes

Maintenant que vous avez appris à utiliser certaines des fonctionnalités de l’éditeur de pipelines Lakeflow et à créer un pipeline, voici quelques autres fonctionnalités pour en savoir plus sur :