Condividi tramite


Data wrangling interattivo con Apache Spark in Azure Machine Learning

Il data wrangling diventa uno degli aspetti più importanti dei progetti di apprendimento automatico. L'integrazione di Azure Machine Learning con Azure Synapse Analytics fornisce l'accesso a un pool di Apache Spark, supportato da Azure Synapse, per il data wrangling interattivo che usa i notebook di Azure Machine Learning.

In questo articolo si apprenderà come gestire il data wrangling usando

  • Calcolo Spark serverless
  • Pool di Spark Synapse collegato

Prerequisiti

Prima di iniziare le attività di data wrangling, è necessario scoprire le informazioni sul processo di archiviazione dei segreti

  • Chiave di accesso dell'account di archiviazione BLOB di Azure
  • Token di firma di accesso condiviso (SAS)
  • Informazioni sull'entità servizio di Azure Data Lake Storage (ADLS) Gen 2

in Azure Key Vault. È anche necessario sapere come gestire le assegnazioni di ruolo negli account di archiviazione di Azure. Le sezioni seguenti di questo documento descrivono questi concetti. Vengono quindi esaminati i dettagli del data wrangling interattivo usando i pool di Spark nei notebook di Azure Machine Learning.

Suggerimento

Per informazioni sulla configurazione dell'assegnazione di ruolo dell'account di archiviazione di Azure o se si accede ai dati negli account di archiviazione usando il pass-through dell'identità utente, vedere Aggiungere assegnazioni di ruolo negli account di archiviazione di Azure.

Data wrangling interattivo con Apache Spark

Per il data wrangling interattivo con Apache Spark nei notebook di Azure Machine Learning, Azure Machine Learning offre un ambiente di calcolo Spark serverless e un pool di Spark collegato. L'ambiente di calcolo Spark serverless non richiede la creazione di risorse nell'area di lavoro di Azure Synapse. Al contrario, un ambiente di calcolo Spark serverless completamente gestito diventa direttamente disponibile nei notebook di Azure Machine Learning. L'uso di un ambiente di calcolo Spark serverless è il modo più semplice per accedere a un cluster Spark in Azure Machine Learning.

Calcolo Spark serverless nei notebook di Azure Machine Learning

Un ambiente di calcolo Spark serverless è disponibile nei notebook di Azure Machine Learning per impostazione predefinita. Per accedervi in un notebook, selezionare Calcolo Spark serverless in Spark serverless di Azure Machine Learning dal menu di selezione Calcolo.

L'interfaccia utente dei notebook offre anche opzioni per la configurazione della sessione di Spark per l'ambiente di calcolo Spark serverless. Per configurare una sessione Spark:

  1. Selezionare Configura sessione nella parte superiore della schermata.
  2. Selezionare Versione di Apache Spark dal menu a discesa.

    Importante

    Runtime di Azure Synapse per Apache Spark: annunci

    • Runtime di Azure Synapse per Apache Spark 3.2:
      • Data annuncio EOLA: 8 luglio 2023
      • Data di fine supporto: 8 luglio 2024. Dopo questa data il runtime sarà disabilitato.
    • Apache Spark 3.3:
      • Data dell’annuncio EOLA: 12 luglio 2024
      • Data di fine supporto: 31 marzo 2025. Dopo questa data il runtime sarà disabilitato.
    • Per usufruire di un supporto continuo e di prestazioni ottimali,è consigliabile eseguire la migrazione ad Apache Spark 3.4
  3. Selezionare il Tipo di istanza dal menu a discesa. Questi tipi sono attualmente supportati:
    • Standard_E4s_v3
    • Standard_E8s_v3
    • Standard_E16s_v3
    • Standard_E32s_v3
    • Standard_E64s_v3
  4. Immettere un valore di timeout sessione Spark, espresso in minuti.
  5. Specificare se si vuole applicare o meno l'opzione Alloca dinamicamente gli executor
  6. Selezionare il numero di executor per la sessione spark.
  7. Selezionare le dimensioni executor dal menu a discesa.
  8. Selezionare le dimensioni driver dal menu a discesa.
  9. Per usare un file Conda per configurare una sessione Spark, selezionare la casella di controllo Carica file conda. Selezionare quindi Sfogliae scegliere il file Conda con la configurazione della sessione Spark desiderata.
  10. Aggiungere le proprietà delle impostazioni di configurazione, i valori di input nelle caselle di testo Proprietà e Valore e selezionare Aggiungi.
  11. Selezionare Applica.
  12. Nella finestra popup Configurare una nuova sessione? selezionare Arresta sessione.

