Wrangling di dati interattivi con Apache Spark in Azure Machine Learning (anteprima)

Importante

Questa funzionalità è attualmente in anteprima pubblica. Questa versione di anteprima viene fornita senza contratto di servizio e non è consigliata per carichi di lavoro di produzione. Alcune funzionalità potrebbero non essere supportate o potrebbero presentare funzionalità limitate. Per altre informazioni, vedere Condizioni supplementari per l'utilizzo delle anteprime di Microsoft Azure.

Il data wrangling diventa uno dei passaggi più importanti dei progetti di Machine Learning. L'integrazione di Azure Machine Learning, con Azure Synapse Analytics (anteprima), fornisce l'accesso a un pool di Apache Spark, supportato da Azure Synapse, per la creazione di data wrangling interattivi con i notebook di Azure Machine Learning.

In questo articolo si apprenderà come eseguire il wrangling dei dati usando

  • Calcolo di Synapse Spark gestito (automatico)
  • Pool di Spark synapse collegato

Prerequisiti

Prima di avviare le attività di wrangling dei dati, è necessario avere familiarità con il processo di archiviazione dei segreti

  • Chiave di accesso dell'account di archiviazione BLOB di Azure
  • Token di firma di accesso condiviso
  • informazioni sull'entità servizio di Azure Data Lake Storage (ADLS) Gen 2

nella Key Vault di Azure. È anche necessario sapere come gestire le assegnazioni di ruolo negli account di archiviazione di Azure. Le sezioni seguenti esaminano questi concetti. Verranno quindi esaminati i dettagli del wrangling interattivo dei dati usando i pool di Spark in Azure Machine Learning Notebooks.

Suggerimento

Per informazioni sulla configurazione dell'assegnazione di ruolo dell'account di archiviazione di Azure o se si accede ai dati negli account di archiviazione usando il pass-through dell'identità utente, vedere Aggiungere assegnazioni di ruolo negli account di archiviazione di Azure.

Wrangling di dati interattivi con Apache Spark

Azure Machine Learning offre risorse di calcolo Spark gestite (automatiche) e un pool di Spark Synapse collegato, per creare data wrangling interattivi con Apache Spark, in Azure Machine Learning Notebooks. Il calcolo Spark gestito (automatico) non richiede la creazione di risorse nell'area di lavoro Azure Synapse. Al contrario, un ambiente di calcolo Spark completamente gestito diventa direttamente disponibile nei notebook di Azure Machine Learning. L'uso di un ambiente di calcolo Spark gestito (automatico) è l'approccio più semplice per accedere a un cluster Spark in Azure Machine Learning.

Calcolo Spark gestito (automatico) nei notebook di Azure Machine Learning

Un ambiente di calcolo Spark gestito (automatico) è disponibile nei notebook di Azure Machine Learning per impostazione predefinita. Per accedervi in un notebook, selezionare Calcolo Spark di Azure Machine Learning in Azure Machine Learning Spark dal menu di selezione calcolo .

Screenshot che evidenzia l'opzione Spark di Azure Machine Learning selezionata nel menu di selezione calcolo.

L'interfaccia utente dei notebook offre anche opzioni per la configurazione della sessione Spark, per il calcolo Spark gestito (automatico). Per configurare una sessione Spark:

  1. Selezionare Configura sessione nella parte inferiore della schermata.

  2. Selezionare una versione di Apache Spark dal menu a discesa.

  3. Selezionare Tipo di istanza dal menu a discesa. Sono attualmente supportati i tipi di istanza seguenti:

    • Standard_E4s_v3
    • Standard_E8s_v3
    • Standard_E16s_v3
    • Standard_E32s_v3
    • Standard_E64s_v3
  4. Immettere un valore di timeout della sessione Spark, espresso in minuti.

  5. Selezionare il numero di executor per la sessione Spark.

  6. Selezionare Dimensioni executor dal menu a discesa.

  7. Selezionare Dimensioni driver dal menu a discesa.

  8. Per usare un file conda per configurare una sessione Spark, selezionare la casella di controllo Carica file conda . Selezionare quindi Sfoglia e scegliere il file conda con la configurazione della sessione Spark desiderata.

  9. Aggiungere le proprietà delle impostazioni di configurazione , i valori di input nelle caselle di testo Proprietà e Valore e selezionare Aggiungi.

  10. Selezionare Applica.

    Screenshot che mostra le opzioni di configurazione della sessione Spark.

  11. Selezionare Arresta ora nella finestra popup Arresta sessione corrente .

    Screenshot che mostra la finestra di dialogo Arresta sessione corrente.

Le modifiche alla configurazione della sessione verranno mantenute e diventeranno disponibili per un'altra sessione del notebook avviata usando il calcolo Spark gestito (automatico).

