共用方式為


Microsoft Spark 公用程序 (MSSparkUtils) for Fabric

Microsoft Spark 公用程式 (MSSparkUtils) 是一個內建套件,可協助您輕鬆地執行一般工作。 您可以使用 MSSparkUtils 來處理文件系統、取得環境變數、將筆記本鏈結在一起,以及使用秘密。 MSSparkUtils 套件可在 PySpark (Python) Scala、SparkR 筆記本和網狀架構管線中使用。

檔系統公用程式

mssparkutils.fs 提供使用各種文件系統的公用程式,包括 Azure Data Lake Storage (ADLS) Gen2 和 Azure Blob 儲存體。 請確定您已設定 Azure Data Lake Storage Gen2 的存取權,並適當地 Azure Blob 儲存體

執行下列命令以取得可用方法的概觀:

from notebookutils import mssparkutils
mssparkutils.fs.help()

輸出

mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(from: String, to: String, recurse: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use mssparkutils.fs.help("methodName") for more info about a method.

MSSparkUtils 會以與 Spark API 相同的方式與文件系統搭配使用。 以 mssparkuitls.fs.mkdirs() 和 Fabric Lakehouse 使用量為例:

使用方式 HDFS 根目錄的相對路徑 ABFS 檔案系統的絕對路徑 驅動程式節點中本機檔系統的絕對路徑
非預設湖屋 不支援 mssparkutils.fs.mkdirs(“abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>”) mssparkutils.fs.mkdirs(“file:/<new_dir>”)
默認湖屋 “Files” 或 “Tables” 下的目錄: mssparkutils.fs.mkdirs(“Files/<new_dir>”) mssparkutils.fs.mkdirs(“abfss://< container_name>@<storage_account_name.dfs.core.windows.net/<> new_dir>”) mssparkutils.fs.mkdirs(“file:/<new_dir>”)

列出檔案

若要列出目錄的內容,請使用 mssparkutils.fs.ls(「您的目錄路徑」)。 例如:

mssparkutils.fs.ls("Files/tmp") # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/tmp")  # based on local file system of driver node 

檢視檔案屬性

此方法會傳回檔案屬性,包括檔名、檔案路徑、檔案大小,以及其是否為目錄和檔案。

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

建立新目錄

如果指定的目錄不存在,這個方法會建立指定的目錄,並建立任何必要的父目錄。

mssparkutils.fs.mkdirs('new directory name')  
mssparkutils.fs. mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
mssparkutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
mssparkutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

複製檔案

此方法會複製檔案或目錄,並支援跨檔案系統複製活動。

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

高效能複製檔案

此方法提供更快速的方式複製或移動檔案,特別是大量數據。

mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

預覽檔案內容

這個方法會傳回指定檔案的第一個 『maxBytes』 位元組,做為以 UTF-8 編碼的 String。

mssparkutils.fs.head('file path', maxBytes to read)

移動檔案

此方法會移動檔案或目錄,並支援跨檔案系統移動。

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.

寫入檔案

這個方法會將指定的字串寫出至以 UTF-8 編碼的檔案。

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

將內容附加至檔案

這個方法會將指定的字串附加至以 UTF-8 編碼的檔案。

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

刪除檔案或目錄

此方法會移除檔案或目錄。

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

掛接/卸除目錄

在檔案掛接和卸除尋找詳細使用方式的詳細資訊。

筆記本公用程式

使用 MSSparkUtils Notebook 公用程式來執行筆記本,或使用 值結束筆記本。 執行下列命令以取得可用方法的概觀:

mssparkutils.notebook.help()

輸出:


exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

注意

筆記本公用程式不適用於 Apache Spark 作業定義(SJD)。

參考筆記本

此方法會參考筆記本,並傳回其結束值。 您可以在筆記本中以互動方式或在管線中執行巢狀函數呼叫。 所參考的筆記本會在呼叫此函式的筆記本 Spark 集區上執行。

mssparkutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)

例如:

mssparkutils.notebook.run("Sample1", 90, {"input": 20 })

網狀架構筆記本也支持藉由指定 工作區標識碼,跨多個工作區參考筆記本。

mssparkutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

您可以在資料格輸出中開啟參考執行的快照集連結。 快照集會擷取程式代碼執行結果,並可讓您輕鬆地偵錯參考執行。

參考執行結果的螢幕快照。

快照範例的螢幕快照。

注意

  • 運行時間 1.2 版和更新版本支援跨工作區參考筆記本。
  • 如果您使用 Notebook 資源底下的檔案,請在mssparkutils.nbResPath參考的筆記本中使用 ,確定它指向與互動式執行相同的資料夾。

參考以平行方式執行多個筆記本

重要

這項功能處於預覽狀態

方法 mssparkutils.notebook.runMultiple() 可讓您平行執行多個筆記本,或使用預先定義的拓撲結構。 API 會在Spark會話中使用多線程實作機制,這表示參考筆記本會共用計算資源。

使用 mssparkutils.notebook.runMultiple(),您可以:

  • 同時執行多個筆記本,而不需要等待每個筆記本完成。

  • 使用簡單的 JSON 格式,指定筆記本的相依性和執行順序。

  • 優化 Spark 計算資源的使用,並降低 Fabric 專案的成本。

  • 檢視輸出中每個筆記本執行記錄的快照集,並方便偵錯/監視筆記本工作。

  • 取得每個執行活動的結束值,並在下游工作中使用這些值。

您也可以嘗試執行 mssparkutils.notebook.help(“runMultiple”) 來尋找範例和詳細的使用方式。

以下是使用此方法平行執行筆記本清單的簡單範例:


mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

根筆記本的執行結果如下所示:

參考筆記本清單的螢幕快照。

以下是使用 mssparkutils.notebook.runMultiple()執行具有拓撲結構的筆記本範例。 使用此方法,輕鬆地透過程式代碼體驗協調筆記本。

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ]
}
mssparkutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

