Condividi tramite


NotebookUtils (in precedenza MSSparkUtils) per Fabric

Notebook Utilities (NotebookUtils) è un pacchetto predefinito che consente di eseguire facilmente attività comuni in un notebook Fabric. È possibile usare NotebookUtils per gestire file system, ottenere variabili di ambiente, concatenare notebook e gestire segreti. Il pacchetto NotebookUtils è disponibile per i notebook PySpark (Python) Scala, SparkR, e le pipeline di Fabric.

Nota

  • MsSparkUtils è stato rinominato ufficialmente in NotebookUtils. Il codice esistente rimane retrocompatibile e non causa alcun cambiamento drastico. È fortemente consigliato di eseguire l'aggiornamento a notebookutils per garantire il supporto continuo e l'accesso alle nuove funzionalità. Il namespace mssparkutils verrà dismesso in futuro.
  • NotebookUtils è progettato per funzionare con Spark 3.4 (Runtime v1.2) e versioni successive. Tutte le nuove funzionalità e gli aggiornamenti saranno supportati esclusivamente con lo spazio dei nomi notebookutils d'ora in poi.

Utilità di file system

notebookutils.fs fornisce utilità per l'uso di vari file system, tra cui Azure Data Lake Storage (ADLS) Gen2 e Archiviazione BLOB di Azure. Assicurarsi che l'accesso ad Azure Data Lake Storage Gen2 e Azure Blob Storage sia configurato in modo appropriato.

Usare i comandi seguenti per una panoramica dei metodi disponibili:

notebookutils.fs.help()

Risultato

notebookutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
fastcp(from: String, to: String, recurse: Boolean = true): Boolean -> [Preview] Copies a file or directory via azcopy, possibly across FileSystems
mv(from: String, to: String, createPath: Boolean = false, overwrite: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use notebookutils.fs.help("methodName") for more info about a method.

NotebookUtils funziona con il file system nello stesso modo delle API Spark. Usare notebookutils.fs.mkdirs() e Fabric Lakehouse come esempio:

Utilizzo Percorso relativo dalla radice HDFS Percorso assoluto per il file system ABFS Percorso assoluto del file system locale nel nodo driver
Lakehouse non predefinito Non supportato notebookutils.fs.mkdirs("abfss://<nome_contenitore>@<nome_account_archiviazione>.dfs.core.windows.net/<nuova_dir>") notebookutils.fs.mkdirs("file:/<new_dir>")
Lakehouse predefinito Directory sotto 'Files' o 'Tables': notebookutils.fs.mkdirs("Files/<new_dir>") notebookutils.fs.mkdirs("abfss://<nome_contenitore>@<nome_account_archiviazione>.dfs.core.windows.net/<nuova_dir>") notebookutils.fs.mkdirs("file:/<new_dir>")

Nota

  • Per il Lakehouse predefinito, i percorsi dei file sono montati nel tuo notebook con un timeout predefinito della cache di 120 secondi. Ciò significa che i file vengono memorizzati nella cache nella cartella temporanea locale del notebook per 120 secondi, anche se vengono rimossi da Lakehouse. Se si desidera modificare la regola di timeout, è possibile smontare i percorsi di file Lakehouse predefiniti e montarli di nuovo con un valore diverso fileCacheTimeout.
  • Per le configurazioni Lakehouse non predefinite, è possibile impostare il parametro fileCacheTimeout appropriato durante il montaggio dei percorsi Lakehouse. L'impostazione del timeout su 0 garantisce che il file più recente venga recuperato dal server Lakehouse.

Elencare file

Per elencare il contenuto di una directory, usare notebookutils.fs.ls('Percorso directory'). Ad esempio:

notebookutils.fs.ls("Files/tmp") # The relatvie path may work with different base path, details in below 
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # The absolute path, like: ABFS file system
notebookutils.fs.ls("file:/tmp")  # The full path of the local file system of driver node

L'API notebookutils.fs.ls() si comporta diversamente quando si utilizza un percorso relativo, a seconda del tipo di notebook.

  • In un notebook Spark: il percorso relativo è riferito al percorso ABFSS predefinito di Lakehouse. Ad esempio, notebookutils.fs.ls("Files") punta alla directory Files nel Lakehouse predefinito.

    Ad esempio:

    notebookutils.fs.ls("Files/sample_datasets/public_holidays.parquet")
    
  • In un notebook Python: il percorso relativo si riferisce alla directory di lavoro del file system locale, che per impostazione predefinita è /home/trusted-service-user/work. Pertanto, è consigliabile usare il percorso completo anziché un percorso relativo notebookutils.fs.ls("/lakehouse/default/Files") per accedere alla directory Files nel Lakehouse di default.

    Ad esempio:

    notebookutils.fs.ls("/lakehouse/default/Files/sample_datasets/public_holidays.parquet")
    

Visualizzazione delle proprietà di file

Questo metodo restituisce le proprietà dei file, tra cui nome file, percorso file, dimensioni del file e se si tratta di una directory e di un file.

files = notebookutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Creare una nuova cartella

Questo metodo crea la cartella specificata, se non esiste, e crea eventuali cartelle padre necessarie.

notebookutils.fs.mkdirs('new directory name')  
notebookutils.fs.mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
notebookutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

Copia il file

Questo metodo copia un file o una directory e supporta l'attività di copia tra file system.

notebookutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Nota

A causa delle limitazioni del collegamento OneLake, quando è necessario usare notebookutils.fs.cp() per copiare dati da un collegamento di tipo S3/GCS, è consigliabile usare un percorso montato invece di un percorso abfss.

File di copia con prestazioni elevate

Questo metodo offre un approccio più efficiente alla copia o allo spostamento di file, in particolare quando si gestiscono volumi di dati di grandi dimensioni. Per migliorare le prestazioni in Fabric, è consigliabile usare fastcp come sostituto del metodo cp tradizionale.

Nota

  • notebookutils.fs.fastcp() non supporta la copia di file in OneLake tra aree. In questo caso, è possibile usare notebookutils.fs.cp() invece .
  • A causa delle limitazioni del collegamento OneLake, quando è necessario usare notebookutils.fs.fastcp() per copiare dati da un collegamento di tipo S3/GCS, è consigliabile usare un percorso montato invece di un percorso abfss.
notebookutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Anteprima del contenuto del file

Questo metodo restituisce fino ai primi 'maxBytes' byte del file specificato sotto forma di stringa codificata in UTF-8.

notebookutils.fs.head('file path', maxBytes to read)

Sposta file

Questo metodo sposta un file o una directory e supporta spostamenti tra file system.

notebookutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
notebookutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.

Scrivi il file

Questo metodo scrive la stringa specificata in un file codificato in UTF-8.

notebookutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Accodare contenuto a un file

Questo metodo accoda la stringa specificata a un file codificato in UTF-8.

notebookutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Nota

  • notebookutils.fs.append() e notebookutils.fs.put() non supportano la scrittura simultanea nello stesso file a causa della mancanza di garanzie di atomicità.
  • Quando si usa l'API notebookutils.fs.append in un ciclo for per scrivere nello stesso file, è consigliabile aggiungere un'istruzione sleep circa 0,5 s ~ 1 s tra le scritture ricorrenti. Questo suggerimento è dovuto al fatto che l'operazione interna notebookutils.fs.append dell'API flush è asincrona, quindi un breve ritardo aiuta a garantire l'integrità dei dati.

Eliminare un file o una directory

Questo metodo rimuove un file o una directory.

notebookutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Montare/smontare cartella

Per altre informazioni sull'uso dettagliato, vedere Montaggio e smontaggio di file.

Utilità per notebook

Utilizzare le utilità del notebook per eseguire un notebook o terminarlo restituendo un valore. Usare il comando seguente per ottenere una panoramica dei metodi disponibili:

notebookutils.notebook.help()

Prodotto:


The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map, workspace: String): String -> This method runs a notebook and returns its exit value.
runMultiple(DAG: Any): Map[String, MsNotebookRunResult] -> [Preview] Runs multiple notebooks concurrently with support for dependency relationships.
validateDAG(DAG: Any): Boolean -> [Preview] This method check if the DAG is correctly defined.

