Utilità Microsoft Spark (MSSparkUtils) per Fabric

Microsoft Spark Utilities (MSSparkUtils) è un pacchetto predefinito che consente di eseguire facilmente attività comuni. È possibile usare MSSparkUtils per lavorare con i file system, per ottenere variabili di ambiente, per concatenare i notebook e per lavorare con i segreti. Il pacchetto MSSparkUtils è disponibile nelle pipeline di PySpark (Python), Scala, SparkR e Fabric.

Utilità di file system

mssparkutils.fs fornisce utilità per l'uso di vari file system, tra cui Azure Data Lake Archiviazione (ADLS) Gen2 e Archiviazione BLOB di Azure. Assicurarsi di configurare l'accesso ad Azure Data Lake Archiviazione Gen2 e Archiviazione BLOB di Azure in modo appropriato.

Eseguire i comandi seguenti per una panoramica dei metodi disponibili:

from notebookutils import mssparkutils
mssparkutils.fs.help()

Output

mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use mssparkutils.fs.help("methodName") for more info about a method.

MSSparkUtils funziona con il file system nello stesso modo delle API Spark. Usare mssparkuitls.fs.mkdirs() e Fabric lakehouse, ad esempio:

Utilizzo Percorso relativo dalla radice HDFS Percorso assoluto per il file system ABFS Percorso assoluto per il file system locale nel nodo driver
Lakehouse non predefinito Non supportato mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") mssparkutils.fs.mkdirs("file:/<new_dir>")
Lakehouse predefinito Directory in "Files" o "Tables": mssparkutils.fs.mkdirs("Files/<new_dir>") mssparkutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>") mssparkutils.fs.mkdirs("file:/<new_dir>")

Elencare file

Per elencare il contenuto di una directory, usare mssparkutils.fs.ls('Percorso della directory').To list the content of a directory, use mssparkutils.fs.ls('Your directory path')). Ad esempio:

mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/tmp")  # based on local file system of driver node 

Visualizzazione delle proprietà di file

Questo metodo restituisce proprietà di file, tra cui nome file, percorso file, dimensioni del file e se si tratta di una directory e di un file.

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Creare una nuova directory

Questo metodo crea la directory specificata se non esiste e crea le directory padre necessarie.

mssparkutils.fs.mkdirs('new directory name')  
mssparkutils.fs. mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

Copia file

Questo metodo copia un file o una directory e supporta l'attività di copia tra file system.

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

File di copia con prestazioni elevate

Questo metodo offre un modo più rapido per copiare o spostare file, in particolare volumi elevati di dati.

mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Anteprima del contenuto del file

Questo metodo restituisce fino ai primi byte 'maxBytes' del file specificato come stringa codificata in UTF-8.

mssparkutils.fs.head('file path', maxBytes to read)

Sposta file

Questo metodo sposta un file o una directory e supporta lo spostamento tra file system.

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist

Scrivere il file

Questo metodo scrive la stringa specificata in un file codificato in UTF-8.

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Aggiungere contenuto a un file

Questo metodo aggiunge la stringa specificata a un file codificato in UTF-8.

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Eliminare file o directory

Questo metodo rimuove un file o una directory.

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Directory di montaggio/smontaggio

Altre informazioni sull'utilizzo dettagliato in Montaggio file e smontaggio.

Utilità notebook

Usare le utilità notebook MSSparkUtils per eseguire un notebook o uscire da un notebook con un valore . Eseguire il comando seguente per ottenere una panoramica dei metodi disponibili:

mssparkutils.notebook.help()

Output:


exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Nota

Le utilità del notebook non sono applicabili per le definizioni di processo Apache Spark (SJD).

Fare riferimento a un notebook

Questo metodo fa riferimento a un notebook e restituisce il relativo valore di uscita. È possibile eseguire chiamate di funzione di annidamento in un notebook in modo interattivo o in una pipeline. Il notebook a cui si fa riferimento viene eseguito nel pool di Spark del notebook che chiama questa funzione.

mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>)

Ad esempio:

mssparkutils.notebook.run("Sample1", 90, {"input": 20 })

È possibile aprire il collegamento snapshot dell'esecuzione del riferimento nell'output della cella. Lo snapshot acquisisce i risultati dell'esecuzione del codice e consente di eseguire facilmente il debug di un'esecuzione di riferimento.

