Data Wrangling interactif avec Apache Spark dans Azure Machine Learning (préversion)

Important

Cette fonctionnalité est actuellement disponible en préversion publique. Cette préversion est fournie sans contrat de niveau de service et n’est pas recommandée pour les charges de travail de production. Certaines fonctionnalités peuvent être limitées ou non prises en charge. Pour plus d’informations, consultez Conditions d’Utilisation Supplémentaires relatives aux Évaluations Microsoft Azure.

Le data wrangling devient l’une des étapes les plus importantes des projets machine learning. L’intégration d’Azure Machine Learning à Azure Synapse Analytics (préversion) permet d’accéder à un pool Apache Spark ( avec Azure Synapse ) pour le wrangling de données interactif à l’aide de notebooks Azure Machine Learning.

Dans cet article, vous allez apprendre à procéder à un data wrangling.

  • Calcul Synapse Spark managé (automatique)
  • Pool Spark Synapse attaché

Prérequis

Avant de commencer les tâches de data wrangling, vous devez vous familiariser avec le processus de stockage des secrets.

  • Clé d’accès au compte de stockage Azure
  • Utilisez un jeton de signature d’accès partagé (SAP)
  • Informations sur le principal de service Azure Data Lake Storage (ADLS) Gen 2

dans le Key Vault Azure. Vous devez également savoir comment gérer les attributions de rôles dans les comptes de stockage Azure. Les sections suivantes passe en revue ces concepts. Ensuite, nous allons explorer les détails du data wrangling interactif en utilisant des pools Spark dans des notebooks Azure Machine Learning.

Conseil

Pour en savoir plus sur la configuration de l’attribution de rôles de compte de stockage Azure ou si vous accédez aux données de vos comptes de stockage à l’aide d’un passthrough d’identité utilisateur, consultez Ajouter des attributions de rôles dans des comptes de stockage Azure.

Data wrangling interactif avec Apache Spark

Azure Machine Learning offre un calcul Spark serverless (préversion) et un pool Spark Synapse attaché pour un data wrangling interactif avec Apache Spark, dans les Notebooks d’Azure Machine Learning. Le calcul Spark serverless ne nécessite pas la création de ressources dans l’espace de travail Azure Synapse. Au lieu de cela, un calcul Spark automatique entièrement managé devient directement disponible dans les notebooks Azure Machine Learning. L’utilisation d’un calcul Spark serverless constitue l’approche la plus simple pour accéder à un cluster Spark dans Azure Machine Learning.

calcul Spark serverless dans les Notebooks d’Azure Machine Learning

Un calcul Spark serverless est disponible par défaut dans les Notebooks d’Azure Machine Learning. Pour y accéder dans un notebook, sélectionnez Calcul Azure Machine Learning Spark, sous Azure Machine Learning Spark dans le menu de sélection Calcul.

Capture d’écran mettant en évidence l’option Azure Machine Learning Spark sélectionnée dans le menu de sélection Calcul.

L’interface utilisateur Notebooks offre également des options de configuration de session Spark, pour le calcul Spark serverless. Pour configurer une session Spark :

  1. Sélectionner Configurer la session en haut de l’écran.

  2. Sélectionnez une version d’Apache Spark dans le menu déroulant.

    Important

    La fin de vie (EOLA) pour Runtime Azure Synapse pour Apache Spark 3.1 a été annoncée le 26 janvier 2023. Par conséquent, Apache Spark 3.1 ne sera pas pris en charge après le 31 juillet 2023. Nous vous recommandons d’utiliser Apache Spark 3.2.

  3. Sélectionnez Type d’instance dans le menu déroulant. Actuellement, les types d’instance suivants sont pris en charge :

    • Standard_E4s_v3
    • Standard_E8s_v3
    • Standard_E16s_v3
    • Standard_E32s_v3
    • Standard_E64s_v3
  4. Entrez une valeur de délai d’expiration de session Spark, en minutes.

  5. Sélectionnez le nombre d’exécuteurs pour la session Spark.

  6. Sélectionnez Taille d’exécuteur dans le menu déroulant.

  7. Sélectionnez Taille du pilote dans le menu déroulant.

  8. Pour utiliser un fichier conda pour configurer une session Spark, cochez la case Charger le fichier conda . Ensuite, sélectionnez Parcourir, puis choisissez le fichier conda avec la configuration de session Spark souhaitée.

  9. Ajoutez les Propriétés des paramètres de configuration, les valeurs d’entrée dans les zones de texte Propriété et Valeur , puis sélectionnez Ajouter.

  10. Sélectionnez Appliquer.

    Capture d’écran montrant les options de configuration de session Spark.

  11. Sélectionnez Arrêter maintenant dans la fenêtre contextuelle Arrêter la session actuelle .

    Capture d’écran montrant la boîte de dialogue Arrêter la session active.