[Preview] Below methods are only support Fabric Notebook.
create(name: String, description: String = "", content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = ""): Artifact -> Create a new Notebook.
get(name: String, workspaceId: String = ""): Artifact -> Get a Notebook by name or id.
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact -> Update a Artifact by name.
delete(name: String, workspaceId: String = ""): Boolean -> Delete a Notebook by name.
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact] -> List all Notebooks in the workspace.
updateDefinition(name: String, content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = "") -> Update the definition of a Notebook.

Use notebookutils.notebook.help("methodName") for more info about a method.

Nota

Le utilità notebook non sono applicabili per le definizioni di lavoro di Apache Spark (SJD).

Fare riferimento a un notebook

Questo metodo fa riferimento a un notebook e restituisce il relativo valore di uscita. È possibile eseguire chiamate di funzione di annidamento in un notebook in modo interattivo o in una pipeline. Il notebook a cui si fa riferimento funziona sul pool di Spark del notebook che chiama questa funzione.

notebookutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)

Ad esempio:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

Un notebook Fabric supporta anche il riferimento ai notebook in più aree di lavoro specificando l'ID area di lavoro.

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

È possibile aprire il link allo snapshot del run di riferimento nell'output della cella. Lo snapshot acquisisce i risultati dell'esecuzione del codice e consente di eseguire facilmente il debug di un'esecuzione di riferimento.

Screenshot del risultato dell'esecuzione di riferimento.

Screenshot di un esempio di snapshot.

Nota

  • Il notebook di riferimento tra aree di lavoro è supportato dal runtime versione 1.2 e successive.
  • Se si utilizzano i file sotto Risorsa Notebook, utilizzare notebookutils.nbResPath nel notebook di riferimento per assicurarsi che punti alla stessa cartella dell'esecuzione interattiva.

Consultare l'esecuzione in parallelo di più notebook.

Importante

Questa funzionalità è in anteprima.

Il metodo notebookutils.notebook.runMultiple() consente di eseguire più notebook in parallelo o con una struttura topologica predefinita. L'API utilizza un meccanismo di implementazione multithread all'interno di una sessione Spark, il che significa che i notebook di riferimento in esecuzione condividono le risorse di calcolo.

Con notebookutils.notebook.runMultiple() è possibile:

  • Eseguire più notebook contemporaneamente senza attendere il completamento di ognuno.

  • Specificare le dipendenze e l'ordine di esecuzione per i notebook usando un formato JSON semplice.

  • Ottimizzare l'uso delle risorse di calcolo di Spark e ridurre i costi dei progetti Fabric.

  • Visualizzare gli snapshot di ogni record di esecuzione del notebook nell'output e effettuare comodamente il debug e il monitoraggio delle attività del notebook.

  • Ottieni il valore di uscita di ogni attività esecutiva e usalo nelle attività successive.

È anche possibile provare a eseguire notebookutils.notebook.help("runMultiple") per trovare l'esempio e l'uso dettagliato.

Di seguito è riportato un semplice esempio di esecuzione di un elenco di notebook in parallelo usando questo metodo:


notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Il risultato dell'esecuzione del notebook radice è il seguente:

Screenshot di un elenco di notebook di riferimento.

Di seguito è riportato un esempio di esecuzione di notebook con struttura topologica usando notebookutils.notebook.runMultiple(). Usare questo metodo per orchestrare facilmente i notebook tramite un'esperienza di programmazione.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

Il risultato dell'esecuzione del notebook radice è il seguente:

Screenshot di un elenco di notebook con parametri come riferimento.

È disponibile anche un metodo per verificare se il DAG è definito correttamente.

notebookutils.notebook.validateDAG(DAG)