Screenshot del risultato dell'esecuzione di riferimento.

Screenshot di un esempio di snapshot.

Nota

  • Attualmente, il notebook di Fabric supporta solo il riferimento ai notebook all'interno di un'area di lavoro.
  • Se si usano i file in Risorsa notebook, usare mssparkutils.nbResPath nel notebook di riferimento per assicurarsi che punti alla stessa cartella dell'esecuzione interattiva.

Il riferimento esegue più notebook in parallelo

Il metodo mssparkutils.notebook.runMultiple() consente di eseguire più notebook in parallelo o con una struttura topologica predefinita. L'API usa un meccanismo di implementazione multithread all'interno di una sessione Spark, il che significa che le risorse di calcolo vengono condivise dal notebook di riferimento.

Con mssparkutils.notebook.runMultiple()è possibile:

  • Eseguire più notebook contemporaneamente, senza attendere il completamento di ognuno di essi.

  • Specificare le dipendenze e l'ordine di esecuzione per i notebook usando un formato JSON semplice.

  • Ottimizzare l'uso delle risorse di calcolo Spark e ridurre i costi dei progetti di Infrastruttura.

  • Visualizzare gli snapshot di ogni record di esecuzione del notebook nell'output e eseguire il debug/monitorare le attività del notebook in modo pratico.

  • Ottenere il valore di uscita di ogni attività executive e usarli nelle attività downstream.

È anche possibile provare a eseguire mssparkutils.notebook.help("runMultiple") per trovare l'esempio e l'utilizzo dettagliato.

Ecco un semplice esempio di esecuzione di un elenco di notebook in parallelo usando questo metodo:


mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Il risultato dell'esecuzione del notebook radice è il seguente:

Screenshot del riferimento a un elenco di notebook.

Di seguito è riportato un esempio di esecuzione di notebook con struttura topologica usando mssparkutils.notebook.runMultiple(). Usare questo metodo per orchestrare facilmente i notebook tramite un'esperienza di codice.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ]
}
mssparkutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Il risultato dell'esecuzione del notebook radice è il seguente:

Screenshot del riferimento a un elenco di notebook con parametri.

Nota

Il grado di parallelismo dell'esecuzione di più notebook è limitato alla risorsa di calcolo totale disponibile di una sessione Spark.

Uscire da un notebook

Questo metodo esce da un notebook con un valore . È possibile eseguire chiamate di funzione di annidamento in un notebook in modo interattivo o in una pipeline.

  • Quando si chiama una funzione exit() da un notebook in modo interattivo, il notebook di Fabric genera un'eccezione, ignora l'esecuzione di celle successive e mantiene attiva la sessione Spark.

  • Quando si orchestra un notebook in una pipeline che chiama una funzione exit(), l'attività del notebook restituisce con un valore di uscita, completa l'esecuzione della pipeline e arresta la sessione Spark.

  • Quando si chiama una funzione exit() in un notebook a cui si fa riferimento, Fabric Spark interromperà l'ulteriore esecuzione del notebook a cui si fa riferimento e continuerà a eseguire le celle successive nel notebook principale che chiama la funzione run(). Ad esempio: Notebook1 ha tre celle e chiama una funzione exit() nella seconda cella. Notebook2 include cinque celle e chiamate run(notebook1) nella terza cella. Quando si esegue Notebook2, Notebook1 si arresta nella seconda cella quando si raggiunge la funzione exit(). Notebook2 continua a eseguire la quarta cella e la quinta cella.

mssparkutils.notebook.exit("value string")

Ad esempio:

Notebook Sample1 con due celle seguenti:

  • La cella 1 definisce un parametro di input con valore predefinito impostato su 10.

  • La cella 2 esce dal notebook con input come valore di uscita.

Screenshot che mostra un notebook di esempio della funzione exit.

È possibile eseguire Sample1 in un altro notebook con valori predefiniti:

exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)

Output:

Notebook executed successfully with exit value 10

È possibile eseguire Sample1 in un altro notebook e impostare il valore di input su 20:

exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Output:

Notebook executed successfully with exit value 20

Utilità delle credenziali

È possibile usare le utilità delle credenziali MSSparkUtils per ottenere i token di accesso e gestire i segreti in un insieme di credenziali delle chiavi di Azure.