Importare e wrangle i dati da Azure Data Lake Storage (ADLS) Gen 2

È possibile accedere ai dati wrangle archiviati in account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2 con abfss:// URI di dati seguendo uno dei due meccanismi di accesso ai dati:

  • Pass-through dell'identità utente
  • Accesso ai dati basato su entità servizio

Suggerimento

Per accedere ai dati nell'account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2 è necessario il minor numero di passaggi di configurazione.

Per avviare il wrangling dei dati interattivi con il pass-through dell'identità utente:

  • Verificare che l'identità utente abbia assegnazioni di ruoloCollaboratore ai dati del BLOB di archiviazione e Collaboratore ai dati del BLOB di archiviazione nell'account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2.

  • Per usare l'ambiente di calcolo Spark gestito (automatico), selezionare Calcolo Spark di Azure Machine Learning, in Azure Machine Learning Spark dal menu di selezione Calcolo .

    Screenshot che mostra l'uso di un ambiente di calcolo Spark gestito (automatico).

  • Per usare un pool di Synapse Spark collegato, selezionare un pool synapse Spark collegato nel pool di Spark Synapse (anteprima) dal menu Di selezione calcolo .

    Screenshot che mostra l'uso di un pool di Spark collegato.

  • Questo esempio di codice di data wrangling del Titanic mostra l'uso di un URI di dati in formato abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> con pyspark.pandas e pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Nota

    Questo esempio di codice Python usa pyspark.pandas, supportato solo dal runtime spark versione 3.2.

Per wrangle data by access through a service principal :To wrangle data by access through a service principal:

  1. Verificare che l'entità servizio abbia assegnazioni di ruoloCollaboratore ai dati del BLOB di archiviazione e Collaboratore ai dati del BLOB di archiviazione nell'account di archiviazione di Azure Data Lake Storage (ADLS) Gen2.

  2. Creare segreti di Azure Key Vault per l'ID tenant dell'entità servizio, l'ID client e i valori dei segreti client.

  3. Selezionare Managed (Automatic) Spark compute Azure Machine Learning Spark Compute (Managed (Automatic) Spark Compute (Managed (Automatic) Spark Compute (Managed (Automatic) Spark compute Azure Machine Learning Spark Compute (Azure Machine Learning Spark ComputeCompute Compute SparkCompute Spark compute spark compute) (Calcolo gestito (automatico) dal menu di selezione calcolo oppure selezionare un pool synapse Spark collegato (anteprima)

  4. Per impostare l'ID tenant dell'entità servizio, l'ID client e il segreto client nella configurazione, eseguire l'esempio di codice seguente.

    • La get_secret() chiamata nel codice dipende dal nome del Key Vault di Azure e dai nomi dei segreti di Azure Key Vault creati per l'ID tenant dell'entità servizio, l'ID client e il segreto client. Il nome/i valori della proprietà corrispondenti da impostare nella configurazione sono i seguenti:

      • Proprietà ID client: fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Proprietà del segreto client: fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Proprietà ID tenant: fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
      • Valore ID tenant: https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      
      # Set up service principal tenant ID, client ID and secret from Azure Key Vault
      client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>")
      tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>")
      client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>")
      
      # Set up service principal which has access of the data
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth"
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_id,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          client_secret,
      )
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net",
          "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token",
      )
      
  5. Importare e wrangle dati usando l'URI dei dati in formato abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA> come illustrato nell'esempio di codice usando i dati Titanic.

Importare e wrangle dati da Archiviazione BLOB di Azure

È possibile accedere ai dati di Archiviazione BLOB di Azure con la chiave di accesso dell'account di archiviazione o un token di firma di accesso condiviso. È necessario archiviare queste credenziali nel Key Vault di Azure come segreto e impostarle come proprietà nella configurazione della sessione.

Per avviare il wrangling dei dati interattivi:

  1. Nel pannello studio di Azure Machine Learning sinistro selezionare Notebook.

  2. Nel menu di selezione Calcolo selezionare l'ambiente di calcolo Spark gestito (automatico) di Calcolo Spark di Azure Machine Learning in Azure Machine Learning Spark oppure selezionare un pool synapse Spark collegato nel pool synapse Spark (anteprima) dal menu Di selezione calcolo .

  3. Per configurare la chiave di accesso dell'account di archiviazione o un token di firma di accesso condiviso per l'accesso ai dati in Azure Machine Learning Notebooks:

    • Per la chiave di accesso, impostare la proprietà fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net come illustrato nel frammento di codice:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key
      )
      
    • Per il token di firma di accesso condiviso, impostare la proprietà fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net come illustrato nel frammento di codice seguente:

      from pyspark.sql import SparkSession
      
      sc = SparkSession.builder.getOrCreate()
      token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary
      sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>")
      sc._jsc.hadoopConfiguration().set(
          "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net",
          sas_token,
      )
      

      Nota

      Le get_secret() chiamate nei frammenti di codice precedenti richiedono il nome del Key Vault di Azure e i nomi dei segreti creati per la chiave di accesso all'account di archiviazione BLOB di Azure o il token di firma di accesso condiviso

  4. Eseguire il codice wrangling dei dati nello stesso notebook. Formattare l'URI dati come wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA> simile al frammento di codice

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled",
        index_col="PassengerId",
    )
    

    Nota

    Questo esempio di codice Python usa pyspark.pandas, che è supportato solo da Spark runtime versione 3.2.