Nota

  • Il grado di parallelismo dell'esecuzione di più notebook è limitato alla risorsa di calcolo totale disponibile di una sessione Spark.
  • Il limite massimo per le attività del notebook o i notebook simultanei è 50. Il superamento di questo limite può causare problemi di stabilità e prestazioni a causa dell'utilizzo elevato delle risorse di calcolo. In caso di problemi, considera la possibilità di separare i notebook in più chiamate di runMultiple oppure di ridurre la concorrenza modificando il campo concorrenza nel parametro DAG.
  • Il timeout predefinito per l'intero DAG è di 12 ore e il timeout predefinito per ogni cella del notebook figlio è di 90 secondi. È possibile modificare il timeout impostando i campi timeoutInSeconds e timeoutPerCellInSeconds nel parametro DAG.

Uscire da un notebook

Questo metodo termina un notebook restituendo un valore. È possibile eseguire chiamate di funzione di annidamento in un notebook in modo interattivo o in una pipeline.

  • Quando si chiama una funzione exit() da un notebook in modo interattivo, il notebook Fabric genera un'eccezione, ignora l'esecuzione delle celle successive e mantiene attiva la sessione Spark.

  • Quando si orchestra un notebook in una pipeline che chiama una funzione exit(), l'attività del notebook restituisce un valore di uscita. Questa operazione completa l'esecuzione della pipeline e arresta la sessione Spark.

  • Quando si chiama una funzione exit() in un notebook a cui si fa riferimento, Fabric Spark interromperà l'ulteriore esecuzione del notebook di riferimento e continuerà l’esecuzione delle celle successive nel notebook principale che chiama la funzione run(). Esempio: Notebook1 ha tre celle e chiama una funzione exit() nella seconda cella. Notebook2 ha cinque celle e chiama run(notebook1) nella terza cella. Quando si esegue Notebook2, Notebook1 si arresta nella seconda cella quando si raggiunge la funzione exit(). Notebook2 sta continuando a eseguire la quarta e la quinta cella.

notebookutils.notebook.exit("value string")

Nota

La funzione exit() sovrascrive l'output della cella corrente. Per evitare di perdere l'output di altre istruzioni di codice, chiama notebookutils.notebook.exit() in una cella separata.

Ad esempio:

Notebook Sample1 con le due celle seguenti:

  • La cella 1 definisce un parametro di input con il valore predefinito impostato su 10.

  • La cella 2 esce dal notebook utilizzando input come valore di uscita.

Screenshot che mostra un esempio di notebook con la funzione di uscita.

È possibile eseguire Sample1 in un altro notebook con i valori predefiniti:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

Prodotto:

Notebook is executed successfully with exit value 10

È possibile eseguire Sample1 in un altro notebook e impostare il valore di input su 20:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Prodotto:

Notebook is executed successfully with exit value 20

Gestire gli artefatti del notebook

notebookutils.notebook fornisce utilità specializzate per la gestione degli elementi del notebook a livello di codice. Queste API consentono di creare, ottenere, aggiornare ed eliminare facilmente elementi del notebook.

Per usare questi metodi in modo efficace, considerare gli esempi di utilizzo seguenti:

Creazione di un Notebook

with open("/path/to/notebook.ipynb", "r") as f:
    content = f.read()

artifact = notebookutils.notebook.create("artifact_name", "description", "content", "default_lakehouse_name", "default_lakehouse_workspace_id", "optional_workspace_id")

Recupero del contenuto di un notebook

artifact = notebookutils.notebook.get("artifact_name", "optional_workspace_id")

Aggiornamento di un Notebook

updated_artifact = notebookutils.notebook.update("old_name", "new_name", "optional_description", "optional_workspace_id")
updated_artifact_definition = notebookutils.notebook.updateDefinition("artifact_name",  "content", "default_lakehouse_name", "default_Lakehouse_Workspace_name", "optional_workspace_id")

Eliminazione di un Notebook

is_deleted = notebookutils.notebook.delete("artifact_name", "optional_workspace_id")

Elencazione dei notebook in un'area di lavoro

artifacts_list = notebookutils.notebook.list("optional_workspace_id")

Utilità delle Funzioni per i dati utente (UDF, User Data Function)