Eseguire il comando seguente per ottenere una panoramica dei metodi disponibili:

mssparkutils.credentials.help()

Output:

getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(keyvault_endpoint, secret_name): returns secret for a given Key Vault and secret name

Ottenere un token

getToken restituisce un token Microsoft Entra per un determinato gruppo di destinatari e un nome (facoltativo). L'elenco seguente mostra le chiavi del gruppo di destinatari attualmente disponibili:

  • risorsa gruppo di destinatari Archiviazione: "archiviazione"
  • Risorsa di Power BI: "pbi"
  • Risorsa di Azure Key Vault: "keyvault"
  • Risorsa del database KQL di Synapse RTA: "kusto"

Eseguire il comando seguente per ottenere il token:

mssparkutils.credentials.getToken('audience Key')

Ottenere un segreto usando le credenziali utente

getSecret restituisce un segreto di Azure Key Vault per un determinato endpoint di Azure Key Vault e un nome segreto usando le credenziali utente.

mssparkutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

Montaggio e smontaggio di file

Fabric supporta gli scenari di montaggio seguenti nel pacchetto Utilità Di Microsoft Spark. È possibile usare le API mount, unmount, getMountPath() e mounts() per collegare l'archiviazione remota (ADLS Gen2) a tutti i nodi di lavoro (nodo driver e nodi di lavoro). Dopo aver creato il punto di montaggio dell'archiviazione, usare l'API file locale per accedere ai dati come se fossero archiviati nel file system locale.

Come montare un account ADLS Gen2

L'esempio seguente illustra come montare Azure Data Lake Archiviazione Gen2. Il montaggio di Archiviazione BLOB funziona in modo analogo.

In questo esempio si presuppone che sia presente un account Data Lake Archiviazione Gen2 denominato storegen2 e che l'account abbia un contenitore denominato mycontainer che si vuole montare in /test nella sessione spark del notebook.

Screenshot che mostra dove selezionare un contenitore da montare.

Per montare il contenitore denominato mycontainer, mssparkutils deve prima verificare se si dispone dell'autorizzazione per accedere al contenitore. Attualmente Fabric supporta due metodi di autenticazione per l'operazione di montaggio del trigger: accountKey e sastoken.

Montare tramite token di firma di accesso condiviso o chiave dell'account

MSSparkUtils supporta il passaggio esplicito di una chiave dell'account o di un token di firma di accesso condiviso come parametro per montare la destinazione.

Per motivi di sicurezza, è consigliabile archiviare le chiavi dell'account o i token di firma di accesso condiviso in Azure Key Vault (come illustrato nello screenshot seguente). È quindi possibile recuperarli usando l'API mssparkutils.credentials.getSecret . Per altre informazioni su Azure Key Vault, vedere Informazioni sulle chiavi dell'account di archiviazione gestito di Azure Key Vault.

Screenshot che mostra dove vengono archiviati i segreti in un insieme di credenziali delle chiavi di Azure.

Codice di esempio per il metodo accountKey :

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

Codice di esempio per sastoken:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

Nota

Potrebbe essere necessario importare mssparkutils se non è disponibile:

from notebookutils import mssparkutils

Parametri di montaggio:

  • fileCacheTimeout: i BLOB verranno memorizzati nella cache nella cartella temporanea locale per 120 secondi per impostazione predefinita. Durante questo periodo blobfuse non verificherà se il file è aggiornato o meno. È possibile impostare il parametro per modificare il timeout predefinito. Quando più client modificano contemporaneamente i file, per evitare incoerenze tra file locali e remoti, è consigliabile abbreviare il tempo della cache o persino modificarlo su 0 e ottenere sempre i file più recenti dal server.
  • timeout: il timeout dell'operazione di montaggio è 120 secondi per impostazione predefinita. È possibile impostare il parametro per modificare il timeout predefinito. Quando sono presenti troppi executor o quando si verifica il timeout del montaggio, è consigliabile aumentare il valore.

È possibile usare questi parametri come segue:

mssparkutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

Nota

Per motivi di sicurezza, è consigliabile non archiviare le credenziali nel codice. Per proteggere ulteriormente le credenziali, il segreto verrà elaborato nell'output del notebook. Per altre informazioni, vedere Rollforward dei segreti.

Come montare una lakehouse

