Utilità Microsoft Spark (MSSparkUtils) per Fabric
Microsoft Spark Utilities (MSSparkUtils) è un pacchetto predefinito che consente di eseguire facilmente attività comuni. È possibile usare MSSparkUtils per lavorare con i file system, per ottenere variabili di ambiente, per concatenare i notebook e per lavorare con i segreti. Il pacchetto MSSparkUtils è disponibile nelle pipeline di PySpark (Python), Scala, SparkR e Fabric.
Utilità di file system
mssparkutils.fs fornisce utilità per l'uso di vari file system, tra cui Azure Data Lake Archiviazione (ADLS) Gen2 e Archiviazione BLOB di Azure. Assicurarsi di configurare l'accesso ad Azure Data Lake Archiviazione Gen2 e Archiviazione BLOB di Azure in modo appropriato.
Eseguire i comandi seguenti per una panoramica dei metodi disponibili:
from notebookutils import mssparkutils
mssparkutils.fs.help()
Output
mssparkutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point
Use mssparkutils.fs.help("methodName") for more info about a method.
MSSparkUtils funziona con il file system nello stesso modo delle API Spark. Usare mssparkuitls.fs.mkdirs() e Fabric lakehouse, ad esempio:
Utilizzo | Percorso relativo dalla radice HDFS | Percorso assoluto per il file system ABFS | Percorso assoluto per il file system locale nel nodo driver |
---|---|---|---|
Lakehouse non predefinito | Non supportato | mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") | mssparkutils.fs.mkdirs("file:/<new_dir>") |
Lakehouse predefinito | Directory in "Files" o "Tables": mssparkutils.fs.mkdirs("Files/<new_dir>") | mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") | mssparkutils.fs.mkdirs("file:/<new_dir>") |
Elencare file
Per elencare il contenuto di una directory, usare mssparkutils.fs.ls('Percorso della directory').To list the content of a directory, use mssparkutils.fs.ls('Your directory path')). Ad esempio:
mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>") # based on ABFS file system
mssparkutils.fs.ls("file:/tmp") # based on local file system of driver node
Visualizzazione delle proprietà di file
Questo metodo restituisce proprietà di file, tra cui nome file, percorso file, dimensioni del file e se si tratta di una directory e di un file.
files = mssparkutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size)
Creare una nuova directory
Questo metodo crea la directory specificata se non esiste e crea le directory padre necessarie.
mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs. mkdirs("Files/<new_dir>") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") # based on ABFS file system
mssparkutils.fs.ls("file:/<new_dir>") # based on local file system of driver node
Copia file
Questo metodo copia un file o una directory e supporta l'attività di copia tra file system.
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
File di copia con prestazioni elevate
Questo metodo offre un modo più rapido per copiare o spostare file, in particolare volumi elevati di dati.
mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
Anteprima del contenuto del file
Questo metodo restituisce fino ai primi byte 'maxBytes' del file specificato come stringa codificata in UTF-8.
mssparkutils.fs.head('file path', maxBytes to read)
Sposta file
Questo metodo sposta un file o una directory e supporta lo spostamento tra file system.
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
Scrivere il file
Questo metodo scrive la stringa specificata in un file codificato in UTF-8.
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
Aggiungere contenuto a un file
Questo metodo aggiunge la stringa specificata a un file codificato in UTF-8.
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
Eliminare file o directory
Questo metodo rimuove un file o una directory.
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
Directory di montaggio/smontaggio
Altre informazioni sull'utilizzo dettagliato in Montaggio file e smontaggio.
Utilità notebook
Usare le utilità notebook MSSparkUtils per eseguire un notebook o uscire da un notebook con un valore . Eseguire il comando seguente per ottenere una panoramica dei metodi disponibili:
mssparkutils.notebook.help()
Output:
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
Nota
Le utilità del notebook non sono applicabili per le definizioni di processo Apache Spark (SJD).
Fare riferimento a un notebook
Questo metodo fa riferimento a un notebook e restituisce il relativo valore di uscita. È possibile eseguire chiamate di funzione di annidamento in un notebook in modo interattivo o in una pipeline. Il notebook a cui si fa riferimento viene eseguito nel pool di Spark del notebook che chiama questa funzione.
mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>)
Ad esempio:
mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
È possibile aprire il collegamento snapshot dell'esecuzione del riferimento nell'output della cella. Lo snapshot acquisisce i risultati dell'esecuzione del codice e consente di eseguire facilmente il debug di un'esecuzione di riferimento.
Nota
- Attualmente, il notebook di Fabric supporta solo il riferimento ai notebook all'interno di un'area di lavoro.
- Se si usano i file in Risorsa notebook, usare
mssparkutils.nbResPath
nel notebook di riferimento per assicurarsi che punti alla stessa cartella dell'esecuzione interattiva.
Il riferimento esegue più notebook in parallelo
Il metodo mssparkutils.notebook.runMultiple()
consente di eseguire più notebook in parallelo o con una struttura topologica predefinita. L'API usa un meccanismo di implementazione multithread all'interno di una sessione Spark, il che significa che le risorse di calcolo vengono condivise dal notebook di riferimento.
Con mssparkutils.notebook.runMultiple()
è possibile:
Eseguire più notebook contemporaneamente, senza attendere il completamento di ognuno di essi.
Specificare le dipendenze e l'ordine di esecuzione per i notebook usando un formato JSON semplice.
Ottimizzare l'uso delle risorse di calcolo Spark e ridurre i costi dei progetti di Infrastruttura.
Visualizzare gli snapshot di ogni record di esecuzione del notebook nell'output e eseguire il debug/monitorare le attività del notebook in modo pratico.
Ottenere il valore di uscita di ogni attività executive e usarli nelle attività downstream.
È anche possibile provare a eseguire mssparkutils.notebook.help("runMultiple") per trovare l'esempio e l'utilizzo dettagliato.
Ecco un semplice esempio di esecuzione di un elenco di notebook in parallelo usando questo metodo:
mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])
Il risultato dell'esecuzione del notebook radice è il seguente:
Di seguito è riportato un esempio di esecuzione di notebook con struttura topologica usando mssparkutils.notebook.runMultiple()
. Usare questo metodo per orchestrare facilmente i notebook tramite un'esperienza di codice.
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "NotebookSimple", # activity name, must be unique
"path": "NotebookSimple", # notebook path
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
},
{
"name": "NotebookSimple2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200}
},
{
"name": "NotebookSimple2.2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
}
]
}
mssparkutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})
Il risultato dell'esecuzione del notebook radice è il seguente:
Nota
Il grado di parallelismo dell'esecuzione di più notebook è limitato alla risorsa di calcolo totale disponibile di una sessione Spark.
Uscire da un notebook
Questo metodo esce da un notebook con un valore . È possibile eseguire chiamate di funzione di annidamento in un notebook in modo interattivo o in una pipeline.
Quando si chiama una funzione exit() da un notebook in modo interattivo, il notebook di Fabric genera un'eccezione, ignora l'esecuzione di celle successive e mantiene attiva la sessione Spark.
Quando si orchestra un notebook in una pipeline che chiama una funzione exit(), l'attività del notebook restituisce con un valore di uscita, completa l'esecuzione della pipeline e arresta la sessione Spark.
Quando si chiama una funzione exit() in un notebook a cui si fa riferimento, Fabric Spark interromperà l'ulteriore esecuzione del notebook a cui si fa riferimento e continuerà a eseguire le celle successive nel notebook principale che chiama la funzione run(). Ad esempio: Notebook1 ha tre celle e chiama una funzione exit() nella seconda cella. Notebook2 include cinque celle e chiamate run(notebook1) nella terza cella. Quando si esegue Notebook2, Notebook1 si arresta nella seconda cella quando si raggiunge la funzione exit(). Notebook2 continua a eseguire la quarta cella e la quinta cella.
mssparkutils.notebook.exit("value string")
Ad esempio:
Notebook Sample1 con due celle seguenti:
La cella 1 definisce un parametro di input con valore predefinito impostato su 10.
La cella 2 esce dal notebook con input come valore di uscita.
È possibile eseguire Sample1 in un altro notebook con valori predefiniti:
exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)
Output:
Notebook executed successfully with exit value 10
È possibile eseguire Sample1 in un altro notebook e impostare il valore di input su 20:
exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
Output:
Notebook executed successfully with exit value 20
Utilità delle credenziali
È possibile usare le utilità delle credenziali MSSparkUtils per ottenere i token di accesso e gestire i segreti in un insieme di credenziali delle chiavi di Azure.
Eseguire il comando seguente per ottenere una panoramica dei metodi disponibili:
mssparkutils.credentials.help()
Output:
getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(keyvault_endpoint, secret_name): returns secret for a given Key Vault and secret name
Ottenere un token
getToken restituisce un token Microsoft Entra per un determinato gruppo di destinatari e un nome (facoltativo). L'elenco seguente mostra le chiavi del gruppo di destinatari attualmente disponibili:
- risorsa gruppo di destinatari Archiviazione: "archiviazione"
- Risorsa di Power BI: "pbi"
- Risorsa di Azure Key Vault: "keyvault"
- Risorsa del database KQL di Synapse RTA: "kusto"
Eseguire il comando seguente per ottenere il token:
mssparkutils.credentials.getToken('audience Key')
Ottenere un segreto usando le credenziali utente
getSecret restituisce un segreto di Azure Key Vault per un determinato endpoint di Azure Key Vault e un nome segreto usando le credenziali utente.
mssparkutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')
Montaggio e smontaggio di file
Fabric supporta gli scenari di montaggio seguenti nel pacchetto Utilità Di Microsoft Spark. È possibile usare le API mount, unmount, getMountPath() e mounts() per collegare l'archiviazione remota (ADLS Gen2) a tutti i nodi di lavoro (nodo driver e nodi di lavoro). Dopo aver creato il punto di montaggio dell'archiviazione, usare l'API file locale per accedere ai dati come se fossero archiviati nel file system locale.
Come montare un account ADLS Gen2
L'esempio seguente illustra come montare Azure Data Lake Archiviazione Gen2. Il montaggio di Archiviazione BLOB funziona in modo analogo.
In questo esempio si presuppone che sia presente un account Data Lake Archiviazione Gen2 denominato storegen2 e che l'account abbia un contenitore denominato mycontainer che si vuole montare in /test nella sessione spark del notebook.
Per montare il contenitore denominato mycontainer, mssparkutils deve prima verificare se si dispone dell'autorizzazione per accedere al contenitore. Attualmente Fabric supporta due metodi di autenticazione per l'operazione di montaggio del trigger: accountKey e sastoken.
Montare tramite token di firma di accesso condiviso o chiave dell'account
MSSparkUtils supporta il passaggio esplicito di una chiave dell'account o di un token di firma di accesso condiviso come parametro per montare la destinazione.
Per motivi di sicurezza, è consigliabile archiviare le chiavi dell'account o i token di firma di accesso condiviso in Azure Key Vault (come illustrato nello screenshot seguente). È quindi possibile recuperarli usando l'API mssparkutils.credentials.getSecret . Per altre informazioni su Azure Key Vault, vedere Informazioni sulle chiavi dell'account di archiviazione gestito di Azure Key Vault.
Codice di esempio per il metodo accountKey :
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"accountKey":accountKey}
)
Codice di esempio per sastoken:
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"sasToken":sasToken}
)
Nota
Potrebbe essere necessario importare mssparkutils
se non è disponibile:
from notebookutils import mssparkutils
Parametri di montaggio:
- fileCacheTimeout: i BLOB verranno memorizzati nella cache nella cartella temporanea locale per 120 secondi per impostazione predefinita. Durante questo periodo blobfuse non verificherà se il file è aggiornato o meno. È possibile impostare il parametro per modificare il timeout predefinito. Quando più client modificano contemporaneamente i file, per evitare incoerenze tra file locali e remoti, è consigliabile abbreviare il tempo della cache o persino modificarlo su 0 e ottenere sempre i file più recenti dal server.
- timeout: il timeout dell'operazione di montaggio è 120 secondi per impostazione predefinita. È possibile impostare il parametro per modificare il timeout predefinito. Quando sono presenti troppi executor o quando si verifica il timeout del montaggio, è consigliabile aumentare il valore.
È possibile usare questi parametri come segue:
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"fileCacheTimeout": 120, "timeout": 120}
)
Nota
Per motivi di sicurezza, è consigliabile non archiviare le credenziali nel codice. Per proteggere ulteriormente le credenziali, il segreto verrà elaborato nell'output del notebook. Per altre informazioni, vedere Rollforward dei segreti.
Come montare una lakehouse
Codice di esempio per il montaggio di una lakehouse in /test:
from notebookutils import mssparkutils
mssparkutils.fs.mount(
"abfss://<workspace_id>@msit-onelake.dfs.fabric.microsoft.com/<lakehouse_id>",
"/test"
)
Accedere ai file nel punto di montaggio usando l'API mssparktuils fs
Lo scopo principale dell'operazione di montaggio è consentire ai clienti di accedere ai dati archiviati in un account di archiviazione remoto con un'API del file system locale. È anche possibile accedere ai dati usando l'API mssparkutils fs con un percorso montato come parametro. Questo formato di percorso è leggermente diverso.
Si supponga di aver montato il contenitore data lake Archiviazione Gen2 mycontainer in /test usando l'API di montaggio. Quando si accede ai dati con un'API del file system locale, il formato del percorso è simile al seguente:
/synfs/notebook/{sessionId}/test/{filename}
Per accedere ai dati usando l'API mssparkutils fs, è consigliabile usare getMountPath() per ottenere il percorso accurato:
path = mssparkutils.fs.getMountPath("/test")
Elencare le directory:
mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
Leggere il contenuto del file:
mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
Creare una directory:
mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
Accedere ai file nel punto di montaggio tramite il percorso locale
È possibile leggere e scrivere facilmente i file nel punto di montaggio usando il file system standard. Ecco un esempio di Python:
#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
print(f.write("dummy data"))
Come controllare i punti di montaggio esistenti
È possibile usare l'API mssparkutils.fs.mounts() per controllare tutte le informazioni sul punto di montaggio esistenti:
mssparkutils.fs.mounts()
Come smontare il punto di montaggio
Usare il codice seguente per smontare il punto di montaggio (/testare in questo esempio):
mssparkutils.fs.unmount("/test")
Limitazioni note
Il montaggio corrente è una configurazione a livello di processo; È consigliabile usare l'API mounts per verificare se esiste o non è disponibile un punto di montaggio.
Il meccanismo di smontaggio non è automatico. Al termine dell'esecuzione dell'applicazione, per smontare il punto di montaggio e rilasciare lo spazio su disco, è necessario chiamare in modo esplicito un'API di smontaggio nel codice. In caso contrario, il punto di montaggio sarà ancora presente nel nodo al termine dell'esecuzione dell'applicazione.
Il montaggio di un account di archiviazione DILS Gen1 non è supportato.
Utilità lakehouse
mssparkutils.lakehouse
fornisce utilità specifiche per la gestione degli artefatti lakehouse. Queste utilità consentono agli utenti di creare, recuperare, aggiornare ed eliminare facilmente gli artefatti Lakehouse.
Nota
Le API Lakehouse sono supportate solo in Runtime versione 1.2+.
Panoramica dei metodi
Di seguito è riportata una panoramica dei metodi disponibili forniti da mssparkutils.lakehouse
:
# Create a new Lakehouse artifact
create(name: String, description: String = "", workspaceId: String = ""): Artifact
# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact
# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact
# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean
# List all Lakehouse artifacts
list(workspaceId: String = ""): Array[Artifact]
Esempi di utilizzo
Per usare questi metodi in modo efficace, considerare gli esempi di utilizzo seguenti:
Creazione di un artefatto Lakehouse
artifact = mssparkutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")
Recupero di un elemento Lakehouse
artifact = mssparkutils.lakehouse.get("artifact_name", "optional_workspace_id")
Aggiornamento di un artefatto Lakehouse
updated_artifact = mssparkutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")
Eliminazione di un artefatto Lakehouse
is_deleted = mssparkutils.lakehouse.delete("artifact_name", "optional_workspace_id")
Elenco degli artefatti lakehouse
artifacts_list = mssparkutils.lakehouse.list("optional_workspace_id")
Informazioni aggiuntive
Per informazioni più dettagliate su ogni metodo e sui relativi parametri, utilizzare la mssparkutils.lakehouse.help("methodName")
funzione .
Grazie alle utilità Lakehouse di MSSparkUtils, la gestione degli artefatti lakehouse diventa più efficiente e integrata nelle pipeline di Fabric, migliorando l'esperienza di gestione complessiva dei dati.
È possibile esplorare queste utilità e incorporarle nei flussi di lavoro di Fabric per una gestione degli artefatti lakehouse senza problemi.
Contenuto correlato
Commenti e suggerimenti
https://aka.ms/ContentUserFeedback.
Presto disponibile: nel corso del 2024 verranno dismessi i problemi di GitHub come meccanismo di feedback per il contenuto e verranno sostituiti con un nuovo sistema di feedback. Per altre informazioni, vedere:Invia e visualizza il feedback per