notebookutils.udf fornisce utilità progettate per l'integrazione del codice Notebook con le Funzioni di Dati Utente (UDF). Queste utilità consentono di accedere alle Funzioni da un elemento UDF all'interno della stessa area di lavoro o in aree di lavoro diverse. È quindi possibile richiamare funzioni all'interno di un elemento UDF in base alle esigenze.

Ecco una panoramica dei metodi disponibili:

# Get functions
myFunctions = notebookutils.udf.getFunctions('UDFItemName') # Get functions from UDF within the same workspace
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId') # Get functions from UDF across different workspace

# Additional helper method to return all functions, their respective parameters, and types.
display(myFunctions.functionDetails)
display(myFunctions.itemDetails)

# Invoke the function
myFunctions.functionName('value1', 'value2')
myFunctions.functionName(parameter1='value1', parameter2='value2'...) # Another way to invoke the function

Recuperare funzioni da una UDF

myFunctions = notebookutils.udf.getFunctions('UDFItemName')
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId')
var myFunctions = notebookutils.udf.getFunctions("UDFItemName")
var myFunctions = notebookutils.udf.getFunctions("UDFItemName", "workspaceId")
myFunctions <- notebookutils.udf.getFunctions("UDFItemName")
myFunctions <- notebookutils.udf.getFunctions("UDFItemName", "workspaceId")

Richiamare una funzione

myFunctions.functionName('value1', 'value2'...)
val res = myFunctions.functionName('value1', 'value2'...)
myFunctions$functionName('value1', 'value2'...)

Visualizzare i dettagli per un elemento UDF

display([myFunctions.itemDetails])
display(Array(myFunctions.itemDetails))
myFunctions$itemDetails()

Visualizzare i dettagli della funzione per una UDF

display(myFunctions.functionDetails)
display(myFunctions.functionDetails)
myFunctions$functionDetails()

Utilità delle credenziali

È possibile usare le utilità delle credenziali per ottenere i token di accesso e gestire segreti in Azure Key Vault.

Usare il comando seguente per ottenere una panoramica dei metodi disponibili:

notebookutils.credentials.help()

Prodotto:

Help on module notebookutils.credentials in notebookutils:

NAME
    notebookutils.credentials - Utility for credentials operations in Fabric

FUNCTIONS
    getSecret(akvName, secret) -> str
        Gets a secret from the given Azure Key Vault.
        :param akvName: The name of the Azure Key Vault.
        :param secret: The name of the secret.
        :return: The secret value.
    
    getToken(audience) -> str
        Gets a token for the given audience.
        :param audience: The audience for the token.
        :return: The token.
    
    help(method_name=None)
        Provides help for the notebookutils.credentials module or the specified method.
        
        Examples:
        notebookutils.credentials.help()
        notebookutils.credentials.help("getToken")
        :param method_name: The name of the method to get help with.

DATA
    creds = <notebookutils.notebookutils.handlers.CredsHandler.CredsHandler...

FILE
    /home/trusted-service-user/cluster-env/trident_env/lib/python3.10/site-packages/notebookutils/credentials.py

Ottenere un token

getToken restituisce un token di Microsoft Entra per un determinato destinatario e nome (facoltativo). L'elenco seguente mostra le chiavi del pubblico attualmente disponibili:

  • Risorsa di archiviazione per il pubblico: "storage"
  • Risorsa di Power BI: "pbi"
  • Risorse di Azure Key Vault: "keyvault"
  • Risorsa del database Synapse RTA KQL: "kusto"

Usare il comando seguente per ottenere il token:

notebookutils.credentials.getToken('audience Key')

Ottenere un segreto usando le credenziali utente

getSecret restituisce un segreto di Azure Key Vault per un determinato endpoint di Azure Key Vault e un nome segreto usando le credenziali utente.

notebookutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

Montaggio e smontaggio di file

