Share via


Data wrangling interattivo con Apache Spark in Azure Machine Learning

Il data wrangling diventa uno dei passaggi più importanti dei progetti di Machine Learning. L'integrazione di Azure Machine Learning, con Azure Synapse Analytics, fornisce l'accesso a un pool di Spark Apache, supportato da Azure Synapse, per il data wrangling interattivo usando i notebook di Azure Machine Learning.

In questo articolo si apprenderà come eseguire il data wrangling usando

  • Calcolo Spark serverless
  • Pool di Spark Synapse collegato

Prerequisiti

Prima di iniziare le attività di data wrangling, è necessario scoprire le informazioni sul processo di archiviazione dei segreti

  • Chiave di accesso dell'account di archiviazione BLOB di Azure
  • Token di firma di accesso condiviso (SAS)
  • Informazioni sull'entità servizio di Azure Data Lake Storage (ADLS) Gen 2

in Azure Key Vault. È anche necessario sapere come gestire le assegnazioni di ruolo negli account di archiviazione di Azure. Le seguenti sezioni esaminano questi concetti. Verranno quindi esaminati i dettagli del data wrangling interattivo usando i pool di Spark nei notebook di Azure Machine Learning.

Suggerimento

Per informazioni sulla configurazione dell'assegnazione di ruolo dell'account di archiviazione di Azure o se si accede ai dati negli account di archiviazione usando il pass-through dell'identità utente, consultare Aggiungere assegnazioni di ruolo negli account di archiviazione di Azure.

Data wrangling interattivo con Apache Spark

Azure Machine Learning offre risorse di calcolo Spark serverless e pool di Spark collegati per il data wrangling interattivo con Apache Spark nei notebook di Azure Machine Learning. L'ambiente di calcolo Spark serverless non richiede la creazione di risorse nell'area di lavoro di Azure Synapse. Al contrario, un ambiente di calcolo Spark serverless completamente gestito diventa direttamente disponibile nei notebook di Azure Machine Learning. L'uso di un ambiente di calcolo Spark serverless è l'approccio più semplice per accedere a un cluster Spark in Azure Machine Learning.

Calcolo Spark serverless nei notebook di Azure Machine Learning

Un ambiente di calcolo Spark serverless è disponibile nei notebook di Azure Machine Learning per impostazione predefinita. Per accedervi in un notebook, selezionare Calcolo Spark serverless in Spark serverless di Azure Machine Learning dal menu di selezione Calcolo.

L'interfaccia utente notebook offre anche opzioni per la configurazione della sessione Spark, per il calcolo Spark serverless. Per configurare una sessione Spark:

  1. Selezionare Configura sessione nella parte superiore della schermata.
  2. Selezionare Versione di Apache Spark dal menu a discesa.

    Importante

    Runtime di Azure Synapse per Apache Spark: annunci

    • Runtime di Azure Synapse per Apache Spark 3.2:
      • Data annuncio EOLA: 8 luglio 2023
      • Data di fine supporto: 8 luglio 2024. Dopo questa data, il runtime verrà disabilitato.
    • Per un supporto continuo e prestazioni ottimali, è consigliabile eseguire la migrazione ad Apache Spark 3.3.
  3. Selezionare il Tipo di istanza dal menu a discesa. Attualmente sono supportati i tipi di istanza seguenti:
    • Standard_E4s_v3
    • Standard_E8s_v3
    • Standard_E16s_v3
    • Standard_E32s_v3
    • Standard_E64s_v3
  4. Immettere un valore di timeout sessione Spark, espresso in minuti.
  5. Selezionare se allocare dinamicamente gli executor
  6. Selezionare il numero di executor per la sessione spark.
  7. Selezionare le dimensioni executor dal menu a discesa.
  8. Selezionare le dimensioni driver dal menu a discesa.
  9. Per usare un file Conda per configurare una sessione Spark, selezionare la casella di controllo Carica file conda. Selezionare quindi Sfogliae scegliere il file Conda con la configurazione della sessione Spark desiderata.
  10. Aggiungere le proprietà delle impostazioni di configurazione, i valori di input nelle caselle di testo Proprietà e Valore e selezionare Aggiungi.
  11. Selezionare Applica.
  12. Selezionare Arresta sessione nella finestra popup Configura nuova sessione?

