Exercice - Intégrer un notebook dans des pipelines Azure Synapse

Effectué

Dans cette unité, vous créez un notebook Azure Synapse Spark pour analyser et transformer les données chargées par un flux de données de mappage et stocker les données dans un lac de données. Vous créez une cellule de paramètre qui accepte un paramètre de chaîne qui définit le nom de dossier des données que le notebook écrit dans le lac de données.

Vous ajoutez ensuite ce notebook à un pipeline Synapse et passez l’ID d’exécution du pipeline unique au paramètre de notebook afin de pouvoir corréler par la suite l’exécution du pipeline avec les données enregistrées par l’activité du notebook.

Enfin, vous utilisez le hub Superviser dans Synapse Studio pour superviser l’exécution du pipeline, obtenir l’ID d’exécution, puis rechercher les fichiers correspondants stockés dans le lac de données.

À propos d’Apache Spark et des notebooks

Apache Spark est un framework de traitement parallèle qui prend en charge le traitement en mémoire pour améliorer les performances des applications d’analytique du Big Data. Apache Spark dans Azure Synapse Analytics est l’une des implémentations par Microsoft d’Apache Spark dans le cloud.

Un notebook Apache Spark dans Synapse Studio est une interface web vous permettant de créer des fichiers contenant du code, des visualisations et du texte descriptif dynamiques. Les notebooks constituent un bon endroit où valider des idées et effectuer des expérimentations rapides pour extraire des insights de vos données. Les notebooks sont également largement utilisés pour la préparation et la visualisation de données, le machine learning et d’autres scénarios de Big Data.

Créer un notebook Synapse Spark

Supposez que vous avez créé un flux de données de mappage dans Synapse Analytics pour traiter, joindre et importer des données de profil utilisateur. Vous souhaitez à présent rechercher les cinq premiers produits pour chaque utilisateur, en fonction de ceux qui sont à la fois préférés et premiers choix, et qui comptabilisent le plus d’achats au cours des 12 derniers mois. Ensuite, vous souhaitez calculer les cinq premiers produits tous confondus.

