Data wrangling interattivo con Apache Spark in Azure Machine Learning
Il data wrangling diventa uno degli aspetti più importanti dei progetti di apprendimento automatico. L'integrazione di Azure Machine Learning con Azure Synapse Analytics fornisce l'accesso a un pool di Apache Spark, supportato da Azure Synapse, per il data wrangling interattivo che usa i notebook di Azure Machine Learning.
In questo articolo si apprenderà come gestire il data wrangling usando
- Calcolo Spark serverless
- Pool di Spark Synapse collegato
Prerequisiti
- Una sottoscrizione di Azure; se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.
- Un'area di lavoro di Azure Machine Learning. Per altre informazioni, vedere Creare risorse dell'area di lavoro.
- Un account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2. Per altre informazioni, vedere Creare un account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2.
- (Facoltativo): insieme di credenziali delle chiavi di Azure. Per altre informazioni, vedere Creare un'istanza di Azure Key Vault.
- (Facoltativo): un'entità servizio. Per altre informazioni, vedere Creare un'entità servizio.
- (Facoltativo): un pool di Spark per Synapse collegato all'area di lavoro di Azure Machine Learning.
Prima di iniziare le attività di data wrangling, è necessario scoprire le informazioni sul processo di archiviazione dei segreti
- Chiave di accesso dell'account di archiviazione BLOB di Azure
- Token di firma di accesso condiviso (SAS)
- Informazioni sull'entità servizio di Azure Data Lake Storage (ADLS) Gen 2
in Azure Key Vault. È anche necessario sapere come gestire le assegnazioni di ruolo negli account di archiviazione di Azure. Le sezioni seguenti di questo documento descrivono questi concetti. Vengono quindi esaminati i dettagli del data wrangling interattivo usando i pool di Spark nei notebook di Azure Machine Learning.
Suggerimento
Per informazioni sulla configurazione dell'assegnazione di ruolo dell'account di archiviazione di Azure o se si accede ai dati negli account di archiviazione usando il pass-through dell'identità utente, vedere Aggiungere assegnazioni di ruolo negli account di archiviazione di Azure.
Data wrangling interattivo con Apache Spark
Per il data wrangling interattivo con Apache Spark nei notebook di Azure Machine Learning, Azure Machine Learning offre un ambiente di calcolo Spark serverless e un pool di Spark collegato. L'ambiente di calcolo Spark serverless non richiede la creazione di risorse nell'area di lavoro di Azure Synapse. Al contrario, un ambiente di calcolo Spark serverless completamente gestito diventa direttamente disponibile nei notebook di Azure Machine Learning. L'uso di un ambiente di calcolo Spark serverless è il modo più semplice per accedere a un cluster Spark in Azure Machine Learning.
Calcolo Spark serverless nei notebook di Azure Machine Learning
Un ambiente di calcolo Spark serverless è disponibile nei notebook di Azure Machine Learning per impostazione predefinita. Per accedervi in un notebook, selezionare Calcolo Spark serverless in Spark serverless di Azure Machine Learning dal menu di selezione Calcolo.
L'interfaccia utente dei notebook offre anche opzioni per la configurazione della sessione di Spark per l'ambiente di calcolo Spark serverless. Per configurare una sessione Spark:
- Selezionare Configura sessione nella parte superiore della schermata.
- Selezionare Versione di Apache Spark dal menu a discesa.
Importante
Runtime di Azure Synapse per Apache Spark: annunci
- Runtime di Azure Synapse per Apache Spark 3.2:
- Data annuncio EOLA: 8 luglio 2023
- Data di fine supporto: 8 luglio 2024. Dopo questa data il runtime sarà disabilitato.
- Apache Spark 3.3:
- Data dell’annuncio EOLA: 12 luglio 2024
- Data di fine supporto: 31 marzo 2025. Dopo questa data il runtime sarà disabilitato.
- Per usufruire di un supporto continuo e di prestazioni ottimali,è consigliabile eseguire la migrazione ad Apache Spark 3.4
- Runtime di Azure Synapse per Apache Spark 3.2:
- Selezionare il Tipo di istanza dal menu a discesa. Questi tipi sono attualmente supportati:
Standard_E4s_v3
Standard_E8s_v3
Standard_E16s_v3
Standard_E32s_v3
Standard_E64s_v3
- Immettere un valore di timeout sessione Spark, espresso in minuti.
- Specificare se si vuole applicare o meno l'opzione Alloca dinamicamente gli executor
- Selezionare il numero di executor per la sessione spark.
- Selezionare le dimensioni executor dal menu a discesa.
- Selezionare le dimensioni driver dal menu a discesa.
- Per usare un file Conda per configurare una sessione Spark, selezionare la casella di controllo Carica file conda. Selezionare quindi Sfogliae scegliere il file Conda con la configurazione della sessione Spark desiderata.
- Aggiungere le proprietà delle impostazioni di configurazione, i valori di input nelle caselle di testo Proprietà e Valore e selezionare Aggiungi.
- Selezionare Applica.
- Nella finestra popup Configurare una nuova sessione? selezionare Arresta sessione.
Le modifiche alla configurazione della sessione vengono mantenute e diventano disponibili per un'altra sessione del notebook avviata usando il calcolo Spark serverless.
Suggerimento
Se si usano pacchetti Conda a livello di sessione, è possibile migliorare l'avvio a freddo della sessione Spark se si imposta la variabile di configurazione spark.hadoop.aml.enable_cache
su true. L'avvio a freddo di una sessione con i pacchetti Conda a livello di sessione richiede in genere da 10 a 15 minuti all'avvio della sessione per la prima volta. Tuttavia, l'accesso sporadico successivo alla sessione inizia con la variabile di configurazione impostata su true richiede in genere da tre a cinque minuti.
Importare e radunare dati da Azure Data Lake Storage (ADLS) Gen 2
È possibile accedere ed eseguire il wrangling dei dati archiviati negli account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2 con URI di dati di tipo abfss://
. A tale scopo, è necessario seguire uno dei due meccanismi di accesso ai dati:
- Pass-through dell'identità utente
- Accesso ai dati basato sull'entità servizio
Suggerimento
Il data wrangling con un calcolo Spark serverless e il pass-through dell'identità utente per accedere ai dati in un account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2, richiede il minor numero di passaggi di configurazione.
Avviare il data wrangling interattivo con il pass-through dell'identità utente:
Verificare che l’identità utente disponga delle assegnazioni di ruolo Collaboratore e Collaboratore ai dati dei BLOB di archiviazione nell’account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2.
Per usare il calcolo Spark serverless, selezionare Calcolo Spark serverless in Azure Machine Learning Serverless Spark dal menu di selezione Calcolo.
Per usare un pool di Spark Synapse collegato, selezionare un pool di Spark Synapse collegato in Pool di Spark Synapse dal menu di selezione Calcolo.
Questo esempio di codice di data wrangling Titanic mostra l'uso di un URI di dati in formato
abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA>
conpyspark.pandas
epyspark.ml.feature.Imputer
.import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/data/wrangled", index_col="PassengerId", )
Nota
Questo esempio di codice Python usa
pyspark.pandas
. Questo supporto è supportato solo dal runtime Spark versione 3.2 o successiva.
Radunare dati tramite l'accesso tramite un'entità servizio:
Verificare che l’entità servizio disponga delle assegnazioni di ruolo Collaboratore e Collaboratore ai dati dei BLOB di archiviazione nell’account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2.
Creare segreti di Azure Key Vault per l'ID tenant dell'entità servizio, l'ID client e i valori dei segreti client.
Nel menu di selezione Calcolo selezionare Ambiente di calcolo Spark serverless in Spark serverless per Azure Machine Learning. È anche possibile selezionare un pool di Spark per Synapse collegato in Pool Spark di Synapse dal menu di selezione Calcolo.
Impostare i valori per ID tenant dell'entità servizio, ID client e segreto client nella configurazione ed eseguire l'esempio di codice seguente.
La chiamata
get_secret()
nel codice dipende dal nome dell'insieme di credenziali delle chiavi di Azure e dai nomi dei segreti di Azure Key Vault creati per l'ID tenant dell'entità servizio, l'ID client e il segreto client. Impostare i valori/nome della proprietà corrispondenti nella configurazione:- Proprietà ID client:
fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
- Proprietà del segreto client:
fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
- Proprietà ID tenant:
fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net
- Valore ID tenant:
https://login.microsoftonline.com/<TENANT_ID>/oauth2/token
from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary # Set up service principal tenant ID, client ID and secret from Azure Key Vault client_id = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_ID_SECRET_NAME>") tenant_id = token_library.getSecret("<KEY_VAULT_NAME>", "<TENANT_ID_SECRET_NAME>") client_secret = token_library.getSecret("<KEY_VAULT_NAME>", "<CLIENT_SECRET_NAME>") # Set up service principal which has access of the data sc._jsc.hadoopConfiguration().set( "fs.azure.account.auth.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "OAuth" ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider", ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", client_id, ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", client_secret, ) sc._jsc.hadoopConfiguration().set( "fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net", "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token", )
- Proprietà ID client:
Usando i dati Titanic, importare ed eseguire il wrangling dei dati usando l'URI di dati con formato
abfss://<FILE_SYSTEM_NAME>@<STORAGE_ACCOUNT_NAME>.dfs.core.windows.net/<PATH_TO_DATA>
, come mostrato nell'esempio di codice.
Importare e radunare dati da Archiviazione BLOB di Azure
È possibile accedere ai dati di Archiviazione BLOB di Azure con la chiave di accesso dell'account di archiviazione o un token di firma di accesso (SAS) condiviso. È necessario archiviare queste credenziali in Azure Key Vault come segretoe impostarle come proprietà nella configurazione della sessione.
Avviare il data wrangling interattivo:
Nel pannello sinistro di studio di Azure Machine Learning selezionare Notebook.
Nel menu di selezione Calcolo selezionare Ambiente di calcolo Spark serverless in Spark serverless per Azure Machine Learning. È anche possibile selezionare un pool di Spark per Synapse collegato in Pool Spark di Synapse dal menu di selezione Calcolo.
Configurare la chiave di accesso dell'account di archiviazione o un token di firma di accesso (SAS) condiviso per l'accesso ai dati nei notebook di Azure Machine Learning:
Per la chiave di accesso, impostare la proprietà
fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net
, come illustrato in questo frammento di codice:from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary access_key = token_library.getSecret("<KEY_VAULT_NAME>", "<ACCESS_KEY_SECRET_NAME>") sc._jsc.hadoopConfiguration().set( "fs.azure.account.key.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", access_key )
Per il token di firma di accesso condiviso, impostare la proprietà
fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net
, come illustrato in questo frammento di codice:from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() token_library = sc._jvm.com.microsoft.azure.synapse.tokenlibrary.TokenLibrary sas_token = token_library.getSecret("<KEY_VAULT_NAME>", "<SAS_TOKEN_SECRET_NAME>") sc._jsc.hadoopConfiguration().set( "fs.azure.sas.<BLOB_CONTAINER_NAME>.<STORAGE_ACCOUNT_NAME>.blob.core.windows.net", sas_token, )
Nota
Le chiamate
get_secret()
nei frammenti di codice precedenti richiedono il nome dell'istanza di Azure Key Vault e i nomi dei segreti creati per la chiave di accesso dell'account di Archiviazione BLOB di Azure o il token di firma di accesso condiviso.
Eseguire il codice di dati nello stesso notebook. Formattare l'URI dei dati come
wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/<PATH_TO_DATA>
, in modo analogo a quanto illustrato da questo frammento di codice:import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "wasbs://<BLOB_CONTAINER_NAME>@<STORAGE_ACCOUNT_NAME>.blob.core.windows.net/data/wrangled", index_col="PassengerId", )
Nota
Questo esempio di codice Python usa
pyspark.pandas
. Questo supporto è supportato solo dal runtime Spark versione 3.2 o successiva.
Importare e radunare i dati dall'archivio dati di Azure Machine Learning
Per accedere ai dati dall’archivio dati di Azure Machine Learning, definire un percorso ai dati nell’archivio dati con formato URI azureml://datastores/<DATASTORE_NAME>/paths/<PATH_TO_DATA>
. Radunare dati da un archivio dati di Azure Machine Learning in una sessione notebook in modo interattivo:
Selezionare Calcolo Spark serverless in Azure Machine Learning Serverless Spark dal menu di selezione Calcolo oppure selezionare un pool di Spark Synapse collegato in Pool di Spark Synapse dal menu di selezione Calcolo.
Questo esempio di codice illustra come leggere ed eseguire il wrangling dei dati Titanic da un archivio dati di Azure Machine Learning, usando l'URI dell'archivio dati
azureml://
,pyspark.pandas
epyspark.ml.feature.Imputer
.import pyspark.pandas as pd from pyspark.ml.feature import Imputer df = pd.read_csv( "azureml://datastores/workspaceblobstore/paths/data/titanic.csv", index_col="PassengerId", ) imputer = Imputer(inputCols=["Age"], outputCol="Age").setStrategy( "mean" ) # Replace missing values in Age column with the mean value df.fillna( value={"Cabin": "None"}, inplace=True ) # Fill Cabin column with value "None" if missing df.dropna(inplace=True) # Drop the rows which still have any missing value df.to_csv( "azureml://datastores/workspaceblobstore/paths/data/wrangled", index_col="PassengerId", )
Nota
Questo esempio di codice Python usa
pyspark.pandas
. Questo supporto è supportato solo dal runtime Spark versione 3.2 o successiva.
Gli archivi dati di Azure Machine Learning possono accedere ai dati usando le credenziali dell'account di archiviazione di Azure
- Chiave di accesso
- Token di firma di accesso condiviso
- entità servizio
In alternativa, usano l'accesso ai dati senza credenziali. A seconda del tipo di archivio dati e del tipo di account di archiviazione di Azure sottostante, selezionare un meccanismo di autenticazione appropriato per garantire l'accesso ai dati. Questa tabella riepiloga i meccanismi di autenticazione per accedere ai dati negli archivi dati di Azure Machine Learning:
Storage account type | Accesso ai dati senza credenziali | Meccanismo di accesso ai dati | Assegnazioni di ruolo |
---|---|---|---|
BLOB Azure | No | Chiave di accesso o token di firma di accesso (SAS) condiviso | Nessuna assegnazione di ruolo necessaria |
BLOB Azure | Sì | Pass-through dell'identità utente* | L'identità utente deve avere assegnazioni di ruolo appropriate nell'account di archiviazione BLOB di Azure |
Azure Data Lake Storage (ADLS) Gen2 | No | Entità servizio | L'entità servizio deve avere assegnazioni di ruolo appropriate nell'account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2 |
Azure Data Lake Storage (ADLS) Gen2 | Sì | Pass-through dell'identità utente | L'identità utente deve avere assegnazioni di ruolo appropriate nell'account di archiviazione di Azure Data Lake Storage (ADLS) Gen 2 |
* il pass-through dell'identità utente funziona per gli archivi dati senza credenziali che puntano agli account di archiviazione BLOB di Azure, solo se l’eliminazione temporanea non è abilitata.
Accesso ai dati nella condivisione file predefinita
La condivisione file predefinita viene montata sia nel calcolo Spark serverless che nei pool di Spark collegati.
In studio di Azure Machine Learning i file nella condivisione file predefinita vengono visualizzati nell'albero delle directory nella scheda File. Il codice del notebook può accedere direttamente ai file archiviati in questa condivisione file con il protocollo file://
, insieme al percorso assoluto del file, senza più configurazioni. Questo frammento di codice mostra come accedere a un file archiviato nella condivisione file predefinita:
import os
import pyspark.pandas as pd
from pyspark.ml.feature import Imputer
abspath = os.path.abspath(".")
file = "file://" + abspath + "/Users/<USER>/data/titanic.csv"
print(file)
df = pd.read_csv(file, index_col="PassengerId")
imputer = Imputer(
inputCols=["Age"],
outputCol="Age").setStrategy("mean") # Replace missing values in Age column with the mean value
df.fillna(value={"Cabin" : "None"}, inplace=True) # Fill Cabin column with value "None" if missing
df.dropna(inplace=True) # Drop the rows which still have any missing value
output_path = "file://" + abspath + "/Users/<USER>/data/wrangled"
df.to_csv(output_path, index_col="PassengerId")
Nota
Questo esempio di codice Python usa pyspark.pandas
. Questo supporto è supportato solo dal runtime Spark versione 3.2 o successiva.