Le modifiche alla configurazione della sessione vengono mantenute e diventano disponibili per un'altra sessione del notebook avviata usando il calcolo Spark serverless.

Suggerimento

Se si usano pacchetti Conda a livello di sessione, è possibile migliorare l'avvio a freddo della sessione Spark se si imposta la variabile spark.hadoop.aml.enable_cache di configurazione su true. L'avvio a freddo di una sessione con i pacchetti Conda a livello di sessione richiede in genere da 10 a 15 minuti all'avvio della sessione per la prima volta. Tuttavia, l'accesso sporadico successivo alla sessione inizia con la variabile di configurazione impostata su true richiede in genere da tre a cinque minuti.

Importare e radunare dati da Azure Data Lake Storage (ADLS) Gen 2

È possibile accedere e radunare dati archiviati in account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2 con URI dati abfss:// seguendo uno dei due meccanismi di accesso ai dati:

  • Pass-through dell'identità utente
  • Accesso ai dati basato sull'entità servizio

Suggerimento

Il data wrangling con un calcolo Spark serverless e il pass-through dell'identità utente per accedere ai dati in un account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2, richiede il minor numero di passaggi di configurazione.

Avviare il data wrangling interattivo con il pass-through dell'identità utente:

  • Verificare che l'identità dell’utente abbia le assegnazioni di ruoloCollaboratore e Collaboratore ai dati dei BLOB di archiviazione nell'account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2.

  • Per usare il calcolo Spark serverless, selezionare Calcolo Spark serverless in Azure Machine Learning Serverless Spark dal menu di selezione Calcolo.

  • Per usare un pool di Spark Synapse collegato, selezionare un pool di Spark Synapse collegato in Pool di Spark Synapse dal menu di selezione Calcolo.

  • Questo esempio di codice di data wrangling Titanic mostra l'uso di un URI di dati in formato abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> con pyspark.pandas e pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Nota

    Questo esempio di codice Python usa pyspark.pandas. Questo supporto è supportato solo dal runtime Spark versione 3.2 o successiva.

Radunare dati tramite l'accesso tramite un'entità servizio:

  1. Verificare che l'entità servizio abbia le assegnazioni di ruoloCollaboratore e Collaboratore ai dati dei BLOB di archiviazione nell'account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2.

  2. Creare segreti di Azure Key Vault per l'ID tenant dell'entità servizio, l'ID client e i valori dei segreti client.

  3. Selezionare Calcolo Spark serverless in Azure Machine Learning Serverless Spark dal menu di selezione Calcolo oppure selezionare un pool di Spark Synapse collegato in Pool di Spark Synapse dal menu di selezione Calcolo.

  4. Per impostare l'ID tenant dell'entità servizio, l'ID client e il segreto client nella configurazione ed eseguire l'esempio di codice seguente.

    • La chiamata get_secret() nel codice dipende dal nome dell'insieme di credenziali delle chiavi di Azure e dai nomi dei segreti di Azure Key Vault creati per l'ID tenant dell'entità servizio, l'ID client e il segreto client. Impostare i valori/nome della proprietà corrispondenti nella configurazione:

      • Proprietà ID client: fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Proprietà del segreto client: fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Proprietà ID tenant: fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Valore ID tenant: https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      
      # Set up service principal tenant ID, client ID and secret from Azure Key Vault
      client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
      tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
      client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
      
      # Set up service principal which has access of the data
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_id,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_secret,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
      )
      
  5. Importare e radunare dati usando l'URI dei dati nel formato abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> come illustrato nell'esempio di codice, usando i dati Titanic.

Importare e radunare dati da Archiviazione BLOB di Azure

È possibile accedere ai dati di Archiviazione BLOB di Azure con la chiave di accesso dell'account di archiviazione o un token di firma di accesso (SAS) condiviso. È necessario archiviare queste credenziali in Azure Key Vault come segretoe impostarle come proprietà nella configurazione della sessione.