Fabric supporta gli scenari di montaggio seguenti nel pacchetto di utilità di Microsoft Spark. È possibile usare le API mount, unmount, getMountPath() e mounts() per collegare l'archiviazione remota (ADLS Gen2) a tutti i nodi di lavoro (nodo driver e nodi di lavoro). Dopo aver creato il punto di montaggio della memoria, utilizzare l'API dei file locali per accedere ai dati come se fossero archiviati nel file system locale.

Come configurare un account ADLS Gen2

L'esempio seguente illustra come montare Azure Data Lake Storage Gen2. Il montaggio di Blob Storage funziona in modo analogo.

In questo esempio si presuppone che sia presente un account Data Lake Storage Gen2 denominato storegen2, con un contenitore denominato mycontainer che si vuole montare su /test nella sessione Spark del notebook.

Screenshot che mostra dove selezionare un contenitore da montare.

Per montare il contenitore denominato mycontainer, notebookutils deve prima verificare se si dispone dell'autorizzazione per accedere al contenitore. Attualmente, Fabric supporta due metodi di autenticazione per l'operazione di montaggio trigger: accountKey e sastoken.

Montaggio tramite token di firma di accesso condiviso o chiave dell'account

NotebookUtils supporta il passaggio esplicito di una chiave dell'account o di un token di firma di accesso condiviso (SAS) come parametro per montare la destinazione.

Per motivi di sicurezza, è consigliabile archiviare le chiavi dell'account o i token di firma di accesso condiviso (SAS) in Azure Key Vault, come illustrato nello screenshot seguente. È quindi possibile recuperarli usando l'API notebookutils.credentials.getSecret. Per altre informazioni su Azure Key Vault, vedere Informazioni sulle chiavi dell'account di archiviazione gestito di Azure Key Vault.

Screenshot che mostra dove vengono archiviati i segreti in Azure Key Vault.

Codice di esempio per il metodo accountKey:

# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

Codice di esempio per sastoken:

# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

Parametri di montaggio:

  • fileCacheTimeout: i BLOB vengono memorizzati nella cache nella cartella temporanea locale per 120 secondi per impostazione predefinita. Durante questo periodo, blobfuse non controlla se il file è aggiornato o meno. È possibile impostare il parametro per modificare il timeout predefinito. Quando più client modificano contemporaneamente i file, per evitare incoerenze tra file locali e remoti, è consigliabile abbreviare il tempo della cache o persino modificarlo su 0 e ottenere sempre i file più recenti dal server.
  • timeout: il timeout dell'operazione di montaggio è di 120 secondi per impostazione predefinita. È possibile impostare il parametro per modificare il timeout predefinito. Quando ci sono troppi executor o si verifica un timeout del montaggio, è consigliabile aumentare il valore.

È possibile usare questi parametri come indicato di seguito:

notebookutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

Nota

Ai fini della sicurezza, è consigliabile evitare di incorporare le credenziali direttamente nel codice. Per proteggere ulteriormente le tue credenziali, eventuali segreti visualizzati negli output dei notebook vengono oscurati. Per ulteriori informazioni, vedere Redazione dei segreti.

Come configurare un lakehouse

Codice di esempio per il montaggio di una lakehouse in /<mount_name>:

notebookutils.fs.mount( 
 "abfss://<workspace_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_name>.Lakehouse", 
 "/<mount_name>"
)

Accedere ai file nel punto di montaggio usando l'API notebookutils fs

Lo scopo principale dell'operazione di montaggio è consentire ai clienti di accedere ai dati archiviati in un account di archiviazione remoto con un'API del file system locale. È anche possibile accedere ai dati usando l'API notebookutils fs con un percorso montato come parametro. Questo formato di percorso è lievemente diverso.

Si supponga di aver montato il contenitore Data Lake Storage Gen2 mycontainer in /test usando l'API di montaggio. Quando si accede ai dati con un'API del file system locale, il formato del percorso è simile al seguente:

/synfs/notebook/{sessionId}/test/{filename}

Per accedere ai dati usando l'API notebookutils fs, è consigliabile usare getMountPath() per ottenere il percorso accurato:

path = notebookutils.fs.getMountPath("/test")
  • Elenco delle directory:

    notebookutils.fs.ls(f"file://{notebookutils.fs.getMountPath('/test')}")
    
  • Leggere il contenuto del file:

    notebookutils.fs.head(f"file://{notebookutils.fs.getMountPath('/test')}/myFile.txt")
    
  • Creare una directory:

    notebookutils.fs.mkdirs(f"file://{notebookutils.fs.getMountPath('/test')}/newdir")
    

Accedere ai file nel punto di montaggio tramite il percorso locale

È possibile leggere e scrivere facilmente i file nel punto di montaggio usando il file system standard. Di seguito è riportato un esempio in Python:

#File read
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

Come controllare i punti di montaggio esistenti

È possibile usare l'API notebookutils.fs.mounts() per controllare tutte le informazioni sui punti di montaggio esistenti:

notebookutils.fs.mounts()

Come smontare il punto di montaggio

Usare il codice seguente per smontare il punto di montaggio (/test in questo esempio):

notebookutils.fs.unmount("/test")

Limitazioni note

  • Il montaggio corrente è una configurazione a livello di processo; è consigliabile usare l'API mounts per verificare se esiste o meno un punto di montaggio.

  • Il meccanismo di smontaggio non viene applicato automaticamente. Al termine dell'esecuzione dell'applicazione, per smontare il punto di montaggio e rilasciare lo spazio su disco, è necessario chiamare in modo esplicito un'API di smontaggio nel codice. In caso contrario, il punto di montaggio esisterà ancora nel nodo al termine dell'esecuzione dell'applicazione.

  • Il montaggio di un account di archiviazione ADLS Gen1 non è supportato.

Utilità di Lakehouse

notebookutils.lakehouse fornisce utilità personalizzate per la gestione degli oggetti Lakehouse. Queste utilità consentono di creare, ottenere, aggiornare ed eliminare facilmente artefatti di Lakehouse.

Panoramica dei metodi

Ecco una panoramica dei metodi disponibili forniti da notebookutils.lakehouse:

# Create a new Lakehouse artifact
create(name: String, description: String = "", definition: ItemDefinition = null, workspaceId: String = ""): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Get a Lakehouse artifact with properties
getWithProperties(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean 

# List all Lakehouse artifacts
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact]

# List all tables in a Lakehouse artifact
listTables(lakehouse: String, workspaceId: String = "", maxResults: Int = 1000): Array[Table] 

# Starts a load table operation in a Lakehouse artifact
loadTable(loadOption: collection.Map[String, Any], table: String, lakehouse: String, workspaceId: String = ""): Array[Table] 

Esempi di utilizzo

Per usare questi metodi in modo efficace, considerare gli esempi di utilizzo seguenti:

Creazione di un lakehouse

artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")

Ottenere un lakehouse

artifact = notebookutils.lakehouse.get("artifact_name", "optional_workspace_id")
artifact = notebookutils.lakehouse.getWithProperties("artifact_name", "optional_workspace_id")

Aggiornamento di un Lakehouse

updated_artifact = notebookutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

Eliminazione di un Lakehouse

is_deleted = notebookutils.lakehouse.delete("artifact_name", "optional_workspace_id")

Elencare i lakehouse in un'area di lavoro

artifacts_list = notebookutils.lakehouse.list("optional_workspace_id")

Elencazione di tutte le tabelle in un Lakehouse

artifacts_tables_list = notebookutils.lakehouse.listTables("artifact_name", "optional_workspace_id")

Avviare un'operazione di caricamento della tabella in un lakehouse

notebookutils.lakehouse.loadTable(
    {
        "relativePath": "Files/myFile.csv",
        "pathType": "File",
        "mode": "Overwrite",
        "recursive": False,
        "formatOptions": {
            "format": "Csv",
            "header": True,
            "delimiter": ","
        }
    }, "table_name", "artifact_name", "optional_workspace_id")

Informazioni aggiuntive

Per informazioni più dettagliate su ogni metodo e sui relativi parametri, utilizzare la funzione notebookutils.lakehouse.help("methodName").

Utilità di runtime

Visualizzare informazioni contestuali della sessione