根筆記本的執行結果如下所示:

使用參數參考筆記本清單的螢幕快照。

注意

  • 多個筆記本執行的平行處理原則程度僅限於Spark會話的總可用計算資源。
  • 中的 msspakrutils.notebook.runMultiple() 筆記本活動上限為 50,因為計算資源使用量,超過 50 個筆記本活動可能會有穩定性和效能問題。 如果您仍想要在 API 中使用更多筆記本活動,您可以將 spark 設定 'spark.notebookutils.runmultiple.limit' 設定為較大的值作為因應措施。 您可以在附加的環境中或使用 %%configure 命令來設定 Spark 屬性。
  • 如果您想要使用超過 50 個筆記本活動,若要避免快照集和進度列內容大小超過筆記本內容大小上限總計,建議您執行下列命令來停用豐富的 UX 功能。
    com.microsoft.spark.notebook.common.Configs.notebookRunSnapshotEnabled = false
    

結束筆記本

這個方法會結束具有 值的筆記本。 您可以在筆記本中以互動方式或在管線中執行巢狀函數呼叫。

  • 當您以互動方式從筆記本呼叫 exit() 函式時,Fabric 筆記本會擲回例外狀況、略過執行後續單元格,並讓 Spark 會話保持運作。

  • 當您在管線中協調呼叫 exit() 函式的筆記本時,Notebook 活動會傳回具有結束值、完成管線執行,並停止 Spark 工作階段。

  • 當您在所參考的筆記本中呼叫 exit() 函式時,Fabric Spark 將會停止進一步執行參考的筆記本,並繼續在呼叫 run() 函式的主要筆記本中執行下一個單元格。 例如:Notebook1 有三個儲存格,並呼叫第二個 儲存格中的 exit() 函式。 Notebook2 的第三個數據格中有五個數據格並呼叫 run(notebook1)。 當您執行 Notebook2 時,Notebook1 會在達到 exit() 函式時停止在第二個單元格。 Notebook2 會繼續執行第四個儲存格和第五個儲存格。

mssparkutils.notebook.exit("value string")

例如:

具有下列兩個儲存格的 Sample1 筆記本:

  • 單元格 1 會 定義預設值設定為 10 的輸入 參數。

  • 單元格 2 會結束筆記本,輸入為結束值。

顯示結束函式範例筆記本的螢幕快照。

您可以使用預設值在另一個筆記本中執行 Sample1

exitVal = mssparkutils.notebook.run("Sample1")
print (exitVal)

輸出:

Notebook executed successfully with exit value 10

您可以在另一個筆記本中執行 Sample1,並將輸入值設定為 20:

exitVal = mssparkutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

輸出:

Notebook executed successfully with exit value 20

認證公用程式

您可以使用 MSSparkUtils 認證公用程式來取得存取令牌,以及管理 Azure 金鑰保存庫 中的秘密。

執行下列命令以取得可用方法的概觀:

mssparkutils.credentials.help()

輸出:

getToken(audience, name): returns AAD token for a given audience, name (optional)
getSecret(keyvault_endpoint, secret_name): returns secret for a given Key Vault and secret name

取得權杖

getToken 會針對指定的對象和名稱傳回Microsoft Entra 令牌(選擇性)。 下列清單顯示目前可用的物件金鑰:

  • 記憶體物件資源:“記憶體”
  • Power BI 資源:“pbi”
  • Azure 金鑰保存庫 資源:“keyvault”
  • Synapse RTA KQL DB 資源:“kusto”

執行下列命令以取得權杖:

mssparkutils.credentials.getToken('audience Key')

使用用戶認證取得秘密

getSecret 會使用使用者認證,針對指定的 Azure 金鑰保存庫 端點和秘密名稱,傳回 Azure 金鑰保存庫 秘密。

mssparkutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

檔案掛接和卸除

Fabric 支援Microsoft Spark 公用程式套件中的下列掛接案例。 您可以使用掛接、卸除getMountPath()mounts() API,將遠端記憶體 (ADLS Gen2) 連結至所有工作節點(驅動程序節點和背景工作節點)。 儲存載入點就緒之後,請使用本機檔案 API 來存取數據,就好像儲存在本機文件系統中一樣。

如何掛接 ADLS Gen2 帳戶

下列範例說明如何掛接 Azure Data Lake Storage Gen2。 掛接 Blob 記憶體的運作方式類似。

此範例假設您有一個名為 storegen2 的 Data Lake Storage Gen2 帳戶,而該帳戶有一個名為 mycontainer 的容器,而您想要掛接至 /test筆記本 Spark 會話。

顯示要掛接容器的位置螢幕快照。

若要掛接名為 mycontainer 的容器, mssparkutils 必須先檢查您是否有權存取容器。 目前,Fabric 支援兩種觸發程式掛接作業的驗證方法: accountKeysastoken

透過共用存取簽章令牌或帳戶密鑰掛接

MSSparkUtils 支援明確傳遞帳戶密鑰或 共用存取簽章 (SAS) 令牌作為掛接目標的參數。

基於安全性考慮,我們建議您將帳戶密鑰或 SAS 令牌儲存在 Azure 金鑰保存庫 中(如下列螢幕快照所示)。 接著,您可以使用 mssparkutils.credentials.getSecret API 來擷取它們。 如需 Azure 金鑰保存庫 的詳細資訊,請參閱關於 Azure 金鑰保存庫 受控記憶體帳戶密鑰

顯示秘密儲存在 Azure 金鑰保存庫 的螢幕快照。

accountKey 方法的範例程序代碼:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

sastoken範例程序代碼:

from notebookutils import mssparkutils  
# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = mssparkutils.credentials.getSecret("<vaultURI>", "<secretName>")
mssparkutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

注意

如果無法使用,您可能需要匯 mssparkutils 入:

from notebookutils import mssparkutils

掛接參數:

  • fileCacheTimeout:Blob 預設會在本機暫存資料夾中快取 120 秒。 在此期間,Blobfuse 不會檢查檔案是否為最新狀態。 參數可以設定為變更預設逾時時間。 當多個用戶端同時修改檔案時,為了避免本機和遠端檔案之間的不一致,建議您縮短快取時間,甚至將其變更為 0,並且一律從伺服器取得最新的檔案。
  • timeout:掛接作業逾時預設為120秒。 參數可以設定為變更預設逾時時間。 當執行程式太多或掛接逾時時,建議增加值。

您可以使用這些參數,如下所示:

mssparkutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

注意

基於安全性考慮,我們建議您不要將認證儲存在程序代碼中。 為了進一步保護您的認證,我們會在筆記本輸出中修訂您的秘密。 如需詳細資訊,請參閱 秘密修訂

如何掛接湖屋

將 Lakehouse 掛接至 /test 的範例程式代碼:

from notebookutils import mssparkutils 
mssparkutils.fs.mount( 
 "abfss://<workspace_id>@msit-onelake.dfs.fabric.microsoft.com/<lakehouse_id>", 
 "/test"
)