Le modifiche alla configurazione della sessione vengono mantenute e diventano disponibili per un'altra sessione del notebook avviata usando il calcolo Spark serverless.

Suggerimento

Se si usano pacchetti Conda a livello di sessione, è possibile migliorare l'avvio a freddo della sessione Spark se si imposta la variabile di configurazione spark.hadoop.aml.enable_cache su true. L'avvio a freddo di una sessione con i pacchetti Conda a livello di sessione richiede in genere da 10 a 15 minuti all'avvio della sessione per la prima volta. Tuttavia, l'accesso sporadico successivo alla sessione inizia con la variabile di configurazione impostata su true richiede in genere da tre a cinque minuti.

Importare e radunare dati da Azure Data Lake Storage (ADLS) Gen 2

È possibile accedere ed eseguire il wrangling dei dati archiviati negli account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2 con URI di dati di tipo abfss://. A tale scopo, è necessario seguire uno dei due meccanismi di accesso ai dati:

  • Pass-through dell'identità utente
  • Accesso ai dati basato sull'entità servizio

Suggerimento

Il data wrangling con un calcolo Spark serverless e il pass-through dell'identità utente per accedere ai dati in un account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2, richiede il minor numero di passaggi di configurazione.

Avviare il data wrangling interattivo con il pass-through dell'identità utente:

  • Verificare che l’identità utente disponga delle assegnazioni di ruolo Collaboratore e Collaboratore ai dati dei BLOB di archiviazione nell’account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2.

  • Per usare il calcolo Spark serverless, selezionare Calcolo Spark serverless in Azure Machine Learning Serverless Spark dal menu di selezione Calcolo.

  • Per usare un pool di Spark Synapse collegato, selezionare un pool di Spark Synapse collegato in Pool di Spark Synapse dal menu di selezione Calcolo.

  • Questo esempio di codice di data wrangling Titanic mostra l'uso di un URI di dati in formato abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> con pyspark.pandas e pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Nota

    Questo esempio di codice Python usa pyspark.pandas. Questo supporto è supportato solo dal runtime Spark versione 3.2 o successiva.

Radunare dati tramite l'accesso tramite un'entità servizio:

  1. Verificare che l’entità servizio disponga delle assegnazioni di ruolo Collaboratore e Collaboratore ai dati dei BLOB di archiviazione nell’account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2.

  2. Creare segreti di Azure Key Vault per l'ID tenant dell'entità servizio, l'ID client e i valori dei segreti client.

  3. Nel menu di selezione Calcolo selezionare Ambiente di calcolo Spark serverless in Spark serverless per Azure Machine Learning. È anche possibile selezionare un pool di Spark per Synapse collegato in Pool Spark di Synapse dal menu di selezione Calcolo.

  4. Impostare i valori per ID tenant dell'entità servizio, ID client e segreto client nella configurazione ed eseguire l'esempio di codice seguente.

    • La chiamata get_secret() nel codice dipende dal nome dell'insieme di credenziali delle chiavi di Azure e dai nomi dei segreti di Azure Key Vault creati per l'ID tenant dell'entità servizio, l'ID client e il segreto client. Impostare i valori/nome della proprietà corrispondenti nella configurazione:

      • Proprietà ID client: fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Proprietà del segreto client: fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Proprietà ID tenant: fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Valore ID tenant: https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      
      # Set up service principal tenant ID, client ID and secret from Azure Key Vault
      client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
      tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
      client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
      
      # Set up service principal which has access of the data
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_id,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_secret,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
      )
      
  5. Usando i dati Titanic, importare ed eseguire il wrangling dei dati usando l'URI di dati con formato abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA>, come mostrato nell'esempio di codice.

Importare e radunare dati da Archiviazione BLOB di Azure

È possibile accedere ai dati di Archiviazione BLOB di Azure con la chiave di accesso dell'account di archiviazione o un token di firma di accesso (SAS) condiviso. È necessario archiviare queste credenziali in Azure Key Vault come segretoe impostarle come proprietà nella configurazione della sessione.

