Notebook Utilities (NotebookUtils) is a built-in package to help you easily perform common tasks in Fabric notebooks. You can use NotebookUtils to work with file systems, to get environment variables, to chain notebooks together, and to work with secrets. The NotebookUtils package is available in PySpark (Python), Scala, and SparkR notebooks, as well as in Fabric pipelines.
Note
notebookutils.fs provides utilities for working with various file systems, including Azure Data Lake Storage (ADLS) Gen2 and Azure Blob Storage. Make sure you configure access to Azure Data Lake Storage Gen2 and Azure Blob Storage appropriately.
Run the following commands for an overview of the available methods:
notebookutils.fs.help()
Output
notebookutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
fastcp(from: String, to: String, recurse: Boolean = true): Boolean -> [Preview] Copies a file or directory via azcopy, possibly across FileSystems
mv(from: String, to: String, createPath: Boolean = false, overwrite: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point
Use notebookutils.fs.help("methodName") for more info about a method.
NotebookUtils works with the file system in the same way as Spark APIs. Take notebookutils.fs.mkdirs() and Fabric lakehouse usage as an example:
Usage | Relative path from HDFS root | Absolute path for ABFS file system | Absolute path for local file system in driver node |
---|---|---|---|
Non-default lakehouse | Not supported | notebookutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") | notebookutils.fs.mkdirs("file:/<new_dir>") |
Default lakehouse | Directory under “Files” or “Tables”: notebookutils.fs.mkdirs("Files/<new_dir>") | notebookutils.fs.mkdirs("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") | notebookutils.fs.mkdirs("file:/<new_dir>") |
To list the content of a directory, use notebookutils.fs.ls('Your directory path'). For example:
notebookutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>") # based on ABFS file system
notebookutils.fs.ls("file:/tmp") # based on local file system of driver node
This method returns file properties including the file name, file path, file size, and whether it's a directory or a file.
files = notebookutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)
This method creates the given directory if it doesn't exist, and creates any necessary parent directories.
notebookutils.fs.mkdirs('new directory name')
notebookutils.fs.mkdirs("Files/<new_dir>") # works with the default lakehouse files using relative path
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") # based on ABFS file system
notebookutils.fs.ls("file:/<new_dir>") # based on local file system of driver node
This method copies a file or directory, and supports copying across file systems.
notebookutils.fs.cp('source file or directory', 'destination file or directory', True) # Set the third parameter as True to copy all files and directories recursively
This method offers a more efficient approach to copying or moving files, particularly when dealing with large data volumes. For enhanced performance on Fabric, it is advisable to use fastcp as a substitute for the traditional cp method.
Note
notebookutils.fs.fastcp() does not support copying files in OneLake across regions. In this case, you can use notebookutils.fs.cp() instead.
notebookutils.fs.fastcp('source file or directory', 'destination file or directory', True) # Set the third parameter as True to copy all files and directories recursively
This method returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8.
notebookutils.fs.head('file path', maxBytes to read)
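For example, to read only the first kilobyte of a file (the path below is a hypothetical default-lakehouse path used for illustration):
notebookutils.fs.head("Files/tmp/myFile.txt", 1024) # returns up to the first 1,024 bytes as a UTF-8 string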
This method moves a file or directory, and supports moves across file systems.
notebookutils.fs.mv('source file or directory', 'destination directory', True) # Set the third parameter to True to create the parent directory first if it does not exist
notebookutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to create the parent directory first if it does not exist. Set the last parameter to True to overwrite the destination if it already exists.
This method writes the given string out to a file, encoded in UTF-8.
notebookutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
This method appends the given string to a file, encoded in UTF-8.
notebookutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
Note
notebookutils.fs.append() and notebookutils.fs.put() do not support concurrent writing to the same file due to lack of atomicity guarantees. When using the notebookutils.fs.append API in a for loop to write to the same file, we recommend adding a sleep statement of around 0.5s~1s between the recurring writes. This is because the notebookutils.fs.append API's internal flush operation is asynchronous, so a short delay helps ensure data integrity.
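For example, a minimal sketch of appending in a loop with a short pause between writes, following the guidance above (the file path is a hypothetical default-lakehouse path):
import time

for i in range(5):
    notebookutils.fs.append("Files/tmp/log.txt", f"line {i}\n", True) # create the file on the first write if it doesn't exist
    time.sleep(1) # short pause because the internal flush operation is asynchronous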
This method removes a file or directory.
notebookutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
Find more information about detailed usage in File mount and unmount.
Use the Notebook Utilities to run a notebook or exit a notebook with a value. Run the following command to get an overview of the available methods:
notebookutils.notebook.help()
Output:
The notebook module.
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map, workspace: String): String -> This method runs a notebook and returns its exit value.
runMultiple(DAG: Any): Map[String, MsNotebookRunResult] -> [Preview] Runs multiple notebooks concurrently with support for dependency relationships.
validateDAG(DAG: Any): Boolean -> [Preview] This method check if the DAG is correctly defined.
[Preview] Below methods are only support Fabric Notebook.
create(name: String, description: String = "", content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = ""): Artifact -> Create a new Notebook.
get(name: String, workspaceId: String = ""): Artifact -> Get a Notebook by name or id.
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact -> Update a Artifact by name.
delete(name: String, workspaceId: String = ""): Boolean -> Delete a Notebook by name.
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact] -> List all Notebooks in the workspace.
updateDefinition(name: String, content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = "") -> Update the definition of a Notebook.
Use notebookutils.notebook.help("methodName") for more info about a method.
Note
Notebook utilities aren't applicable for Apache Spark job definitions (SJD).
This method references a notebook and returns its exit value. You can run nested function calls in a notebook interactively or in a pipeline. The notebook being referenced runs on the Spark pool of the notebook that calls this function.
notebookutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)
For example:
notebookutils.notebook.run("Sample1", 90, {"input": 20 })
Fabric notebooks also support referencing notebooks across multiple workspaces by specifying the workspace ID.
notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")
You can open the snapshot link of the reference run in the cell output. The snapshot captures the code run results and allows you to easily debug a reference run.
Note
Use notebookutils.nbResPath in the referenced notebook to make sure it points to the same folder as the interactive run.
Important
This feature is in preview.
The method notebookutils.notebook.runMultiple() allows you to run multiple notebooks in parallel or with a predefined topological structure. The API uses a multi-thread implementation mechanism within a Spark session, which means the compute resources are shared by the reference notebook runs.
With notebookutils.notebook.runMultiple(), you can:
Execute multiple notebooks simultaneously, without waiting for each one to finish.
Specify the dependencies and order of execution for your notebooks, using a simple JSON format.
Optimize the use of Spark compute resources and reduce the cost of your Fabric projects.
View the Snapshots of each notebook run record in the output, and debug/monitor your notebook tasks conveniently.
Get the exit value of each notebook activity and use it in downstream tasks.
You can also run notebookutils.notebook.help("runMultiple") to find the example and detailed usage.
Here's a simple example of running a list of notebooks in parallel using this method:
notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])
The execution result from the root notebook is shown in the cell output.
The following is an example of running notebooks with a topological structure using notebookutils.notebook.runMultiple(). Use this method to easily orchestrate notebooks through a code experience.
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "NotebookSimple", # activity name, must be unique
"path": "NotebookSimple", # notebook path
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
},
{
"name": "NotebookSimple2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200}
},
{
"name": "NotebookSimple2.2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
}
],
"timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
"concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})
The execution result from the root notebook is shown in the cell output.
We also provide a method to check if the DAG is correctly defined.
notebookutils.notebook.validateDAG(DAG)
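For example, a minimal sketch that validates the DAG defined above before submitting it:
if notebookutils.notebook.validateDAG(DAG):
    notebookutils.notebook.runMultiple(DAG)
else:
    print("The DAG is not correctly defined; check activity names and dependencies.")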
Note
If notebook runs fail because too many notebooks run at the same time, consider splitting them into multiple runMultiple calls or reducing the concurrency by adjusting the concurrency field in the DAG parameter.
This method exits a notebook with a value. You can run nested function calls in a notebook interactively or in a pipeline.
When you call an exit() function from a notebook interactively, the Fabric notebook throws an exception, skips running subsequent cells, and keeps the Spark session alive.
When you orchestrate a notebook in a pipeline that calls an exit() function, the notebook activity returns with an exit value, completes the pipeline run, and stops the Spark session.
When you call an exit() function in a notebook that is being referenced, Fabric Spark stops further execution of the referenced notebook and continues to run the next cells in the main notebook that calls the run() function. For example: Notebook1 has three cells and calls an exit() function in the second cell. Notebook2 has five cells and calls run(notebook1) in the third cell. When you run Notebook2, Notebook1 stops at the second cell when hitting the exit() function, and Notebook2 continues to run its fourth and fifth cells.
notebookutils.notebook.exit("value string")
Note
The exit() function overwrites the current cell output. To avoid losing the output of other code statements, call notebookutils.notebook.exit() in a separate cell.
For example:
Sample1 notebook with the following two cells (sketched below):
Cell 1 defines an input parameter with the default value set to 10.
Cell 2 exits the notebook with input as the exit value.
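A minimal sketch of what those two cells might contain (the parameter name input matches the run examples below):
# Cell 1: parameter cell with a default value
input = 10
# Cell 2: exit the notebook, returning the parameter as the exit value
notebookutils.notebook.exit(input)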
You can run Sample1 in another notebook with default values:
exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)
Output:
Notebook is executed successfully with exit value 10
You can run Sample1 in another notebook and set the input value to 20:
exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
Output:
Notebook is executed successfully with exit value 20
notebookutils.notebook provides specialized utilities for managing Notebook items programmatically. These APIs can help you create, get, update, and delete Notebook items easily.
To utilize these methods effectively, consider the following usage examples:
with open("/path/to/notebook.ipynb", "r") as f:
content = f.read()
artifact = notebookutils.notebook.create("artifact_name", "description", "content", "default_lakehouse_name", "default_lakehouse_workspace_id", "optional_workspace_id")
artifact = notebookutils.notebook.get("artifact_name", "optional_workspace_id")
updated_artifact = notebookutils.notebook.update("old_name", "new_name", "optional_description", "optional_workspace_id")
updated_artifact_definition = notebookutils.notebook.updateDefinition("artifact_name", "content", "default_lakehouse_name", "default_Lakehouse_Workspace_name", "optional_workspace_id")
is_deleted = notebookutils.notebook.delete("artifact_name", "optional_workspace_id")
artifacts_list = notebookutils.notebook.list("optional_workspace_id")
You can use the Credentials Utilities to get access tokens and manage secrets in an Azure Key Vault.
Run the following command to get an overview of the available methods:
notebookutils.credentials.help()
Output:
Help on module notebookutils.credentials in notebookutils:
NAME
notebookutils.credentials - Utility for credentials operations in Fabric
FUNCTIONS
getSecret(akvName, secret) -> str
Gets a secret from the given Azure Key Vault.
:param akvName: The name of the Azure Key Vault.
:param secret: The name of the secret.
:return: The secret value.
getToken(audience) -> str
Gets a token for the given audience.
:param audience: The audience for the token.
:return: The token.
help(method_name=None)
Provides help for the notebookutils.credentials module or the specified method.
Examples:
notebookutils.credentials.help()
notebookutils.credentials.help("getToken")
:param method_name: The name of the method to get help with.
DATA
creds = <notebookutils.notebookutils.handlers.CredsHandler.CredsHandler...
FILE
/home/trusted-service-user/cluster-env/trident_env/lib/python3.10/site-packages/notebookutils/credentials.py
getToken returns a Microsoft Entra token for a given audience and name (optional). The following list shows the currently available audience keys:
Run the following command to get the token:
notebookutils.credentials.getToken('audience Key')
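As a sketch of how such a token might be used (the audience key "pbi", the requests library, and the endpoint URL below are assumptions for illustration, not part of this article):
import requests

token = notebookutils.credentials.getToken("pbi") # hypothetical audience key
response = requests.get(
    "https://api.fabric.microsoft.com/v1/workspaces", # hypothetical endpoint used for illustration
    headers={"Authorization": f"Bearer {token}"}
)
print(response.status_code)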
getSecret returns an Azure Key Vault secret for a given Azure Key Vault endpoint and secret name using user credentials.
notebookutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')
Fabric supports the following mount scenarios in the Microsoft Spark Utilities package. You can use the mount, unmount, getMountPath(), and mounts() APIs to attach remote storage (ADLS Gen2) to all working nodes (driver node and worker nodes). After the storage mount point is in place, use the local file API to access data as if it's stored in the local file system.
The following example illustrates how to mount Azure Data Lake Storage Gen2. Mounting Blob Storage works similarly.
This example assumes that you have one Data Lake Storage Gen2 account named storegen2, and the account has one container named mycontainer that you want to mount to /test in your notebook Spark session.
To mount the container called mycontainer, notebookutils first needs to check whether you have permission to access the container. Currently, Fabric supports two authentication methods for triggering the mount operation: accountKey and sastoken.
NotebookUtils supports explicitly passing an account key or Shared access signature (SAS) token as a parameter to mount the target.
For security reasons, we recommend that you store account keys or SAS tokens in Azure Key Vault. You can then retrieve them by using the notebookutils.credentials.getSecret API. For more information about Azure Key Vault, see About Azure Key Vault managed storage account keys.
Sample code for the accountKey method:
# retrieve the account key stored as a secret in Azure Key Vault
accountKey = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"accountKey":accountKey}
)
Sample code for sastoken:
# retrieve the SAS token stored as a secret in Azure Key Vault
sasToken = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"sasToken":sasToken}
)
Mount parameters such as fileCacheTimeout and timeout can be passed through the extraConfigs map. You can use these parameters like this:
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"fileCacheTimeout": 120, "timeout": 120}
)
Note
For security purposes, it is advised to avoid embedding credentials directly in code. To further safeguard your credentials, any secrets displayed in notebook outputs will be redacted. For more information, see Secret redaction.
Sample code for mounting a lakehouse to /<mount_name>:
notebookutils.fs.mount(
"abfss://<workspace_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_name>.Lakehouse",
"/<mount_name>"
)
The main purpose of the mount operation is to let customers access the data stored in a remote storage account with a local file system API. You can also access the data by using the notebookutils fs API with a mounted path as a parameter. This path format is a little different.
Assume that you mounted the Data Lake Storage Gen2 container mycontainer to /test by using the mount API. When you access the data with a local file system API, the path format is like this:
/synfs/notebook/{sessionId}/test/{filename}
When you want to access the data by using the notebookutils fs API, we recommend using getMountPath() to get the accurate path:
path = notebookutils.fs.getMountPath("/test")
List directories:
notebookutils.fs.ls(f"file://{notebookutils.fs.getMountPath('/test')}")
Read file content:
notebookutils.fs.head(f"file://{notebookutils.fs.getMountPath('/test')}/myFile.txt")
Create a directory:
notebookutils.fs.mkdirs(f"file://{notebookutils.fs.getMountPath('/test')}/newdir")
You can easily read and write files in the mount point by using the standard file system APIs. Here's a Python example:
# File read
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
# File write
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))
You can use the notebookutils.fs.mounts() API to check all existing mount point info:
notebookutils.fs.mounts()
Use the following code to unmount your mount point (/test in this example):
notebookutils.fs.unmount("/test")
The current mount is a job-level configuration; we recommend you use the mounts API to check whether a mount point exists or is unavailable.
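For example, a minimal sketch of such a check before unmounting (assuming each entry returned by mounts() exposes a mountPoint field):
if any(m.mountPoint == "/test" for m in notebookutils.fs.mounts()):
    notebookutils.fs.unmount("/test")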
The unmount mechanism is not automatically applied. When the application run finishes, to unmount the mount point and release the disk space, you need to explicitly call an unmount API in your code. Otherwise, the mount point will still exist in the node after the application run finishes.
Mounting an ADLS Gen1 storage account is not supported.
notebookutils.lakehouse provides utilities specifically tailored for managing Lakehouse items. These utilities empower you to create, get, update, and delete Lakehouse artifacts effortlessly.
Below is an overview of the available methods provided by notebookutils.lakehouse:
# Create a new Lakehouse artifact
create(name: String, description: String = "", definition: ItemDefinition = null, workspaceId: String = ""): Artifact
# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact
# Get a Lakehouse artifact with properties
getWithProperties(name: String, workspaceId: String = ""): Artifact
# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact
# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean
# List all Lakehouse artifacts
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact]
# List all tables in a Lakehouse artifact
listTables(lakehouse: String, workspaceId: String = "", maxResults: Int = 1000): Array[Table]
# Start a load table operation in a Lakehouse artifact
loadTable(loadOption: collection.Map[String, Any], table: String, lakehouse: String, workspaceId: String = ""): Array[Table]
To utilize these methods effectively, consider the following usage examples:
artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")
artifact = notebookutils.lakehouse.get("artifact_name", "optional_workspace_id")
artifact = notebookutils.lakehouse.getWithProperties("artifact_name", "optional_workspace_id")
updated_artifact = notebookutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")
is_deleted = notebookutils.lakehouse.delete("artifact_name", "optional_workspace_id")
artifacts_list = notebookutils.lakehouse.list("optional_workspace_id")
artifacts_tables_list = notebookutils.lakehouse.listTables("artifact_name", "optional_workspace_id")
notebookutils.lakehouse.loadTable(
{
"relativePath": "Files/myFile.csv",
"pathType": "File",
"mode": "Overwrite",
"recursive": False,
"formatOptions": {
"format": "Csv",
"header": True,
"delimiter": ","
}
}, "table_name", "artifact_name", "optional_workspace_id")
For more detailed information about each method and its parameters, use the notebookutils.lakehouse.help("methodName") function.
With notebookutils.runtime.context, you can get the context information of the current live session, including the notebook name, default lakehouse, workspace info, whether it's a pipeline run, and so on.
notebookutils.runtime.context
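For example, a minimal sketch that inspects the context (treating the returned value as a dictionary; the key name isForPipeline is an assumption for illustration):
context = notebookutils.runtime.context
print(context) # dump everything the runtime exposes for the current session
if context.get("isForPipeline"):
    print("This notebook is running inside a pipeline")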
When you use a runtime version above 1.2 and run notebookutils.help(), the listed fabricClient and PBIClient APIs aren't supported for now; they will be available in the future. Additionally, the Credentials API isn't supported in Scala notebooks for now.