Au cours de cet exercice, vous allez créer un notebook Synapse Spark pour effectuer ces calculs.

  1. Ouvrez Synapse Analytics Studio (https://web.azuresynapse.net/), puis accédez au hub Données.

    The Data menu item is highlighted.

  2. Sélectionnez l’onglet Lié(1) et développez le compte de stockage de lac de données principal (2) sous Azure Data Lake Storage Gen2. Sélectionnez le conteneur wwi-02(3) et ouvrez le dossier top-products(4). Cliquez avec le bouton droit sur n’importe quel fichier Parquet (5), sélectionnez l’élément de menu Nouveau notebook(6), puis sélectionnez Chargement dans le dataframe (7). Si vous ne voyez pas le dossier, sélectionnez Refresh.

    The Parquet file and new notebook option are highlighted.

  3. Assurez-vous que le notebook est attaché au pool Spark.

    The attach to Spark pool menu item is highlighted.

  4. Remplacez le nom de fichier Parquet par *.parquet(1) pour sélectionner tous les fichiers Parquet du dossier top-products. Par exemple, le chemin doit ressembler à ce qui suit : abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top-products/*.parquet.

    The filename is highlighted.

  5. Sélectionnez Exécuter tout dans la barre d’outils du notebook pour exécuter ce dernier.

    The cell results are displayed.

    Remarque

    La première fois que vous exécutez un notebook dans un pool Spark, Synapse crée une session. Cette opération peut prendre environ trois à cinq minutes.

    Notes

    Pour exécuter uniquement la cellule, pointez dessus et sélectionnez l’icône Exécuter la cellule à gauche de la cellule, ou sélectionnez la cellule, puis appuyez sur Ctrl + Entrée.

  6. Créez une nouvelle cellule en-dessous en sélectionnant le bouton + et en choisissant l’élément Cellule de code. Le bouton + se trouve sous la cellule de notebook sur la gauche. Vous pouvez également développer le menu + Cellule dans la barre d’outils Notebook et sélectionner l’élément Cellule de code.

    The Add Code menu option is highlighted.

  7. Exécutez la commande suivante dans la nouvelle cellule pour remplir un nouveau dataframe appelé topPurchases, créer une nouvelle vue temporaire appelée top_purchases et afficher les 100 premières lignes :

    topPurchases = df.select(
        "UserId", "ProductId",
        "ItemsPurchasedLast12Months", "IsTopProduct",
        "IsPreferredProduct")
    
    # Populate a temporary view so we can query from SQL
    topPurchases.createOrReplaceTempView("top_purchases")
    
    topPurchases.show(100)
    

    Le résultat doit être semblable à ce qui suit :

    +------+---------+--------------------------+------------+------------------+
    |UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
    +------+---------+--------------------------+------------+------------------+
    |   148|     2717|                      null|       false|              true|
    |   148|     4002|                      null|       false|              true|
    |   148|     1716|                      null|       false|              true|
    |   148|     4520|                      null|       false|              true|
    |   148|      951|                      null|       false|              true|
    |   148|     1817|                      null|       false|              true|
    |   463|     2634|                      null|       false|              true|
    |   463|     2795|                      null|       false|              true|
    |   471|     1946|                      null|       false|              true|
    |   471|     4431|                      null|       false|              true|
    |   471|      566|                      null|       false|              true|
    |   471|     2179|                      null|       false|              true|
    |   471|     3758|                      null|       false|              true|
    |   471|     2434|                      null|       false|              true|
    |   471|     1793|                      null|       false|              true|
    |   471|     1620|                      null|       false|              true|
    |   471|     1572|                      null|       false|              true|
    |   833|      957|                      null|       false|              true|
    |   833|     3140|                      null|       false|              true|
    |   833|     1087|                      null|       false|              true|
    
  8. Exécutez la commande suivante dans une nouvelle cellule pour créer une nouvelle vue temporaire à l’aide de SQL :

    %%sql
    
    CREATE OR REPLACE TEMPORARY VIEW top_5_products
    AS
        select UserId, ProductId, ItemsPurchasedLast12Months
        from (select *,
                    row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum
            from top_purchases
            ) a
        where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true
        order by a.UserId
    

    Notes

    Il n’y a aucune sortie pour cette requête.

    La requête utilise la vue temporaire top_purchases comme source et fait appel à une méthode row_number() over pour appliquer un numéro de ligne aux enregistrements de chaque utilisateur où ItemsPurchasedLast12Months est le plus grand. La clause where filtre les résultats de sorte que nous récupérons uniquement jusqu’à cinq produits pour lesquels les deux valeurs IsTopProduct et IsPreferredProduct sont définies sur true. Cela nous donne les cinq produits les plus achetés pour chaque utilisateur. Ils sont également identifiés comme leurs produits préférés selon leur profil utilisateur stocké dans Azure Cosmos DB.

  9. Exécutez la commande suivante dans une nouvelle cellule pour créer et afficher un nouveau DataFrame qui stocke les résultats de la vue temporaire top_5_products que vous avez créée dans la cellule précédente :

    top5Products = sqlContext.table("top_5_products")
    
    top5Products.show(100)
    

    Vous devez voir une sortie similaire à ce qui suit, qui affiche les cinq premiers produits préférés par utilisateur :

    The top five preferred products are displayed per user.

  10. Calculez les cinq premiers produits tous confondus, en fonction de ceux qui sont préférés par les clients et achetés le plus. Pour ce faire, exécutez la commande suivante dans une nouvelle cellule :

    top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months")
        .groupBy("ProductId")
        .agg( sum("ItemsPurchasedLast12Months").alias("Total") )
        .orderBy( col("Total").desc() )
        .limit(5))
    
    top5ProductsOverall.show()
    

    Dans cette cellule, nous avons regroupé les cinq premiers produits préférés par ID de produit, totalisé le nombre total d’articles achetés ces 12 derniers mois, trié cette valeur dans l’ordre décroissant et retourné les cinq premiers résultats. Le résultat doit ressembler à ce qui suit :

    +---------+-----+
    |ProductId|Total|
    +---------+-----+
    |     2107| 4538|
    |     4833| 4533|
    |      347| 4523|
    |     3459| 4233|
    |     4246| 4155|
    +---------+-----+
    

Créer une cellule de paramètre

Les pipelines Azure Synapse recherchent la cellule de paramètre et la traite comme cellule par défaut pour les paramètres passés au moment de l’exécution. Le moteur d’exécution ajoutera une nouvelle cellule sous la cellule des paramètres avec des paramètres d’entrée en vue de remplacer les valeurs par défaut. Lorsqu’il n’y a pas de cellule de paramètres désignée, la cellule injectée est insérée tout en haut du notebook.

  1. Nous allons exécuter ce notebook à partir d’un pipeline. Nous voulons passer un paramètre qui définit une valeur de variable runId qui sera utilisée pour nommer le fichier Parquet. Exécutez la commande suivante dans une nouvelle cellule :

    import uuid
    
    # Generate random GUID
    runId = uuid.uuid4()
    

    Nous utilisons la bibliothèque uuid fournie avec Spark pour générer un GUID aléatoire. Nous voulons remplacer la variable runId par un paramètre passé par le pipeline. Pour ce faire, nous devons l’activer comme cellule de paramètre.

  2. Sélectionnez les points de suspension des actions (...) en haut à droite de la cellule (1), puis sélectionnez Activer/désactiver la cellule de paramètre (2).

    The menu item is highlighted.

    Une fois cette option activée, vous voyez l’étiquette Paramètres sur la cellule.

    The cell is configured to accept parameters.

  3. Collez le code suivant dans une nouvelle cellule pour utiliser la variable runId en tant que nom de fichier Parquet dans le chemin /top5-products/ du compte de lac de données principal. Remplacez YOUR_DATALAKE_NAME dans le chemin par le nom de votre compte de lac de données principal. Pour le trouver, faites défiler jusqu’à la Cellule 1 en haut de la page (1). Copiez le compte de stockage de lac de données à partir du chemin (2). Collez cette valeur en remplacement de YOUR_DATALAKE_NAME dans le chemin (3) dans la nouvelle cellule, puis exécutez la commande dans la cellule.

    %%pyspark
    
    top5ProductsOverall.write.parquet('abfss://wwi-02@YOUR_DATALAKE_NAME.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')
    

    The path is updated with the name of the primary data lake account.

  4. Vérifiez que le fichier a été écrit dans le lac de données. Accédez au hub Données et sélectionnez l’onglet Lié(1). Développez le compte de stockage de lac de données principal, puis sélectionnez le conteneur wwi-02(2). Accédez au dossier top5-products(3). Vous devriez voir un dossier pour le fichier Parquet dans le répertoire avec un GUID comme nom de fichier (4).

    The parquet file is highlighted.

    La méthode write Parquet sur le dataframe dans la cellule Notebook a créé ce répertoire puisqu’il n’existait pas avant.

Ajouter le notebook à un pipeline Synapse

Pour en revenir au flux de données de mappage que nous avons décrit au début de l’exercice, supposez que vous souhaitez exécuter ce notebook après l’exécution du flux de données dans le cadre de votre processus d’orchestration. Pour ce faire, vous ajoutez ce notebook à un pipeline en tant que nouvelle activité Notebook.

  1. Revenez au notebook. Sélectionnez Propriétés(1) en haut à droite du notebook, puis entrez Calculate Top 5 Products pour le Nom (2).

    The properties blade is displayed.

  2. Sélectionnez Ajouter au pipeline(1) en haut à droite du notebook, puis sélectionnez Pipeline existant (2).

    The add to pipeline button is highlighted.

  3. Sélectionnez le pipeline Écrire les données de profil utilisateur dans ASA(1), puis sélectionnez Ajouter *(2).

    The pipeline is selected.

  4. Synapse Studio ajoute l’activité Notebook au pipeline. Réorganisez l’activité Notebook pour qu’elle se trouve à droite de l’activité Flux de données. Sélectionnez l’activité Flux de données, puis faites glisser un carré vert de connexion de pipeline d’activité Réussite vers l’activité Notebook.

    The green arrow is highlighted.

    La flèche d’activité Réussite indique au pipeline d’exécuter l’activité Notebook après que l’exécution de l’activité Flux de données a réussi.

  5. Sélectionnez l’activité Notebook (1), sélectionnez l’onglet Paramètres(2), développez Paramètres de base (3), puis sélectionnez + Nouveau (4). Entrez runId dans le champ Nom(5). Sélectionnez Chaîne comme Type (6). Pour la Valeur, sélectionnez Ajouter du contenu dynamique (7).

    The settings are displayed.

  6. Sélectionnez ID d’exécution du pipeline sous Variables système (1). Cela ajoute @pipeline().RunId à la zone de contenu dynamique (2). Sélectionnez Terminer (3) pour fermer la boîte de dialogue.

    The dynamic content form is displayed.

    La valeur de l’ID d’exécution du pipeline est un GUID unique affecté à chaque exécution du pipeline. Nous allons utiliser cette valeur pour le nom du fichier Parquet en la passant en tant que paramètre de notebook runId. Nous pouvons ensuite examiner l’historique des exécutions du pipeline et rechercher le fichier Parquet spécifique créé pour chaque exécution du pipeline.

  7. Sélectionnez Tout publier puis Publier pour enregistrer vos modifications.

    Publish all is highlighted.

  8. Une fois la publication terminée, sélectionnez Ajouter un déclencheur (1), puis Déclencher maintenant (2) pour exécuter le pipeline mis à jour.

    The trigger menu item is highlighted.

  9. Sélectionnez OK pour exécuter le déclencheur.

    The OK button is highlighted.

Surveiller l’exécution du pipeline.

Le hub Superviser vous permet de superviser les activités actuelles et historiques de SQL, Apache Spark et Pipelines.

  1. Accédez au hub Superviser.

    The Monitor hub menu item is selected.

  2. Sélectionnez Exécutions du pipeline (1) et attendez que l’exécution du pipeline se termine correctement (2). Vous devrez peut-être actualiser (3) la vue.

    The pipeline run succeeded.

  3. Sélectionnez le nom du pipeline pour voir les exécutions de l’activité du pipeline.

    The pipeline name is selected.

  4. Notez à la fois l’activité Flux de données et la nouvelle activité Notebook(1). Notez la valeur de l’ID d’exécution du pipeline(2). Nous allons la comparer au nom du fichier Parquet généré par le notebook. Sélectionnez le nom du notebook Calculer les 5 premiers produits pour en voir les détails (3).

    The pipeline run details are displayed.

  5. Nous voyons ici les détails de l’exécution de Notebook. Vous pouvez sélectionner Lecture(1) pour regarder la lecture de la progression des travaux (2). En bas, vous pouvez voir les Diagnostics et les Journaux avec différentes options de filtre (3). À droite, nous pouvons voir les détails de l’exécution, tels que la durée, l’ID Livy, les détails du pool Spark, et ainsi de suite. Sélectionnez le lien Afficher les détails sur un travail pour voir des informations détaillées à son sujet (5).

    The run details are displayed.

  6. L’interface utilisateur de l’application Spark s’ouvre dans un nouvel onglet dans lequel nous pouvons voir les détails de la phase. Développez la visualisation DAG pour voir les détails de la phase.

    The Spark stage details are displayed.

  7. Revenez au hub Données.

    Data hub.

  8. Sélectionnez l’onglet Lié(1), sélectionnez le conteneur wwi-02(2) sur le compte de stockage de lac de données principal, accédez au dossier top5-products(3), puis vérifiez qu’il existe un dossier pour le fichier Parquet dont le nom correspond à l’ID d’exécution du pipeline.

    The file is highlighted.

    Comme vous pouvez le voir, nous avons un fichier dont le nom correspond à l’ID d’exécution du pipeline noté précédemment :

    The Pipeline run ID is highlighted.

    Ces valeurs correspondent parce que nous avons passé l’ID d’exécution du pipeline au paramètre runId sur l’activité Notebook.