Con notebookutils.runtime.context puoi ottenere le informazioni contestuali della sessione live corrente, tra cui il nome del notebook, il lakehouse predefinito, informazioni sull'area di lavoro, se si tratta di un'esecuzione della pipeline e così via.

notebookutils.runtime.context

Nella tabella seguente vengono descritte le proprietà.

Parametro Spiegazione
currentNotebookName Nome del notebook corrente
currentNotebookId ID univoco del notebook corrente
currentWorkspaceName Nome dell'area di lavoro corrente
currentWorkspaceId ID dell'area di lavoro corrente
defaultLakehouseName Nome visualizzato del lakehouse predefinito, se definito
defaultLakehouseId ID del lakehouse di default, se definito
defaultLakehouseWorkspaceName Nome dell'area di lavoro del lakehouse predefinito, se definito
defaultLakehouseWorkspaceId ID dell'area di lavoro del lakehouse predefinito, se definito
currentRunId In un'esecuzione di riferimento, l'ID dell'esecuzione corrente
parentRunId In un'esecuzione di riferimento con esecuzioni annidate, viene utilizzato l'ID dell'esecuzione padre
rootRunId In un'esecuzione di riferimento con esecuzioni annidate, questo è l'ID di esecuzione radice
isForPipeline Indica se l'operazione è per una pipeline
isReferenceRun Indica se l'esecuzione corrente è un'esecuzione di riferimento
referenceTreePath Struttura ad albero dei cicli di riferimento annidati, utilizzata solo per la gerarchia degli snapshot nella pagina di monitoraggio livello 2
rootNotebookId (Solo in esecuzione di riferimento) ID del notebook radice in un'esecuzione di riferimento.
rootNotebookName (Solo in un'esecuzione di riferimento) Il nome del notebook principale in un'esecuzione di riferimento.
rootWorkspaceId (Solo in esecuzione di riferimento) ID dell'area di lavoro del notebook radice in un'esecuzione di riferimento.
rootWorkspaceName (Solo in esecuzione di riferimento) Nome dell'area di lavoro del notebook radice in un'esecuzione di riferimento.
activityId ID del processo Livy per l'attività corrente
hcRepId ID REPL in modalità concorrenza elevata
clusterId Identità del cluster Synapse Spark
poolName Nome del pool di Spark attualmente utilizzato
environmentId ID dell'ambiente in cui è in esecuzione il compito
environmentWorkspaceId ID dell'area di lavoro dell'ambiente
userId ID utente dell'utente corrente
userName Nome utente dell'utente corrente

Gestione delle sessioni

Terminare una sessione interattiva

Invece di fare clic manualmente sul pulsante Arresta, a volte è più pratico arrestare una sessione interattiva chiamando un'API nel codice. Per questi casi, viene fornita un'API notebookutils.session.stop() per supportare l'arresto della sessione interattiva tramite codice, disponibile per Scala e PySpark.

notebookutils.session.stop()

notebookutils.session.stop() API arresta la sessione interattiva corrente asincronicamente in background. Arresta anche la sessione Spark e rilascia le risorse occupate dalla sessione, in modo che siano disponibili per altre sessioni nello stesso pool.

Riavviare l'interprete Python

L'utilità notebookutils.session consente di riavviare l'interprete Python.

notebookutils.session.restartPython()

Nota

  • Nel caso di esecuzione del riferimento al notebook, restartPython() riavvia solo l'interprete Python del notebook corrente a cui si fa riferimento.
  • In rari casi, il comando potrebbe non riuscire a causa del meccanismo di reflection di Spark. Aggiungere un tentativo di ripetizione può attenuare il problema.

Problema noto

  • Quando si usa una versione del runtime successiva a 1.2 e si esegue notebookutils.help(), le API fabricClient, PBIClient per il momento non sono supportate, ma saranno disponibili successivamente. Inoltre, l'API Credenziali per il momento non è supportata nei notebook Scala.

  • Il notebook Python non supporta le API stop e restartPython quando si usa l'utilità notebookutils.session per la gestione delle sessioni.