Importare e wrangle dati dall'archivio dati di Azure Machine Learning

Per accedere ai dati dall'archivio dati di Azure Machine Learning, definire un percorso per i dati nell'archivio dati con formatoazureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA> URI. Per wrangle dati da un archivio dati di Azure Machine Learning in una sessione Notebooks in modo interattivo:

  1. Selezionare Managed (Automatic) Spark compute Azure Machine Learning Spark Compute in Azure Machine Learning Spark dal menu di selezione calcolo oppure selezionare un pool Di Synapse Spark collegato nel pool synapse Spark (anteprima) dal menu di selezione calcolo .

  2. Questo esempio di codice illustra come leggere e wrangle Titanic dati da un archivio dati di Azure Machine Learning, usando azureml:// l'URI pyspark.pandas dell'archivio dati e pyspark.ml.feature.Imputer.

    import pyspark.pandas as pd
    from pyspark.ml.feature import Imputer
    
    df = pd.read_csv(
        "azureml://datastores/workspaceblobstore/paths/data/titanic.csv",
        index_col="PassengerId",
    )
    imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy(
        "mean"
    )  # Replace missing values in Age column with the mean value
    df.fillna(
        value={"Cabin": "None"}, inplace=True
    )  # Fill Cabin column with value "None" if missing
    df.dropna(inplace=True)  # Drop the rows which still have any missing value
    df.to_csv(
        "azureml://datastores/workspaceblobstore/paths/data/wrangled",
        index_col="PassengerId",
    )
    

    Nota

    Questo esempio di codice Python usa pyspark.pandas, che è supportato solo da Spark runtime versione 3.2.

Gli archivi dati di Azure Machine Learning possono accedere ai dati usando le credenziali dell'account di archiviazione di Azure

  • Chiave di accesso
  • Token di firma di accesso condiviso
  • entità servizio

o fornire l'accesso ai dati senza credenziali. A seconda del tipo di archivio dati e del tipo di account di archiviazione di Azure sottostante, adottare un meccanismo di autenticazione appropriato per garantire l'accesso ai dati. Questa tabella riepiloga i meccanismi di autenticazione per accedere ai dati negli archivi dati di Azure Machine Learning:

Tipo di account di archiviazione Accesso ai dati senza credenziali Meccanismo di accesso ai dati Assegnazioni di ruoli
BLOB Azure No Chiave di accesso o token di firma di accesso condiviso Nessuna assegnazione di ruolo necessaria
BLOB Azure Pass-through dell'identità utente* L'identità utente deve avere assegnazioni di ruolo appropriate nell'account di archiviazione BLOB di Azure
Azure Data Lake Storage (ADLS) Gen 2 No Entità servizio L'entità servizio deve avere assegnazioni di ruolo appropriate nell'account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2
Azure Data Lake Storage (ADLS) Gen 2 Pass-through dell'identità utente L'identità utente deve avere assegnazioni di ruolo appropriate nell'account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2

* il pass-through dell'identità utente funziona per gli archivi dati senza credenziali che puntano agli account di archiviazione BLOB di Azure, solo se l'eliminazione temporanea non è abilitata.

Accesso ai dati nella condivisione file predefinita

La condivisione file predefinita viene montata sia per il calcolo Spark gestito (automatico) che per i pool Di Synapse Spark collegati.

Screenshot che mostra l'uso di una condivisione file.

In studio di Azure Machine Learning i file nella condivisione file predefinita vengono visualizzati nell'albero della directory nella scheda File. Il codice del notebook può accedere direttamente ai file archiviati in questa condivisione file con file:// il protocollo, insieme al percorso assoluto del file, senza più configurazioni. Questo frammento di codice illustra come accedere a un file archiviato nella condivisione file predefinita:

import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer

abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
    inputCols=["Age"],
    outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")

Nota

Questo esempio di codice Python usa pyspark.pandas, che è supportato solo da Spark runtime versione 3.2.

Passaggi successivi