Codice di esempio per il montaggio di una lakehouse in /test:

from notebookutils import mssparkutils 
mssparkutils.fs.mount( 
 "abfss://<workspace_id>@msit-onelake.dfs.fabric.microsoft.com/<lakehouse_id>", 
 "/test"
)

Accedere ai file nel punto di montaggio usando l'API mssparktuils fs

Lo scopo principale dell'operazione di montaggio è consentire ai clienti di accedere ai dati archiviati in un account di archiviazione remoto con un'API del file system locale. È anche possibile accedere ai dati usando l'API mssparkutils fs con un percorso montato come parametro. Questo formato di percorso è leggermente diverso.

Si supponga di aver montato il contenitore data lake Archiviazione Gen2 mycontainer in /test usando l'API di montaggio. Quando si accede ai dati con un'API del file system locale, il formato del percorso è simile al seguente:

/synfs/notebook/{sessionId}/test/{filename}

Per accedere ai dati usando l'API mssparkutils fs, è consigliabile usare getMountPath() per ottenere il percorso accurato:

path = mssparkutils.fs.getMountPath("/test")
  • Elencare le directory:

    mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
    
  • Leggere il contenuto del file:

    mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
    
  • Creare una directory:

    mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
    

Accedere ai file nel punto di montaggio tramite il percorso locale

È possibile leggere e scrivere facilmente i file nel punto di montaggio usando il file system standard. Ecco un esempio di Python:

#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

Come controllare i punti di montaggio esistenti

È possibile usare l'API mssparkutils.fs.mounts() per controllare tutte le informazioni sul punto di montaggio esistenti:

mssparkutils.fs.mounts()

Come smontare il punto di montaggio

Usare il codice seguente per smontare il punto di montaggio (/testare in questo esempio):

mssparkutils.fs.unmount("/test")

Limitazioni note

  • Il montaggio corrente è una configurazione a livello di processo; È consigliabile usare l'API mounts per verificare se esiste o non è disponibile un punto di montaggio.

  • Il meccanismo di smontaggio non è automatico. Al termine dell'esecuzione dell'applicazione, per smontare il punto di montaggio e rilasciare lo spazio su disco, è necessario chiamare in modo esplicito un'API di smontaggio nel codice. In caso contrario, il punto di montaggio sarà ancora presente nel nodo al termine dell'esecuzione dell'applicazione.

  • Il montaggio di un account di archiviazione DILS Gen1 non è supportato.

Utilità lakehouse

mssparkutils.lakehouse fornisce utilità specifiche per la gestione degli artefatti lakehouse. Queste utilità consentono agli utenti di creare, recuperare, aggiornare ed eliminare facilmente gli artefatti Lakehouse.

Nota

Le API Lakehouse sono supportate solo in Runtime versione 1.2+.

Panoramica dei metodi

Di seguito è riportata una panoramica dei metodi disponibili forniti da mssparkutils.lakehouse:

# Create a new Lakehouse artifact
create(name: String, description: String = "", workspaceId: String = ""): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean

# List all Lakehouse artifacts
list(workspaceId: String = ""): Array[Artifact]

Esempi di utilizzo

Per usare questi metodi in modo efficace, considerare gli esempi di utilizzo seguenti:

Creazione di un artefatto Lakehouse

artifact = mssparkutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")

Recupero di un elemento Lakehouse

artifact = mssparkutils.lakehouse.get("artifact_name", "optional_workspace_id")

Aggiornamento di un artefatto Lakehouse

updated_artifact = mssparkutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

Eliminazione di un artefatto Lakehouse

is_deleted = mssparkutils.lakehouse.delete("artifact_name", "optional_workspace_id")

Elenco degli artefatti lakehouse

artifacts_list = mssparkutils.lakehouse.list("optional_workspace_id")

Informazioni aggiuntive

Per informazioni più dettagliate su ogni metodo e sui relativi parametri, utilizzare la mssparkutils.lakehouse.help("methodName") funzione .

Grazie alle utilità Lakehouse di MSSparkUtils, la gestione degli artefatti lakehouse diventa più efficiente e integrata nelle pipeline di Fabric, migliorando l'esperienza di gestione complessiva dei dati.

È possibile esplorare queste utilità e incorporarle nei flussi di lavoro di Fabric per una gestione degli artefatti lakehouse senza problemi.