Avviare il data wrangling interattivo:

  1. Nel pannello sinistro di studio di Azure Machine Learning selezionare Notebook.

  2. Selezionare Calcolo Spark serverless in Azure Machine Learning Serverless Spark dal menu di selezione Calcolo oppure selezionare un pool di Spark Synapse collegato in Pool di Spark Synapse dal menu di selezione Calcolo.

  3. Configurare la chiave di accesso dell'account di archiviazione o un token di firma di accesso (SAS) condiviso per l'accesso ai dati nei notebook di Azure Machine Learning:

    • Per la chiave di accesso, impostare la proprietà fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net come illustrato in questo frammento di codice:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • Per il token di firma di accesso (SAS) condiviso, impostare la proprietà fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net come illustrato nel frammento di codice seguente:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      

      Nota

      Le chiamate get_secret() nei frammenti di codice precedenti richiedono il nome di Azure Key Vault e i nomi dei segreti creati per la chiave di accesso dell'account di Archiviazione BLOB di Azure o il token di firma di accesso (SAS) condiviso

  4. Eseguire il codice di dati nello stesso notebook. Formattare l'URI dei dati come wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>, in modo analogo a quanto illustrato da questo frammento di codice:

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Nota

    Questo esempio di codice Python usa pyspark.pandas. Questo supporto è supportato solo dal runtime Spark versione 3.2 o successiva.

Importare e radunare i dati dall'archivio dati di Azure Machine Learning

Per accedere ai dati da archivio dati di Azure Machine Learning, definire un percorso per i dati nell'archivio dati con formato URIazureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>. Radunare dati da un archivio dati di Azure Machine Learning in una sessione notebook in modo interattivo:

  1. Selezionare Calcolo Spark serverless in Azure Machine Learning Serverless Spark dal menu di selezione Calcolo oppure selezionare un pool di Spark Synapse collegato in Pool di Spark Synapse dal menu di selezione Calcolo.

  2. Questo esempio di codice illustra come leggere e radunare dati Titanic da un archivio dati di Azure Machine Learning, usando l'URI dell'archivio dati azureml://, pyspark.pandas e pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "azureml://datastores/workspaceblobstore/paths/data/wrangled",
        index_col="PassengerId",
    )
    

    Nota

    Questo esempio di codice Python usa pyspark.pandas. Questo supporto è supportato solo dal runtime Spark versione 3.2 o successiva.

Gli archivi dati di Azure Machine Learning possono accedere ai dati usando le credenziali dell'account di archiviazione di Azure

  • Chiave di accesso
  • Token di firma di accesso condiviso
  • entità servizio

o fornendo l'accesso ai dati senza credenziali. A seconda del tipo di archivio dati e del tipo di account di archiviazione di Azure sottostante, selezionare un meccanismo di autenticazione appropriato per garantire l'accesso ai dati. Questa tabella riepiloga i meccanismi di autenticazione per accedere ai dati negli archivi dati di Azure Machine Learning:

Storage account type Accesso ai dati senza credenziali Meccanismo di accesso ai dati Assegnazioni di ruolo
BLOB Azure No Chiave di accesso o token di firma di accesso (SAS) condiviso Nessuna assegnazione di ruolo necessaria
BLOB Azure Pass-through dell'identità utente* L'identità utente deve avere assegnazioni di ruolo appropriate nell'account di archiviazione BLOB di Azure
Azure Data Lake Storage (ADLS) Gen2 No Entità servizio L'entità servizio deve avere assegnazioni di ruolo appropriate nell'account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2
Azure Data Lake Storage (ADLS) Gen2 Pass-through dell'identità utente L'identità utente deve avere assegnazioni di ruolo appropriate nell'account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2

* il pass-through dell'identità utente funziona per gli archivi dati senza credenziali che puntano agli account di archiviazione BLOB di Azure, solo se l’eliminazione temporanea non è abilitata.

Accesso ai dati nella condivisione file predefinita

La condivisione file predefinita viene montata sia nel calcolo Spark serverless che nei pool di Spark collegati.

Screenshot showing use of a file share.

In studio di Azure Machine Learning i file nella condivisione file predefinita vengono visualizzati nell'albero delle directory nella scheda File. Il codice del notebook può accedere direttamente ai file archiviati in questa condivisione file con il protocollo file://, insieme al percorso assoluto del file, senza più configurazioni. Questo frammento di codice mostra come accedere a un file archiviato nella condivisione file predefinita:

import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
    inputCols=["Age"],
    outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")

Nota

Questo esempio di codice Python usa pyspark.pandas. Questo supporto è supportato solo dal runtime Spark versione 3.2 o successiva.

Passaggi successivi