Les modifications de configuration de session persisteront et seront mis à la disposition d’une autre session notebook démarrée à l’aide du calcul Spark serverless.

Importer et étrangler des données à partir de Azure Data Lake Storage (ADLS) Gen 2

Vous pouvez accéder aux données stockées dans des comptes de stockage Azure Data Lake Storage (ADLS) Gen 2 avec abfss:// des URI de données en suivant l’un des deux mécanismes d’accès aux données :

  • Passthrough d’identité utilisateur
  • Accès aux données basées sur le principal de service

Conseil

Le data wrangling avec un calcul Spark serverless et un passthrough d’identité utilisateur pour accéder aux données du compte de stockage de génération 2 d’Azure Data Lake Storage (ADLS) nécessite un moindre nombre d’étapes de configuration.

Pour démarrer le wrangling de données interactif avec le passthrough d’identité utilisateur :

  • Vérifiez que l’identité de l’utilisateur a des attributions de rôlesContributeuretContributeur aux données Blob de stockage dans le compte de stockage Azure Data Lake Storage (ADLS) Gen 2.

  • Pour utiliser le calcul Spark serverless, sélectionnez Calcul Spark Azure Machine Learning, sous Spark Azure Machine Learning, dans le menu de sélectionCalcul.

    Capture d’écran montrant l’utilisation d’un calcul Spark serverless.

  • Pour utiliser un pool Synapse Spark attaché, sélectionnez un pool Synapse Spark attaché sous Pool Synapse Spark (préversion) dans le menu de sélection Calcul .

    Capture d’écran montrant l’utilisation d’un pool Spark attaché.

  • Cet exemple de code de wrangling de données Titanic montre l’utilisation d’un URI de données au format abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> avec pyspark.pandas et pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Notes

    Cet exemple de code Python utilise pyspark.pandas, qui est uniquement pris en charge par le runtime Spark version 3.2.

Pour étrangler des données par accès via un principal de service :

  1. Vérifiez que le principal de service a des attributions de rôlesContributeuretContributeur aux données Blob de stockage dans le compte de stockage Azure Data Lake Storage (ADLS) Gen 2.

  2. Créez des secrets Azure Key Vault pour l’ID de locataire du principal de service, l’ID client et les valeurs de clé secrète client.

  3. Sélectionnez le calcul Spark serverless Calcul Spark Azure Machine Learning sous Spark Azure Machine Learning dans le menu de sélection Calcul, ou sélectionnez un pool Spark Synapse attaché sous Pool Spark Synapse (préversion) dans le menu de sélection Calcul

  4. Pour définir l’ID de locataire du principal de service, l’ID client et la clé secrète client dans la configuration, exécutez l’exemple de code suivant.

    • L’appel de get_secret() dans le code dépend du nom du coffre de clés Azure et des noms des secrets Azure Key Vault créés pour l’ID de locataire, l’ID de client et le secret client du principal de service. Le nom/les valeurs de propriété correspondants à définir dans la configuration sont les suivantes :

      • Propriété ID client : fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Propriété de clé secrète client : fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Propriété ID de locataire : fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Valeur de l’ID de locataire : https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      
      # Set up service principal tenant ID, client ID and secret from Azure Key Vault
      client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
      tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
      client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
      
      # Set up service principal which has access of the data
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_id,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_secret,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
      )
      
  5. Importez et wranglez des données à l’aide de l’URI de données au format abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> , comme indiqué dans l’exemple de code à l’aide des données Titanic.

Importez et étranglez les données à partir du stockage Blob Azure

Vous pouvez accéder aux données du stockage Blob Azure avec la clé d’accès du compte de stockage ou un jeton de signature d’accès partagé (SAP). Vous devez stocker ces informations d’identification dans le Key Vault Azure en tant que secret et les définir en tant que propriétés dans la configuration de session.