使用 mssparktuils fs API 存取裝入點下的檔案

掛接作業的主要目的是讓客戶使用本機檔系統 API 存取儲存在遠端記憶體帳戶中的數據。 您也可以使用 mssparkutils fs API 搭配掛接路徑做為參數來存取數據。 此路徑格式稍有不同。

假設您使用掛接 API 將 Data Lake Storage Gen2 容器 mycontainer 掛接至 /test 。 當您使用本機檔案系統 API 存取資料時,路徑格式如下所示:

/synfs/notebook/{sessionId}/test/{filename}

當您想要使用 mssparkutils fs API 存取數據時,建議您使用 getMountPath() 來取得正確的路徑:

path = mssparkutils.fs.getMountPath("/test")
  • 列出目錄:

    mssparkutils.fs.ls(f"file://{mssparkutils.fs.getMountPath('/test')}")
    
  • 讀取檔案內容:

    mssparkutils.fs.head(f"file://{mssparkutils.fs.getMountPath('/test')}/myFile.txt")
    
  • 建立目錄:

    mssparkutils.fs.mkdirs(f"file://{mssparkutils.fs.getMountPath('/test')}/newdir")
    

透過本機路徑存取裝入點下的檔案

您可以使用標準檔案系統,輕鬆地在裝入點中讀取和寫入檔案。 以下是 Python 範例:

#File read
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(mssparkutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

如何檢查現有的裝入點

您可以使用 mssparkutils.fs.mounts() API 來檢查所有現有的裝入點資訊:

mssparkutils.fs.mounts()

如何卸除裝入點

使用下列程式代碼來卸載入點 (/在此範例中測試 ):

mssparkutils.fs.unmount("/test")

已知的限制

  • 目前的掛接是作業層級組態;建議您使用 掛接 API 來檢查載入點是否存在或無法使用。

  • 取消掛接機制不是自動的。 當應用程式執行完成時,若要卸載裝入點並釋放磁碟空間,您必須在程式代碼中明確呼叫卸載 API。 否則,在應用程式執行完成之後,裝入點仍會存在於節點中。

  • 不支援掛接 ADLS Gen1 記憶體帳戶。

Lakehouse 公用程式

mssparkutils.lakehouse 提供專為管理 Lakehouse 成品量身打造的公用程式。 這些公用程式可讓用戶輕鬆建立、擷取、更新和刪除 Lakehouse 成品。

注意

只有運行時間 1.2+ 版才支援 Lakehouse API。

方法概觀

以下是 所提供的 mssparkutils.lakehouse可用方法概觀:

# Create a new Lakehouse artifact
create(name: String, description: String = "", workspaceId: String = ""): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean

# List all Lakehouse artifacts
list(workspaceId: String = ""): Array[Artifact]

使用範例

若要有效地使用這些方法,請考慮下列使用範例:

建立 Lakehouse 成品

artifact = mssparkutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")

擷取 Lakehouse 成品

artifact = mssparkutils.lakehouse.get("artifact_name", "optional_workspace_id")

更新 Lakehouse 成品

updated_artifact = mssparkutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

刪除 Lakehouse 成品

is_deleted = mssparkutils.lakehouse.delete("artifact_name", "optional_workspace_id")

列出 Lakehouse 成品

artifacts_list = mssparkutils.lakehouse.list("optional_workspace_id")

其他資訊

如需每個方法及其參數的詳細資訊,請使用 函式 mssparkutils.lakehouse.help("methodName")

使用 MSSparkUtils 的 Lakehouse 公用程式,管理 Lakehouse 成品會變得更有效率,並整合到您的網狀架構管線中,增強整體數據管理體驗。

您可以隨意探索這些公用程式,並將其併入您的網狀架構工作流程,以便順暢地進行 Lakehouse 成品管理。

運行時間公用程式

顯示工作階段內容資訊

您可以 mssparkutils.runtime.context 透過 取得目前即時會話的內容資訊,包括筆記本名稱、預設 Lakehouse、工作區資訊,如果是管線執行等等。

mssparkutils.runtime.context

已知問題

使用高於 1.2 的運行時間版本並執行 mssparkutils.help()時,目前不支援列出的 fabricClient倉儲工作區 API,進一步提供。