Avviare il data wrangling interattivo:

  1. Nel pannello sinistro di studio di Azure Machine Learning selezionare Notebook.

  2. Nel menu di selezione Calcolo selezionare Ambiente di calcolo Spark serverless in Spark serverless per Azure Machine Learning. È anche possibile selezionare un pool di Spark per Synapse collegato in Pool Spark di Synapse dal menu di selezione Calcolo.

  3. Configurare la chiave di accesso dell'account di archiviazione o un token di firma di accesso (SAS) condiviso per l'accesso ai dati nei notebook di Azure Machine Learning:

    • Per la chiave di accesso, impostare la proprietà fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net, come illustrato in questo frammento di codice:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • Per il token di firma di accesso condiviso, impostare la proprietà fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net, come illustrato in questo frammento di codice:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      

      Nota

      Le chiamate get_secret() nei frammenti di codice precedenti richiedono il nome dell'istanza di Azure Key Vault e i nomi dei segreti creati per la chiave di accesso dell'account di Archiviazione BLOB di Azure o il token di firma di accesso condiviso.

  4. Eseguire il codice di dati nello stesso notebook. Formattare l'URI dei dati come wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>, in modo analogo a quanto illustrato da questo frammento di codice:

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Nota

    Questo esempio di codice Python usa pyspark.pandas. Questo supporto è supportato solo dal runtime Spark versione 3.2 o successiva.

Importare e radunare i dati dall'archivio dati di Azure Machine Learning

Per accedere ai dati dall’archivio dati di Azure Machine Learning, definire un percorso ai dati nell’archivio dati con formato URI azureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>. Radunare dati da un archivio dati di Azure Machine Learning in una sessione notebook in modo interattivo:

  1. Selezionare Calcolo Spark serverless in Azure Machine Learning Serverless Spark dal menu di selezione Calcolo oppure selezionare un pool di Spark Synapse collegato in Pool di Spark Synapse dal menu di selezione Calcolo.

  2. Questo esempio di codice illustra come leggere ed eseguire il wrangling dei dati Titanic da un archivio dati di Azure Machine Learning, usando l'URI dell'archivio dati azureml://, pyspark.pandas e pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "azureml://datastores/workspaceblobstore/paths/data/wrangled",
        index_col="PassengerId",
    )
    

    Nota

    Questo esempio di codice Python usa pyspark.pandas. Questo supporto è supportato solo dal runtime Spark versione 3.2 o successiva.

Gli archivi dati di Azure Machine Learning possono accedere ai dati usando le credenziali dell'account di archiviazione di Azure

  • Chiave di accesso
  • Token di firma di accesso condiviso
  • entità servizio

In alternativa, usano l'accesso ai dati senza credenziali. A seconda del tipo di archivio dati e del tipo di account di archiviazione di Azure sottostante, selezionare un meccanismo di autenticazione appropriato per garantire l'accesso ai dati. Questa tabella riepiloga i meccanismi di autenticazione per accedere ai dati negli archivi dati di Azure Machine Learning:

Storage account type Accesso ai dati senza credenziali Meccanismo di accesso ai dati Assegnazioni di ruolo
BLOB Azure No Chiave di accesso o token di firma di accesso (SAS) condiviso Nessuna assegnazione di ruolo necessaria
BLOB Azure Pass-through dell'identità utente* L'identità utente deve avere assegnazioni di ruolo appropriate nell'account di archiviazione BLOB di Azure
Azure Data Lake Storage (ADLS) Gen2 No Entità servizio L'entità servizio deve avere assegnazioni di ruolo appropriate nell'account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2
Azure Data Lake Storage (ADLS) Gen2 Pass-through dell'identità utente L'identità utente deve avere assegnazioni di ruolo appropriate nell'account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2

* il pass-through dell'identità utente funziona per gli archivi dati senza credenziali che puntano agli account di archiviazione BLOB di Azure, solo se l’eliminazione temporanea non è abilitata.

Accesso ai dati nella condivisione file predefinita

La condivisione file predefinita viene montata sia nel calcolo Spark serverless che nei pool di Spark collegati.

Screenshot che mostra l’uso di una condivisione file.

In studio di Azure Machine Learning i file nella condivisione file predefinita vengono visualizzati nell'albero delle directory nella scheda File. Il codice del notebook può accedere direttamente ai file archiviati in questa condivisione file con il protocollo file://, insieme al percorso assoluto del file, senza più configurazioni. Questo frammento di codice mostra come accedere a un file archiviato nella condivisione file predefinita:

import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
    inputCols=["Age"],
    outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")

Nota

Questo esempio di codice Python usa pyspark.pandas. Questo supporto è supportato solo dal runtime Spark versione 3.2 o successiva.

Passaggi successivi