Introduzione a Fabric MSSparkUtils
Microsoft Spark Utilities (MSSparkUtils) è un pacchetto predefinito che consente di eseguire facilmente attività comuni. È possibile usareMSSparkUtils per lavorare con i file system, per ottenere variabili di ambiente, per concatenare i notebook e per lavorare con i segreti. MSSparkUtils è disponibile in PySpark (Python) Scala, notebook SparkR e pipeline di Microsoft Fabric.
Importante
Microsoft Fabric è attualmente disponibile in ANTEPRIMA. Queste informazioni si riferiscono a un prodotto in versione preliminare che può essere modificato in modo sostanziale prima del rilascio. Microsoft non fornisce alcuna garanzia, espressa o implicita, rispetto alle informazioni fornite qui.
Utilità di file system
mssparkutils.fs fornisce utilità per l'uso di vari file system, tra cui Azure Data Lake Storage Gen2 (ADLS Gen2) e Archiviazione BLOB di Azure. Assicurarsi di configurare l'accesso a Azure Data Lake Storage Gen2 e Archiviazione BLOB di Azure in modo appropriato.
Eseguire i comandi seguenti per una panoramica dei metodi disponibili:
from notebookutils import mssparkutils
mssparkutils.fs.help()
Output
mssparkutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point
Use mssparkutils.fs.help("methodName") for more info about a method.
mssparkutils funziona con il file system nello stesso modo delle API Spark. Usare mssparkuitls.fs.mkdirs() e Fabric Lakehouse, ad esempio:
Utilizzo | Percorso relativo dalla radice HDFS | Percorso assoluto per il file system ABFS | Percorso assoluto per il file system locale nel nodo del driver |
---|---|---|---|
Lakehouse non predefinito | Non supportato | mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") | mssparkutils.fs.mkdirs("file:/<new_dir>") |
Lakehouse predefinito | Directory in "Files" o "Tables": mssparkutils.fs.mkdirs("Files/<new_dir>") | mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") | mssparkutils.fs.mkdirs("file:/<new_dir>") |
Elencare file
Elencare il contenuto di una directory, usare mssparkutils.fs.ls('Percorso directory') ad esempio:
mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>") # based on ABFS file system
mssparkutils.fs.ls("file:/tmp") # based on local file system of driver node
Visualizzazione delle proprietà di file
Restituisce le proprietà del file, tra cui nome file, percorso file, dimensioni del file e se si tratta di una directory e di un file.
files = mssparkutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size)
Creare una nuova directory
Crea la directory specificata se non esiste e le directory padre necessarie.
mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs. mkdirs("Files/<new_dir>") # works with the default lakehouse files using relative path
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") # based on ABFS file system
mssparkutils.fs.ls("file:/<new_dir>") # based on local file system of driver node
Copia file
Copia un file o una directory. Supporta la copia tra file system.
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
Anteprima del contenuto del file
Restituisce fino ai primi byte 'maxBytes' del file specificato come stringa codificata in UTF-8.
mssparkutils.fs.head('file path', maxBytes to read)
Sposta file
Sposta un file o una directory. Supporta lo spostamento tra file system.
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
Scrivere un file
Scrive la stringa specificata in un file codificato in UTF-8. Scrive la stringa specificata in un file codificato in UTF-8.
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
Accodare contenuto a un file
Aggiunge la stringa specificata a un file codificato in UTF-8.
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
Eliminare file o directory
Rimuove un file o una directory.
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
Montare/smontare la directory
È possibile trovare l'utilizzo dettagliato in Montaggio file e smontaggio.
Utilità notebook
Usare le utilità notebook MSSparkUtils per eseguire un notebook o uscire da un notebook con un valore. Eseguire il comando seguente per ottenere una panoramica dei metodi disponibili:
mssparkutils.notebook.help()
Output:
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
Fare riferimento a un notebook
Fare riferimento a un notebook e restituisce il relativo valore di uscita. È possibile eseguire chiamate di funzione di annidamento in un notebook in modo interattivo o in una pipeline. Il notebook a cui si fa riferimento viene eseguito nel pool di Spark di cui il notebook chiama questa funzione.
mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>)
Ad esempio:
mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
È possibile aprire il collegamento snapshot di riferimento eseguito nell'output della cella, lo snapshot acquisisce i risultati dell'esecuzione del codice e consente di eseguire facilmente il debug di un'esecuzione di riferimento.
Nota
Attualmente Il notebook di Fabric supporta solo il riferimento ai notebook all'interno di un'area di lavoro.
Uscire da un notebook
Esce da un notebook con un valore . È possibile eseguire chiamate di funzione di annidamento in un notebook in modo interattivo o in una pipeline.
- Quando si chiama una funzione exit() da un notebook in modo interattivo, fabric notebook genera un'eccezione, ignora l'esecuzione di celle di sottosequenza e mantiene attiva la sessione Spark.
- Quando si orchestra un notebook nella pipeline che chiama una funzione exit(), l'attività Notebook restituirà con un valore di uscita, completare l'esecuzione della pipeline e arrestare la sessione Spark.
- Quando si chiama una funzione exit() in un notebook a cui si fa riferimento, Fabric Spark interromperà l'ulteriore esecuzione del notebook a cui si fa riferimento e continuerà a eseguire le celle successive nel notebook principale che chiama la funzione run(). Ad esempio: Notebook1 ha tre celle e chiama una funzione exit() nella seconda cella. Notebook2 ha cinque celle e chiamate run(notebook1) nella terza cella. Quando si esegue Notebook2, Notebook1 si arresta nella seconda cella quando si raggiunge la funzione exit(). Notebook2 continua a eseguire la quarta cella e la quinta cella.
mssparkutils.notebook.exit("value string")
Ad esempio:
Notebook Sample1 con due celle seguenti:
La cella 1 definisce un parametro di input con valore predefinito impostato su 10.
La cella 2 esce dal notebook con input come valore di uscita.
È possibile eseguire Sample1 in un altro notebook con valori predefiniti:
exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)
Output:
Notebook executed successfully with exit value 10
È possibile eseguire Sample1 in un altro notebook e impostare il valore di input su 20:
exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
Output:
Notebook executed successfully with exit value 20
Utilità delle credenziali
È possibile usare le utilità delle credenziali MSSparkUtils per ottenere i token di accesso e gestire i segreti in Azure Key Vault.
Eseguire il comando seguente per ottenere una panoramica dei metodi disponibili:
mssparkutils.credentials.help()
Output:
getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
Ottenere un token
Restituisce il token di Azure AD per un determinato gruppo di destinatari, nome (facoltativo), L'elenco seguente mostra le chiavi del gruppo di destinatari attualmente disponibili:
- Risorsa gruppo di destinatari archiviazione: "archiviazione"
- Risorsa di Power BI: "pbi"
- Risorsa Key Vault di Azure: "keyvault"
- Risorsa KQL RTA synapse: "kusto"
Eseguire il comando seguente per ottenere il token:
mssparkutils.credentials.getToken('audience Key')
Ottenere un segreto usando le credenziali utente
Restituisce il segreto di Azure Key Vault per un determinato nome Key Vault di Azure, il nome del segreto e il nome del servizio collegato usando le credenziali utente.
mssparkutils.credentials.getSecret('azure key vault name','secret name')
Montaggio e smontaggio di file
Gli scenari di montaggio del supporto di Microsoft Fabric nel pacchetto microsoft Spark Utilities. È possibile usare le API mount, unmount, getMountPath() e mounts() per collegare l'archiviazione remota (Azure Data Lake Storage Gen2) a tutti i nodi di lavoro (nodo driver e nodi di lavoro). Dopo aver installato il punto di montaggio dell'archiviazione, usare l'API file locale per accedere ai dati come se fosse archiviato nel file system locale.
Come montare un account ADLS Gen2
Questa sezione illustra come montare Azure Data Lake Storage Gen2 passaggio per passaggio come esempio. Il montaggio dell'archiviazione BLOB funziona in modo analogo.
L'esempio presuppone che sia presente un account Data Lake Storage Gen2 denominato storegen2. L'account ha un contenitore denominato mycontainer che si vuole montare in /test nella sessione Spark del notebook.
Per montare il contenitore denominato mycontainer, mssparkutils deve prima controllare se si dispone dell'autorizzazione per accedere al contenitore. Attualmente, Microsoft Fabric supporta due metodi di autenticazione per l'operazione di montaggio del trigger: accountKey e sastoken.
Montare tramite token di firma di accesso condiviso o chiave dell'account
Mssparkutils supporta il passaggio esplicito di una chiave account o un token di firma di accesso condiviso come parametro per montare la destinazione.
Per motivi di sicurezza, è consigliabile archiviare le chiavi dell'account o i token di firma di accesso condiviso in Azure Key Vault (come illustrato nello screenshot di esempio seguente). È quindi possibile recuperarli usando l'API mssparkutils.credentials.getSecret . Per l'uso di Azure Key Vault, vedere Informazioni sulle chiavi dell'account di archiviazione gestito di Azure Key Vault.
Ecco il codice di esempio dell'uso del metodo accountKey:
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"accountKey":accountKey}
)
Per sastoken, fare riferimento al codice di esempio seguente:
from notebookutils import mssparkutils
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"sasToken":sasToken}
)
Nota
Per motivi di sicurezza, non è consigliabile archiviare le credenziali nel codice. Per proteggere ulteriormente le credenziali, il segreto verrà redatto nell'output del notebook. Per altri dettagli, vedere Redaction secret (Redaction segreto).
Come montare una lakehouse
Ecco il codice di esempio di montaggio di una lakehouse in /test.
from notebookutils import mssparkutils
mssparkutils.fs.mount(
"abfss://<workspace_id>@msit-onelake.dfs.fabric.microsoft.com/<lakehouse_id>",
"/test"
)
Accedere ai file nel punto di montaggio usando l'API mssparktuils fs
Lo scopo principale dell'operazione di montaggio è consentire ai clienti di accedere ai dati archiviati in un account di archiviazione remoto usando un'API del file system locale. È anche possibile accedere ai dati usando l'API mssparkutils fs con un percorso montato come parametro. Il formato del percorso usato qui è leggermente diverso.
Si supponga di aver montato il contenitore Data Lake Storage Gen2 mycontainer in /test usando l'API di montaggio. Quando si accede ai dati usando un'API del file system locale, il formato del percorso è simile al seguente:
/synfs/notebook/{sessionId}/test/{filename}
Per accedere ai dati usando l'API mssparkutils fs, è consigliabile usare getMountPath() per ottenere il percorso accurato:
path = mssparkutils.fs.getMountPath("/test")
Elencare le directory:
mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
Leggere il contenuto del file:
mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
Creare una directory:
mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
Accedere ai file nel punto di montaggio tramite il percorso locale
È possibile leggere e scrivere facilmente i file nel punto di montaggio usando il modo di file system standard, usare Python come esempio:
#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
print(f.write("dummy data"))
Come controllare i punti di montaggio esistenti
È possibile usare l'API mssparkutils.fs.mounts() per controllare tutte le informazioni sul punto di montaggio esistenti:
mssparkutils.fs.mounts()
Come smontare il punto di montaggio
Usare il codice seguente per smontare il punto di montaggio (/testare in questo esempio):
mssparkutils.fs.unmount("/test")
Limitazioni note
- Il montaggio corrente è una configurazione a livello di processo, è consigliabile usare l'API mounts per verificare se il punto di montaggio esiste o non è disponibile.
- Il meccanismo di smontaggio non è automatico. Al termine dell'esecuzione dell'applicazione, per smontare il punto di montaggio per rilasciare lo spazio su disco, è necessario chiamare in modo esplicito un'API di smontaggio nel codice. In caso contrario, il punto di montaggio sarà ancora presente nel nodo al termine dell'esecuzione dell'applicazione.
- Il montaggio di un account di archiviazione ADLS Gen1 non è supportato.