Pour démarrer le wrangling de données interactif :

  1. Dans le volet Azure Machine Learning studio gauche, sélectionnez Blocs-notes.

  2. Dans le menu de sélection Calcul, sélectionnez Calcul Spark serverless sous Spark serverless Azure Machine Learning, ou sélectionnez un pool Spark Synapse attaché sous Pool Spark Synapse (préversion) dans le menu de sélection Calcul.

  3. Pour configurer la clé d’accès du compte de stockage ou un jeton de signature d’accès partagé (SAP) pour l’accès aux données dans Azure Machine Learning Notebooks :

    • Pour la clé d’accès, définissez la propriété fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net comme indiqué dans cet extrait de code :

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • Pour le jeton SAS, définissez la propriété fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net comme indiqué dans cet extrait de code :

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      

      Notes

      Les appels get_secret() dans les extraits de code ci-dessus nécessitent le nom du Key Vault Azure et les noms des secrets créés pour la clé d’accès du compte de stockage Blob Azure ou le jeton SAP

  4. Exécutez le code de data wrangling dans le même notebook. Mettre en forme l’URI de données comme wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA> étant similaire à cet extrait de code

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Notes

    Cet exemple de code Python utilise pyspark.pandas, qui est uniquement pris en charge par le runtime Spark version 3.2.

Importer et wrangler des données à partir d’un magasin de données Azure Machine Learning

Pour accéder aux données du magasin de données Azure Machine Learning, définissez un chemin d’accès aux données du magasin de données au format URIazureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>. Pour wrangle des données à partir d’un magasin de données Azure Machine Learning dans une session Notebooks de manière interactive :

  1. Sélectionnez le calcul Spark serverless Calcul Spark Azure Machine Learning sous Spark Azure Machine Learning dans le menu de sélection Calcul, ou sélectionnez un pool Spark Synapse attaché sous Pool Spark Synapse (préversion) dans le menu de sélection Calcul.

  2. Cet exemple de code montre comment lire et extraire des données Titanic à partir d’un magasin de données Azure Machine Learning, à l’aide azureml:// de l’URI du magasin de données et pyspark.pandaspyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "azureml://datastores/workspaceblobstore/paths/data/wrangled",
        index_col="PassengerId",
    )
    

    Notes

    Cet exemple de code Python utilise pyspark.pandas, qui est uniquement pris en charge par le runtime Spark version 3.2.

Les magasins de données Azure Machine Learning peuvent accéder aux données à l’aide des informations d’identification du compte de stockage Azure

  • Clé d’accès
  • Jeton SAS
  • principal du service

ou fournir un accès aux données sans informations d’identification. Selon le type de magasin de données et le type de compte de stockage Azure sous-jacent, adoptez un mécanisme d’authentification approprié pour garantir l’accès aux données. Ce tableau récapitule les mécanismes d’authentification permettant d’accéder aux données dans les magasins de données Azure Machine Learning :

Type de compte de stockage Accès aux données sans informations d’identification Mécanisme d’accès aux données Affectations de rôles
Objets blob Azure Non Clé d’accès ou jeton SAP Aucune attribution de rôle n’est nécessaire
Objets blob Azure Oui Passthrough d’identité utilisateur* L’identité de l’utilisateur doit avoir des attributions de rôles appropriées dans le compte de stockage Blob Azure
Azure Data Lake Storage (ADLS) Gen 2 Non Principal du service Le principal de service doit avoir des attributions de rôle appropriées dans le compte de stockage Azure Data Lake Storage (ADLS) Gen2
Azure Data Lake Storage (ADLS) Gen 2 Oui Passthrough d’identité utilisateur L’identité de l’utilisateur doit avoir des attributions de rôle appropriées dans le compte de stockage Azure Data Lake Storage (ADLS) Gen2

Le passthrough d’identité utilisateur * fonctionne pour les magasins de données sans informations d’identification qui pointent vers des comptes de stockage Blob Azure, uniquement si la suppression réversible n’est pas activée.

Accès aux données sur le partage de fichiers par défaut

Le partage de fichiers par défaut est monté à la fois sur le calcul Spark serverless et sur les pools Spark Synapse attachés.

Capture d’écran montrant l’utilisation d’un partage de fichiers.

Dans Azure Machine Learning studio, les fichiers du partage de fichiers par défaut sont affichés dans l’arborescence de répertoires sous l’onglet Fichiers. Le code du notebook peut accéder directement aux fichiers stockés dans ce partage de fichiers avec le protocole file://, ainsi que le chemin absolu du fichier, sans autre configuration. Cet extrait de code montre comment accéder à un fichier stocké sur le partage de fichiers par défaut :

import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
    inputCols=["Age"],
    outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")

Notes

Cet exemple de code Python utilise pyspark.pandas, qui est uniquement pris en charge par le runtime Spark version 3.2.

Étapes suivantes