Notebook 公用程式 (NotebookUtils) 是內建套件,可協助您輕鬆地在網狀架構筆記本中執行一般工作。 您可以使用 NotebookUtils 來處理文件系統、取得環境變數、將筆記本鏈結在一起,以及使用秘密。 NotebookUtils 套件可在 PySpark (Python) Scala、SparkR Notebook 和 Fabric 管線中使用。
Note
- MsSparkUtils 已正式重新命名為 NotebookUtils。 現有程式碼保持 回溯相容 ,不會導致任何重大變更。 強烈 建議 升級到 notebookutils,以確保持續支援和存取新功能。 mssparkutils 命名空間未來將會淘汰。
- NotebookUtils 的設計目的是與 Spark 3.4(執行階段 v1.2)及更高版本 兼容。 未來,所有新功能和更新將僅支援 notebookutils 命名空間。
檔案系統公用程式
notebookutils.fs 提供公用程式來使用各種檔案系統,包括 Azure Data Lake Storage (ADLS) Gen2 和 Azure Blob 儲存體。 請確保您已適當地設定對 Azure Data Lake Storage Gen2 和 Azure Blob 儲存體的存取權限。
執行下列命令以取得可用方法的概觀:
notebookutils.fs.help()
Output
notebookutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
fastcp(from: String, to: String, recurse: Boolean = true): Boolean -> Copies a file or directory via azcopy, possibly across FileSystems
mv(from: String, to: String, createPath: Boolean = false, overwrite: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point
Use notebookutils.fs.help("methodName") for more info about a method.
NotebookUtils 會以與 Spark API 相同的方式與文件系統搭配運作。 以 notebookutils.fs.mkdirs() 和 Fabric Lakehouse 使用方式為例:
| Usage | HDFS 根目錄的相對路徑 | ABFS 檔案系統的絕對路徑 | 驅動程式節點中本機檔系統的絕對路徑 |
|---|---|---|---|
| 非預設 Lakehouse | 不支援 | notebookutils.fs.mkdirs(“abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>”) | notebookutils.fs.mkdirs(“file:/<new_dir>”) |
| 預設 Lakehouse | 'Files' 或 'Tables' 下的目錄:notebookutils.fs.mkdirs(“Files/<new_dir>”) | notebookutils.fs.mkdirs(“abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>”) | notebookutils.fs.mkdirs(“file:/<new_dir>”) |
預設的 Lakehouse 中,檔案路徑會掛載在您的 Notebook 上,預設檔案快取逾時為 120 秒。 這表示檔案會在 Notebook 的本地暫存資料夾中快取 120 秒,即使檔案已從 Lakehouse 移除也一樣。 如果您想要變更逾時規則,您可以卸載預設的 Lakehouse 檔案路徑,並使用不同的 fileCacheTimeout 值再次掛接它們。
針對非預設 Lakehouse 設定,您可以在掛接 Lakehouse 路徑期間設定適當的 fileCacheTimeout 參數。 將逾時設定為 0 可確保從 Lakehouse 伺服器擷取最新的檔案。
清單檔案
若要列出目錄的內容,請使用 notebookutils.fs.ls(「您的目錄路徑」)。 例如:
notebookutils.fs.ls("Files/tmp") # The relatvie path may work with different base path, details in below
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>") # The absolute path, like: ABFS file system
notebookutils.fs.ls("file:/tmp") # The full path of the local file system of driver node
使用相對路徑時,notebookutils.fs.ls() API 的行為會有所不同,視筆記本類型而定。
Spark 筆記本:相對路徑是相對於預設的 Lakehouse ABFSS 路徑。 例如,
notebookutils.fs.ls("Files")指向預設 Lakehouse 中的Files目錄。例如:
notebookutils.fs.ls("Files/sample_datasets/public_holidays.parquet")Python 筆記本中:相對路徑是相對於本地檔案系統的工作目錄,預設為 /home/trusted-service-user/work。 因此,您應該使用完整路徑,而不是相對路徑
notebookutils.fs.ls("/lakehouse/default/Files")來存取預設 Lakehouse 中的Files目錄。例如:
notebookutils.fs.ls("/lakehouse/default/Files/sample_datasets/public_holidays.parquet")
檢視檔案屬性
此方法會傳回檔案屬性,包括檔案名稱、檔案路徑、檔案大小,以及其是否為目錄和檔案。
files = notebookutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size)
建立新的目錄
如果指定的目錄不存在,這個方法會建立指定的目錄,並建立任何必要的父目錄。
notebookutils.fs.mkdirs('new directory name')
notebookutils.fs.mkdirs("Files/<new_dir>") # works with the default lakehouse files using relative path
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>") # based on ABFS file system
notebookutils.fs.ls("file:/<new_dir>") # based on local file system of driver node
複製檔案
此方法會複製檔案或目錄,並支援跨檔案系統複製活動。 我們設定 recurse=True 以遞歸方式複製所有檔案和目錄。
notebookutils.fs.cp('source file or directory', 'destination file or directory', recurse=True)
Note
由於 OneLake 快捷方式的
高效能複製檔案
此方法提供更有效率的方法來複製或移動檔案,特別是在處理大量資料時。 為了提升 Fabric 的效能,建議使用 fastcp 作為傳統 cp 方法的替代方法。
notebookutils.fs.fastcp('source file or directory', 'destination file or directory', recurse=True)
Considerations:
-
notebookutils.fs.fastcp()不支援跨區域複製 OneLake 中的檔案。 在這裡情況下,您可以改用notebookutils.fs.cp()。 - 由於 OneLake 快捷方式的
限制,當您需要使用 從 S3/GCS 類型快捷方式複製數據時,建議使用掛載的路徑,而不是 abfss 路徑。
預覽檔案內容
這個方法會傳回指定檔案的第一個 'maxBytes' 位元組,做為以 UTF-8 編碼的字串。
notebookutils.fs.head('file path', maxBytes to read)
移動檔案
此方法會移動檔案或目錄,並支援跨檔案系統移動。
notebookutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
notebookutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.
寫入檔案
這個方法會將指定的字串寫出至以 UTF-8 編碼的檔案。
notebookutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
將內容附加至檔案
這個方法會將指定的字串附加至以 UTF-8 編碼的檔案。
notebookutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
Considerations:
-
notebookutils.fs.append()和notebookutils.fs.put()因缺乏原子性保證,無法支援同時寫入相同的檔案。 - 在
notebookutils.fs.append迴圈中使用forAPI 來寫入相同的檔案時,建議您在週期性寫入之間新增大約 0.5 秒 ~ 1 秒的sleep語句。 這項建議是因為notebookutils.fs.appendAPI 的內部flush作業是異步的,因此短暫的延遲有助於確保數據完整性。
刪除檔案或目錄
此方法會移除檔案或目錄。 我們設定 recurse=True 遞歸刪除所有檔案和目錄。
notebookutils.fs.rm('file path', recurse=True)
掛載/卸載目錄
在檔案裝載和卸載中尋找詳細使用方式的詳細資訊。
筆記本實用程式
使用 Notebook 效用工具來執行筆記本,或以某個值結束筆記本。 執行下列命令以取得可用方法的概觀:
notebookutils.notebook.help()
Output:
The notebook module.
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map, workspace: String): String -> This method runs a notebook and returns its exit value.
runMultiple(DAG: Any): Map[String, MsNotebookRunResult] -> Runs multiple notebooks concurrently with support for dependency relationships.
validateDAG(DAG: Any): Boolean -> This method check if the DAG is correctly defined.
Below methods are only support Fabric Notebook.
create(name: String, description: String = "", content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = ""): Artifact -> Create a new Notebook.
get(name: String, workspaceId: String = ""): Artifact -> Get a Notebook by name or id.
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact -> Update a Artifact by name.
delete(name: String, workspaceId: String = ""): Boolean -> Delete a Notebook by name.
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact] -> List all Notebooks in the workspace.
updateDefinition(name: String, content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = "") -> Update the definition of a Notebook.
Use notebookutils.notebook.help("methodName") for more info about a method.
Note
筆記本公用程式不適用於 Apache Spark 工作定義 (SJD)。
參考筆記本
此方法會參考筆記本,並傳回其結束值。 您可以在筆記本中以互動方式或在管線中執行巢狀函數呼叫。 被參考的記事本會在呼叫此函式的記事本的 Spark 集區上執行。
notebookutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)
例如:
notebookutils.notebook.run("Sample1", 90, {"input": 20 })
Fabric 筆記本也支援指定 工作區識別碼,跨多個工作區參考筆記本。
notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")
您可以在儲存格輸出中開啟參考執行過程的快照連結。 快照會擷取程式碼執行的結果,並可讓您輕鬆地偵錯參考執行。
Considerations:
- 跨工作區參考筆記本支援執行階段 1.2 版和更高版本。
- 如果您使用 [筆記本資源] 底下的檔案,
notebookutils.nbResPath請在參考的筆記本中使用,以確保它指向與互動式執行相同的資料夾。 - 參考執行只允許子筆記本在子筆記本使用與父系相同的湖庫、繼承父系的湖庫,或兩者都未定義時執行。 如果子系指定與父筆記本不同的 Lakehouse,則會封鎖執行。 若要略過此檢查,請將 設定
useRootDefaultLakehouse: True。
參考指南以平行方式運行多個筆記本
方法 notebookutils.notebook.runMultiple() 可讓您平行執行多個筆記本,或使用預先定義的拓撲結構。 API 在 Spark 會話中使用多線程實作機制,這表示參考筆記本運行會共用計算資源。
使用 notebookutils.notebook.runMultiple(),您可以:
同時執行多個筆記本,無需等待各自完成。
使用簡單的 JSON 格式,來指定您的筆記本的相依性與執行順序。
最佳化 Spark 計算資源的使用,並降低 Fabric 專案的成本。
檢視輸出中每個筆記本執行記錄的快照,並方便地偵錯和監控您的筆記本任務。
取得每個執行活動的結束值,並在下游工作中使用這些值。
您也可以嘗試執行 notebookutils.notebook.help("runMultiple") 來尋找範例和詳細的使用方式。
以下是使用此方法平行執行筆記本清單的簡單範例:
notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])
根筆記本的執行結果如下所示:
以下是使用 notebookutils.notebook.runMultiple()執行具有拓撲結構的筆記本範例。 使用此方法,透過程式碼界面輕鬆地管理筆記本。
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "NotebookSimple", # activity name, must be unique
"path": "NotebookSimple", # notebook path
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
},
{
"name": "NotebookSimple2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200}
},
{
"name": "NotebookSimple2.2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
}
],
"timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
"concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})
根筆記本的執行結果如下所示:
我們也提供方法來檢查 DAG 是否已正確定義。
notebookutils.notebook.validateDAG(DAG)
Considerations:
- 多個筆記本的平行運行程度受限於 Spark 工作階段的總可用計算資源。
- Spark 筆記本的預設並行筆記本數目為 50 ,而 Python 筆記本的預設數目為 25 。 您可以自訂此值,但過多的平行處理原則可能會因為高運算資源使用量而導致穩定性和效能問題。 如果發生問題,請考慮將筆記本分成多個
runMultiple呼叫,或透過調整 DAG 參數中的 並行 欄位來減少並行。 - 整個 DAG 的預設逾時為 12 小時,而子筆記本中每個儲存格的預設逾時為 90 秒。 您可以在 DAG 參數中設定 timeoutInSeconds 和 timeoutPerCellInSeconds 欄位來變更逾時。
離開筆記本
這個方法會以一個值關閉筆記本。 您可以在筆記本中以互動方式或在管線中執行巢狀函數呼叫。
當您以互動方式從筆記本呼叫 exit() 函式時,Fabric 筆記本會擲回例外狀況、略過執行後續儲存格,並讓 Spark 工作階段保持運作狀態。
當您在管線中管理呼叫 exit() 函式的程式筆記本時,記事本操作會傳回一個結束值。 這將完成整個管線運行,並停止 Spark 會話。
當您在參考的筆記本中呼叫 exit() 函式時,Fabric Spark 會停止參考筆記本的進一步執行,並繼續執行呼叫 run() 函式的主要筆記本中的下一個儲存格。 例如:Notebook1 有三個儲存格,並在第二個儲存格中呼叫 exit() 函數。 Notebook2 有五個儲存格,並在第三個儲存格中呼叫 run(notebook1)。 當您運行 Notebook2 時,Notebook1 在點擊 exit() 函數時停止在第二個單元格處。 Notebook2 繼續執行第四個儲存格和第五個儲存格。
notebookutils.notebook.exit("value string")
Note
exit() 函數會覆蓋當前單元格輸出。 若要避免遺失其他程式代碼語句的輸出,請在個別單元格中呼叫 notebookutils.notebook.exit()。
例如:
Sample1 筆記本,其中包含下列兩個儲存格:
儲存格 1 定義預設值設定為 10 的 輸入 參數。
儲存格 2 會以 輸入 作為結束值結束筆記本。
您可以在另一個筆記本中執行 Sample1 ,並具有預設值:
exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)
Output:
Notebook is executed successfully with exit value 10
您可以在另一個筆記本中執行 Sample1 ,並將 輸入 值設定為 20:
exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
Output:
Notebook is executed successfully with exit value 20
管理筆記本資產
notebookutils.notebook 提供特殊公用程式,以程序設計方式管理 Notebook 項目。 這些 API 可協助您輕鬆建立、取得、更新及刪除筆記本項目。
若要有效地使用這些方法,請考慮下列使用範例:
建立筆記本
with open("/path/to/notebook.ipynb", "r") as f:
content = f.read()
artifact = notebookutils.notebook.create("artifact_name", "description", "content", "default_lakehouse_name", "default_lakehouse_workspace_id", "optional_workspace_id")
取得筆記本的內容
artifact = notebookutils.notebook.get("artifact_name", "optional_workspace_id")
更新筆記本
updated_artifact = notebookutils.notebook.update("old_name", "new_name", "optional_description", "optional_workspace_id")
updated_artifact_definition = notebookutils.notebook.updateDefinition("artifact_name", "content", "default_lakehouse_name", "default_Lakehouse_Workspace_name", "optional_workspace_id")
刪除筆記本
is_deleted = notebookutils.notebook.delete("artifact_name", "optional_workspace_id")
列出工作區中的筆記本
artifacts_list = notebookutils.notebook.list("optional_workspace_id")
使用者資料功能 (UDF) 公用程式
notebookutils.udf 提供專為整合 Notebook 程式碼與使用者資料函式 (UDF) 設計的公用程式。 這些公用程式可讓您存取相同工作區內 UDF 項目的函式,或跨不同工作區進行存取。 之後您便可以視需求叫用 UDF 項目內的函式。
以下是如何使用 UDF 公用程式的一些範例:
# Get functions from a UDF item
myFunctions = notebookutils.udf.getFunctions('UDFItemName')
# Or from another workspace
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId')
# Display function and item details
display(myFunctions.functionDetails)
display(myFunctions.itemDetails)
# Invoke a function
myFunctions.functionName('value1', 'value2')
# Or with named parameters
myFunctions.functionName(parameter1='value1', parameter2='value2')
從 UDF 擷取函式
myFunctions = notebookutils.udf.getFunctions('UDFItemName')
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId')
var myFunctions = notebookutils.udf.getFunctions("UDFItemName")
var myFunctions = notebookutils.udf.getFunctions("UDFItemName", "workspaceId")
myFunctions <- notebookutils.udf.getFunctions("UDFItemName")
myFunctions <- notebookutils.udf.getFunctions("UDFItemName", "workspaceId")
叫用函式
myFunctions.functionName('value1', 'value2'...)
val res = myFunctions.functionName('value1', 'value2'...)
myFunctions$functionName('value1', 'value2'...)
顯示 UDF 項目的詳細資料
display([myFunctions.itemDetails])
display(Array(myFunctions.itemDetails))
myFunctions$itemDetails()
顯示 UDF 函數的功能詳細資料
display(myFunctions.functionDetails)
display(myFunctions.functionDetails)
myFunctions$functionDetails()
認證公用程式
您可以使用認證公用程式來取得存取令牌,以及管理 Azure 金鑰保存庫 中的秘密。
執行下列命令以取得可用方法的概觀:
notebookutils.credentials.help()
Output:
Help on module notebookutils.credentials in notebookutils:
NAME
notebookutils.credentials - Utility for credentials operations in Fabric
FUNCTIONS
getSecret(akvName, secret) -> str
Gets a secret from the given Azure Key Vault.
:param akvName: The name of the Azure Key Vault.
:param secret: The name of the secret.
:return: The secret value.
getToken(audience) -> str
Gets a token for the given audience.
:param audience: The audience for the token.
:return: The token.
help(method_name=None)
Provides help for the notebookutils.credentials module or the specified method.
Examples:
notebookutils.credentials.help()
notebookutils.credentials.help("getToken")
:param method_name: The name of the method to get help with.
DATA
creds = <notebookutils.notebookutils.handlers.CredsHandler.CredsHandler...
FILE
/home/trusted-service-user/cluster-env/trident_env/lib/python3.10/site-packages/notebookutils/credentials.py
取得權杖
getToken 會針對指定的對象和可選名稱傳回 Microsoft Entra 憑證。 下列清單顯示目前可用的受眾密鑰:
- 儲存觀眾資源:“儲存”
- Power BI 資源:“pbi”
- Azure Key Vault 資源: "keyvault"
- Synapse RTA KQL DB 資源:「kusto」
執行下列命令以取得權杖:
notebookutils.credentials.getToken('audience Key')
Considerations:
以「pbi」作為對象的權杖範圍可能會隨著時間而變更。 目前支援下列範圍。
當您呼叫 notebookutils.credentials.getToken(“pbi”)) 時,如果筆記本在服務主體下執行,傳回的權杖範圍有限。 權杖沒有完整的 Fabric 服務範圍。 如果筆記型電腦是在使用者身分識別下執行,令牌仍具有完整的 Fabric 服務範圍,但這可能會隨著安全性改善而變更。 若要確保權杖具有完整的 Fabric 服務範圍,請使用 MSAL 驗證,而不是 notebookutils.credentials.getToken API。 如需詳細資訊,請參閱 使用 Microsoft Entra ID 進行驗證。
以下是權杖在呼叫 notebookutils.credentials.getToken 時所具有的範圍清單,其中包含服務主體身分識別下的物件金鑰 pbi :
- Lakehouse.ReadWrite.All
- MLExperiment.ReadWrite.All
- MLModel.ReadWrite.All
- Notebook.ReadWrite.All
- SparkJobDefinition.ReadWrite.All
- Workspace.ReadWrite.All
- Dataset.ReadWrite.All
獲取秘密
getSecret 會使用使用者認證,針對指定的 Azure Key Vault 端點和祕密名稱,傳回 Azure Key Vault 祕密。
notebookutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')
檔案裝載和卸載
Fabric 支援Microsoft Spark 公用程式套件中的下列裝載案例。 您可以使用 掛載、 卸載、 getMountPath()和 mounts() API將遠端儲存設備(ADLS Gen2)連接至所有工作節點(驅動程式節點和背景工作節點)。 儲存載入點就緒之後,請使用本機檔案 API 來存取資料,就好像儲存在本機文件系統中一樣。
如何裝載 ADLS Gen2 帳戶
下列範例說明如何裝載 Azure Data Lake Storage Gen2。 掛載 Blob Storage 和 Azure 檔案分享的運作方式類似。
此範例假設您有一個名為 storegen2 的 Data Lake Storage Gen2 帳戶,其中有一個名為 mycontainer 的容器,您想要在筆記本 Spark 工作階段中掛接至 /test 。
要掛載名為 mycontainer 的容器, notebookutils 首先需要檢查您是否具有存取容器的權限。 目前,Fabric 支援觸發掛接作業的兩種驗證方法: accountKey 和 sastoken。
透過共用存取簽章令牌或帳戶密鑰掛載
NotebookUtils 支援明確傳遞帳戶密鑰或 共用存取簽章 (SAS) 令牌作為掛接目標的參數。
基於安全性考慮,我們建議您將帳戶密鑰或 SAS 令牌儲存在 Azure 金鑰保存庫 中(如下列螢幕擷取畫面所示)。 然後,您可以使用 notebookutils.credentials.getSecret API 來擷取它們。 如需 Azure Key Vault 的更多資訊,請參閱關於 Azure Key Vault 受控儲存體帳戶金鑰。
accountKey 方法的範例程式碼:
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"accountKey":accountKey}
)
sastoken 的範例程式碼:
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"sasToken":sasToken}
)
掛載參數:
- fileCacheTimeout:Blobs 預設會在本機臨時資料夾中快取 120 秒。 在此期間,blobfuse 不會檢查檔案是否為最新狀態。 參數可以設定為變更預設逾時時間。 當多個客戶端同時修改檔案時,若要避免本機和遠端檔案之間的不一致,建議您縮短快取時間,甚至將其變更為 0,並一律從伺服器取得最新的檔案。
- timeout:掛接作業逾時預設為120秒。 參數可以設定為變更預設逾時時間。 當執行器過多或掛載逾時時,建議提高參數值。
您可以使用這些參數,如下所示:
notebookutils.fs.mount(
"abfss://mycontainer@<accountname>.dfs.core.windows.net",
"/test",
{"fileCacheTimeout": 120, "timeout": 120}
)
Note
基於安全性考慮,建議您避免將認證直接內嵌在程序代碼中。 為了進一步保護您的認證,任何在筆記本輸出中顯示的機密資訊都會被遮蔽。 如需詳細資訊,請參閱 秘密密文。
如何掛載湖倉
將 lakehouse 掛載至 /<mount_name> 的範例程式代碼:
notebookutils.fs.mount(
"abfss://<workspace_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_name>.Lakehouse",
"/<mount_name>"
)
使用 notebookutils fs API 存取掛接點下的檔案
掛接作業的主要目的是讓客戶使用本機檔系統 API 存取儲存在遠端儲存體帳戶中的資料。 您也可以使用 notebookutils fs API 來存取資料,並將掛載的路徑作為參數。 此路徑格式稍有不同。
假設您使用掛接 API 將 Data Lake Storage Gen2 容器 mycontainer 掛接至 /test 。 當您使用本機檔案系統 API 存取資料時,路徑格式如下所示:
/synfs/notebook/{sessionId}/test/{filename}
當您想要使用 notebookutils fs API 存取資料時,建議您使用 getMountPath() 來取得準確的路徑:
path = notebookutils.fs.getMountPath("/test")
列出目錄:
notebookutils.fs.ls(f"file://{notebookutils.fs.getMountPath('/test')}")讀取檔案內容:
notebookutils.fs.head(f"file://{notebookutils.fs.getMountPath('/test')}/myFile.txt")建立目錄:
notebookutils.fs.mkdirs(f"file://{notebookutils.fs.getMountPath('/test')}/newdir")
透過本機路徑存取裝載點下的檔案
您可以使用標準檔案系統,輕鬆地在裝載點中讀取和寫入檔案。 以下是 Python 範例:
#File read
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
print(f.read())
#File write
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
print(f.write("dummy data"))
如何檢查現有的裝入點
您可以使用 notebookutils.fs.mounts() API 來檢查所有現有的掛接點資訊:
notebookutils.fs.mounts()
如何卸除裝載點
使用下列程式碼卸載掛接點 (在此範例中為 /test ):
notebookutils.fs.unmount("/test")
已知限制
目前掛載是作業層級配置;建議您使用 掛載 API 來檢查掛載點是否存在或不可用。
還未自動套用卸載機制。 當應用程式執行完成時,若要卸載裝入點並釋放磁碟空間,您必須在程式碼中明確呼叫卸載 API。 否則,在應用程式執行完成之後,裝載點仍會存在於節點中。
不支援掛接 ADLS Gen1 儲存體帳戶。
湖屋公用程式
notebookutils.lakehouse 提供專為管理 Lakehouse 專案量身打造的公用程式。 這些公用程式可讓您輕鬆建立、取得、更新及刪除 Lakehouse 成品。
方法概觀
以下是 notebookutils.lakehouse所提供的可用方法概觀:
# Create a new Lakehouse artifact
create(name: String, description: String = "", definition: ItemDefinition = null, workspaceId: String = ""): Artifact
# Create Lakehouse with Schema Support
create(name: String, description: String = "", definition: {"enableSchemas": True}): Artifact
# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact
# Get a Lakehouse artifact with properties
getWithProperties(name: String, workspaceId: String = ""): Artifact
# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact
# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean
# List all Lakehouse artifacts
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact]
# List all tables in a Lakehouse artifact
listTables(lakehouse: String, workspaceId: String = "", maxResults: Int = 1000): Array[Table]
# Starts a load table operation in a Lakehouse artifact
loadTable(loadOption: collection.Map[String, Any], table: String, lakehouse: String, workspaceId: String = ""): Array[Table]
使用範例
若要有效地使用這些方法,請考慮下列使用範例:
建立 Lakehouse
artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")
# Create Lakehouse with Schema Support
artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", {"enableSchemas": True})
取得湖畔別墅
artifact = notebookutils.lakehouse.get("artifact_name", "optional_workspace_id")
artifact = notebookutils.lakehouse.getWithProperties("artifact_name", "optional_workspace_id")
更新資料湖庫
updated_artifact = notebookutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")
刪除 Lakehouse
is_deleted = notebookutils.lakehouse.delete("artifact_name", "optional_workspace_id")
列出工作區中的湖景屋
artifacts_list = notebookutils.lakehouse.list("optional_workspace_id")
列出 Lakehouse 中的所有資料表
artifacts_tables_list = notebookutils.lakehouse.listTables("artifact_name", "optional_workspace_id")
在 Lakehouse 平台中開始載入資料表的操作
notebookutils.lakehouse.loadTable(
{
"relativePath": "Files/myFile.csv",
"pathType": "File",
"mode": "Overwrite",
"recursive": False,
"formatOptions": {
"format": "Csv",
"header": True,
"delimiter": ","
}
}, "table_name", "artifact_name", "optional_workspace_id")
其他資訊
如需每個方法及其參數的詳細資訊,請使用函式 notebookutils.lakehouse.help("methodName")。
執行階段公用程式
顯示會話內容資訊
透過 notebookutils.runtime.context,您可以獲取當前即時工作階段的上下文信息,包括筆記本名稱、預設數據倉庫、工作區信息、是否為管道運行等。
notebookutils.runtime.context
下表概述屬性。
| Parameter | Explanation |
|---|---|
currentNotebookName |
目前筆記本的名稱 |
currentNotebookId |
目前筆記本的唯一標識碼 |
currentWorkspaceName |
目前工作區的名稱 |
currentWorkspaceId |
目前工作區的標識碼 |
defaultLakehouseName |
如果已定義,則為預設 Lakehouse(湖庫)的顯示名稱 |
defaultLakehouseId |
如果已定義,則為預設 Lakehouse 的識別碼 |
defaultLakehouseWorkspaceName |
如果已定義,則為預設 Lakehouse 的工作區名稱 |
defaultLakehouseWorkspaceId |
如果已定義,則為預設數據湖倉的工作區識別碼 |
currentRunId |
在參考運行中,目前運行的識別碼 |
parentRunId |
在具有巢狀執行的參考執行中,此識別碼是父執行識別碼 |
rootRunId |
在具有巢狀執行的參考執行中,此識別碼是根執行識別碼 |
isForPipeline |
執行是否適用於管線 |
isReferenceRun |
目前執行是否為參考執行 |
referenceTreePath |
巢狀參考執行的樹狀結構僅用於監視 L2 頁面中的快照階層 |
rootNotebookId |
(僅在參考執行中)根筆記本的識別碼。 |
rootNotebookName |
(僅在參考執行中)參考執行中的根筆記本名稱。 |
rootWorkspaceId |
(僅在參考執行中)參考執行中根筆記本的工作區標識碼。 |
rootWorkspaceName |
僅在參考運行中使用的根筆記本工作區名稱。 |
activityId |
目前活動的 Livy 作業識別碼 |
hcRepId |
高併行模式中的 REPL 識別碼 |
clusterId |
Synapse Spark 叢集的身分識別 |
poolName |
正在使用的Spark集區名稱 |
environmentId |
作業執行所在的環境標識碼 |
environmentWorkspaceId |
環境的工作區標識碼 |
userId |
目前使用者的使用者識別碼 |
userName |
目前使用者的用戶名稱 |
會話管理
停止互動式會話
有時候,藉由在程式代碼中呼叫 API 來停止互動式會話,而不是手動按兩下 [停止] 按鈕。 在這種情況下,我們提供 API notebookutils.session.stop() 來支援透過程式代碼停止互動式會話,其適用於 Scala 和 PySpark。
notebookutils.session.stop()
notebookutils.session.stop() API 會在背景中以異步方式停止目前的互動式會話。 它也會停止 Spark 會話,並釋放會話佔用的資源,讓集區中其他會話可以使用。
重新啟動 Python 解釋器
notebookutils.session 公用程式提供重新啟動 Python 解釋器的方式。
notebookutils.session.restartPython()
Considerations:
- 在筆記本參考執行案例中,
restartPython()只會重新啟動目前所參考筆記本的 Python 解釋器。 - 在罕見的情況下,命令可能會因為Spark反映機制而失敗,新增重試可以減輕問題。
變數函式庫工具
Note
Notebooks 中的「變數庫工具」處於預覽狀態。
變數庫可讓您避免在筆記本程式碼中硬式編碼值。 您可以更新連結庫中的值,而不是修改程序代碼。 筆記本會參考變數庫來擷取這些值。 此方法可藉由使用集中管理的連結庫,簡化跨小組和專案重複使用程序代碼。
執行下列命令以取得可用方法的概觀:
notebookutils.variableLibrary.help()
Output
[Preview] notebookutils.variableLibrary is a utility to Variable Library.
Below is overview about the available methods:
get(variableReference: String): String
-> Run the variable value with type.
getLibrary(variableLibraryName: String): VariableLibrary
-> Get the variable library.
Use notebookutils.variableLibrary.help("methodName") for more info about a method.
在變數連結庫中定義變數
先定義變數,再使用 notebookutils.variableLibrary。
從 Notebook 擷取變數連結庫
samplevl = notebookutils.variableLibrary.getLibrary("sampleVL")
samplevl.test_int
samplevl.test_str
val samplevl = notebookutils.variableLibrary.getLibrary("sampleVL")
samplevl.test_int
samplevl.test_str
samplevl <- notebookutils.variableLibrary.getLibrary("sampleVL")
samplevl.test_int
samplevl.test_str
動態使用變數的範例。
samplevl = notebookutils.variableLibrary.getLibrary("sampleVL")
file_path = f"abfss://{samplevl.Workspace_name}@onelake.dfs.fabric.microsoft.com/{samplevl.Lakehouse_name}.Lakehouse/Files/<FileName>.csv"
df = spark.read.format("csv").option("header","true").load(file_path)
display(df)
依參考存取單一變數
notebookutils.variableLibrary.get("$(/**/samplevl/test_int)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_str)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_bool)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_int)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_str)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_bool)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_int)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_str)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_bool)")
Note
-
notebookutils.variableLibraryAPI 僅支援存取相同工作區內的變數連結庫。 - 在參考執行期間,子筆記本不支援跨工作區擷取變數庫。
- 筆記本程式代碼會參考變數庫中啟用值集中定義的變數。
已知問題
使用1.2以上的執行階段版本並執行
notebookutils.help()時,列出的 fabricClient、 PBIClient API目前不支援,將在稍後提供。 此外,Scala 筆記本目前不支援 認證 API。Python 筆記本不支援 使用 notebookutils.session 公用程式進行工作階段管理時,停止、 重新啟動Python API。
目前,可變程式庫公用程式不支援 SPN。