共用方式為


Microsoft Spark 公用程式簡介

Microsoft Spark 公用程式 (MSSparkUtils) 是內建套件,可協助您輕鬆執行一般工作。 您可以使用 MSSparkUtils 來處理文件系統、取得環境變數、將筆記本鏈結在一起,以及使用秘密。 MSSparkUtils 可在 PySpark (Python)Scala.NET Spark (C#)R (Preview) Notebook 和 Synapse 管線中使用。

必要條件

設定 Azure Data Lake Storage Gen2 的存取權

Synapse Notebook 會使用 Microsoft Entra 傳遞來存取 ADLS Gen2 帳戶。 您必須是 記憶體 Blob 資料參與者 ,才能存取 ADLS Gen2 帳戶(或資料夾)。

Synapse 管線會使用工作區的受控服務識別 (MSI) 來存取記憶體帳戶。 若要在管線活動中使用 MSSparkUtils,您的工作區身分識別必須是 記憶體 Blob 數據參與者 ,才能存取 ADLS Gen2 帳戶(或資料夾)。

請遵循下列步驟,確定您的Microsoft Entra ID 和工作區 MSI 可以存取 ADLS Gen2 帳戶:

  1. 開啟 Azure 入口網站和您想要存取的記憶體帳戶。 您可以瀏覽至您想要存取的特定容器。

  2. 從左側面板中選取 [存取控制] [IAM]。

  3. 選取 [新增 > 角色指派 ] 以開啟 [新增角色指派] 頁面。

  4. 指派下列角色。 如需詳細步驟,請參閱使用 Azure 入口網站指派 Azure 角色

    設定
    角色 儲存體 Blob 資料參與者
    存取權指派對象 USER 和 MANAGEDIDENTITY
    成員 您的Microsoft Entra 帳戶和工作區身分識別

    注意

    受控識別名稱也就是工作區名稱。

    Azure 入口網站中的 [新增角色指派] 頁面。

  5. 選取儲存

您可以透過下列 URL 存取 ADLS Gen2 與 Synapse Spark 上的資料:

abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>

設定對 Azure Blob 儲存體的存取權

Synapse 會使用共用存取簽章 (SAS) 來存取 Azure Blob 儲存體。 若要避免在程式代碼中公開 SAS 金鑰,建議您在 Synapse 工作區中建立新的連結服務至您想要存取的 Azure Blob 儲存體 帳戶。

請遵循下列步驟,為 Azure Blob 儲存體 帳戶新增連結服務:

  1. 開啟 Azure Synapse Studio
  2. 從左側面板中選取 [管理],然後選取 [外部連線] 底下的 [鏈接服務]。
  3. 在右側 [新增鏈接的服務] 面板中搜尋 Azure Blob 儲存體。
  4. 選取 [繼續]。
  5. 選取 Azure Blob 儲存體 帳戶來存取並設定連結的服務名稱。 建議使用驗證方法帳戶密鑰
  6. 選取 [ 測試連線 ] 來驗證設定是否正確。
  7. 選取 [ 先建立 ],然後按兩下 [ 全部 發佈] 以儲存變更。

您可以透過下列 URL,使用 Synapse Spark 存取 Azure Blob 儲存體 上的數據:

wasb[s]://<container_name>@<storage_account_name>.blob.core.windows.net/<path>

以下是程式代碼範例:

from pyspark.sql import SparkSession

# Azure storage access info
blob_account_name = 'Your account name' # replace with your blob name
blob_container_name = 'Your container name' # replace with your container name
blob_relative_path = 'Your path' # replace with your relative folder path
linked_service_name = 'Your linked service name' # replace with your linked service name

blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Allow SPARK to access from Blob remotely

wasb_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)

spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)
print('Remote blob path: ' + wasb_path)
val blob_account_name = "" // replace with your blob name
val blob_container_name = "" //replace with your container name
val blob_relative_path = "/" //replace with your relative folder path
val linked_service_name = "" //replace with your linked service name


val blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

val wasbs_path = f"wasbs://$blob_container_name@$blob_account_name.blob.core.windows.net/$blob_relative_path"
spark.conf.set(f"fs.azure.sas.$blob_container_name.$blob_account_name.blob.core.windows.net",blob_sas_token)

var blob_account_name = "";  // replace with your blob name
var blob_container_name = "";     // replace with your container name
var blob_relative_path = "";  // replace with your relative folder path
var linked_service_name = "";    // replace with your linked service name
var blob_sas_token = Credentials.GetConnectionStringOrCreds(linked_service_name);

spark.Conf().Set($"fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net", blob_sas_token);

var wasbs_path = $"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}";

Console.WriteLine(wasbs_path);

# Azure storage access info
blob_account_name <- 'Your account name' # replace with your blob name
blob_container_name <- 'Your container name' # replace with your container name
blob_relative_path <- 'Your path' # replace with your relative folder path
linked_service_name <- 'Your linked service name' # replace with your linked service name

blob_sas_token <- mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Allow SPARK to access from Blob remotely
sparkR.session()
wasb_path <- sprintf('wasbs://%s@%s.blob.core.windows.net/%s',blob_container_name, blob_account_name, blob_relative_path)
sparkR.session(sprintf('fs.azure.sas.%s.%s.blob.core.windows.net',blob_container_name, blob_account_name), blob_sas_token)

print( paste('Remote blob path: ',wasb_path))

設定 Azure 金鑰保存庫 的存取權

您可以將 Azure 金鑰保存庫 新增為連結服務,以在 Synapse 中管理您的認證。 請遵循下列步驟,將 Azure 金鑰保存庫 新增為 Synapse 鏈接服務:

  1. 開啟 Azure Synapse Studio

  2. 從左側面板中選取 [管理],然後選取 [外部連線] 底下的 [鏈接服務]。

  3. 在右側 [新增鏈接的服務] 面板中搜尋 Azure 金鑰保存庫。

  4. 選取 [Azure 金鑰保存庫 帳戶] 來存取並設定連結的服務名稱。

  5. 選取 [ 測試連線 ] 來驗證設定是否正確。

  6. 選取 [ 先建立 ],然後按兩下 [ 全部 發佈] 以儲存變更。

Synapse Notebook 會使用 Microsoft Entra 傳遞來存取 Azure 金鑰保存庫。 Synapse 管線會使用工作區身分識別(MSI)來存取 Azure 金鑰保存庫。 若要確保您的程式代碼同時在筆記本和 Synapse 管線中運作,建議您為 Microsoft Entra 帳戶和工作區身分識別授與秘密訪問許可權。

請遵循下列步驟來授與工作區身分識別的秘密存取權:

  1. 開啟您想要存取的 Azure 入口網站 和 Azure 金鑰保存庫。
  2. 從左側面板中選取 [ 存取原則 ]。
  3. 選取 [新增存取原則]:
    • 選擇 [金鑰]、[密碼] 和 [憑證管理 ] 作為組態範本。
    • 在選取主體中選取 您的Microsoft Entra 帳戶工作區身 分識別(與您的工作區名稱相同),或確定已指派它。
  4. 選取 [ 選取新增]。
  5. 選取 [儲存 ] 按鈕以認可變更。

檔案系統公用程式

mssparkutils.fs提供使用各種文件系統的公用程式,包括 Azure Data Lake Storage Gen2 (ADLS Gen2) 和 Azure Blob 儲存體。 請確定您已適當地設定 Azure Data Lake Storage Gen2Azure Blob 儲存體的存取權。

執行下列命令以取得可用方法的概觀:

from notebookutils import mssparkutils
mssparkutils.fs.help()
mssparkutils.fs.help()
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Notebook.MSSparkUtils;
FS.Help()
library(notebookutils)
mssparkutils.fs.help()

產生結果:


mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(src: String, dest: String, create_path: Boolean = False, overwrite: Boolean = False): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory

Use mssparkutils.fs.help("methodName") for more info about a method.

列出檔案

列出目錄的內容。

mssparkutils.fs.ls('Your directory path')
mssparkutils.fs.ls("Your directory path")
FS.Ls("Your directory path")
mssparkutils.fs.ls("Your directory path")

檢視檔案屬性

傳回檔案屬性,包括檔名、檔案路徑、檔案大小、檔案修改時間,以及它是否為目錄和檔案。

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size, file.modifyTime)
val files = mssparkutils.fs.ls("/")
files.foreach{
    file => println(file.name,file.isDir,file.isFile,file.size,file.modifyTime)
}
var Files = FS.Ls("/");
foreach(var File in Files) {
    Console.WriteLine(File.Name+" "+File.IsDir+" "+File.IsFile+" "+File.Size);
}
files <- mssparkutils.fs.ls("/")
for (file in files) {
    writeLines(paste(file$name, file$isDir, file$isFile, file$size, file$modifyTime))
}

建立新的目錄

如果指定的目錄不存在,則建立指定的目錄,以及任何必要的父目錄。

mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs.mkdirs("new directory name")
FS.Mkdirs("new directory name")
mssparkutils.fs.mkdirs("new directory name")

複製檔案

複製檔案或目錄。 支援跨檔案系統複製。

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
FS.Cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)

高效能複製檔案

此方法提供更快速的方式複製或移動檔案,特別是大量資料。

mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True) # Set the third parameter as True to copy all files and directories recursively

注意

此方法僅支持 適用於 Apache Spark 3.3 的 Azure Synapse Runtime 和 適用於 Apache Spark 3.4 的 Azure Synapse Runtime。

預覽檔案內容

傳回指定檔案的第一個 'maxBytes' 位元組,做為以UTF-8編碼的字串。

mssparkutils.fs.head('file path', maxBytes to read)
mssparkutils.fs.head("file path", maxBytes to read)
FS.Head("file path", maxBytes to read)
mssparkutils.fs.head('file path', maxBytes to read)

移動檔案

移動檔案或目錄。 支援跨檔案系統移動。

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv("source file or directory", "destination directory", true) // Set the last parameter as True to firstly create the parent directory if it does not exist
FS.Mv("source file or directory", "destination directory", true)
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist

寫入檔案

將指定的字串寫出至以UTF-8編碼的檔案。

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
FS.Put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

將內容附加至檔案

將指定的字串附加至以UTF-8編碼的檔案。

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path","content to append",true) // Set the last parameter as True to create the file if it does not exist
FS.Append("file path", "content to append", true) // Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

注意

mssparkutils.fs.append()mssparkutils.fs.put() 不支援並行寫入相同的檔案,因為缺乏不可部分完成性保證。

刪除檔案或目錄

拿掉檔案或目錄。

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
FS.Rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

筆記本公用程式

不支援。

您可以使用 MSSparkUtils Notebook 公用程式來執行筆記本,或使用 值結束筆記本。 執行下列命令,以取得可用方法的概觀:

mssparkutils.notebook.help()

取得結果:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

注意

筆記本公用程式不適用於 Apache Spark 工作定義 (SJD)。

筆記本參考

參考筆記本並傳回其結束值。 您可以在筆記本中以互動方式或在管線中執行巢狀函數呼叫。 所參考的筆記本將會在筆記本呼叫此函式的 Spark 集區上執行。


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

例如:

mssparkutils.notebook.run("folder/Sample1", 90, {"input": 20 })

執行完成之後,您會看到名為「檢視筆記本執行:筆記本名稱的快照連結顯示在數據格輸出中,您可以單擊連結以查看此特定回合的快照集。

貼齊連結 Python 的螢幕快照

參考以平行方式執行多個筆記本

方法 mssparkutils.notebook.runMultiple() 可讓您平行執行多個筆記本,或使用預先定義的拓撲結構。 API 會在 Spark 工作階段中使用多執行緒實作機制,這表示參考筆記本會共用計算資源。

使用 mssparkutils.notebook.runMultiple(),您可以:

  • 同時執行多個筆記本,而不需要等待每個筆記本完成。

  • 使用簡單的 JSON 格式,指定筆記本的相依性和執行順序。

  • 優化 Spark 計算資源的使用,並降低 Synapse 專案的成本。

  • 檢視輸出中每個筆記本執行記錄的快照集,並方便偵錯/監視筆記本工作。

  • 取得每個執行活動的結束值,並在下游工作中使用這些值。

您也可以嘗試執行 mssparkutils.notebook.help(“runMultiple”) 來尋找範例和詳細的使用方式。

以下是使用此方法平行執行筆記本清單的簡單範例:


mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

根筆記本的執行結果如下所示:

參考筆記本清單的螢幕擷取畫面。

以下是使用 mssparkutils.notebook.runMultiple() 執行具有拓撲結構的筆記本範例。 使用此方法,輕鬆地透過程式碼體驗協調筆記本。

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ]
}
mssparkutils.notebook.runMultiple(DAG)

注意

編輯筆記本

以一個值退出筆記本。 您可以在筆記本中以互動方式或在管線中執行巢狀函數呼叫。

  • 當您以互動方式從筆記本呼叫 exit() 函式時,Azure Synapse 會擲回例外狀況、略過執行中的子序列單元格,並讓 Spark 會話保持運作。

  • 當您協調在 Synapse 管線中呼叫 exit() 函式的筆記本時,Azure Synapse 會傳回結束值、完成管線執行,以及停止 Spark 會話。

  • 當您在所參考的筆記本中呼叫函 exit() 式時,Azure Synapse 會停止所參考筆記本中的進一步執行,並繼續在呼叫 run() 函式的筆記本中執行下一個單元格。 例如:Notebook1 有三個儲存格,並呼叫第二個儲存格中的函 exit() 式。 Notebook2 有五個儲存格,並在第三個數據格中呼叫 run(notebook1) 。 當您執行 Notebook2 時,Notebook1 會在按下 exit() 函式時停止在第二個儲存格。 Notebook2 將繼續執行第四個儲存格和第五個儲存格。

mssparkutils.notebook.exit("value string")

例如:

Sample1 筆記本會在 folder/ 底下找到下列兩個儲存格:

  • 單元格 1 定義 輸入 參數,並將預設值設定為 10。
  • 單元格 2 會結束筆記本,並輸入為結束值。

範例筆記本的螢幕快照

您可以使用預設值在另一個筆記本中執行 Sample1


exitVal = mssparkutils.notebook.run("folder/Sample1")
print (exitVal)

產生結果:

Sample1 run success with input is 10

您可以在另一個筆記本中執行 Sample1,並將輸入值設定為 20:

exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print (exitVal)

產生結果:

Sample1 run success with input is 20

您可以使用 MSSparkUtils Notebook 公用程式來執行筆記本,或使用 值結束筆記本。 執行下列命令,以取得可用方法的概觀:

mssparkutils.notebook.help()

取得結果:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

筆記本參考

參考筆記本並傳回其結束值。 您可以在筆記本中以互動方式或在管線中執行巢狀函數呼叫。 所參考的筆記本將會在筆記本呼叫此函式的 Spark 集區上執行。


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

例如:

mssparkutils.notebook.run("folder/Sample1", 90, Map("input" -> 20))

執行完成之後,您會看到名為「檢視筆記本執行:筆記本名稱的快照連結顯示在數據格輸出中,您可以單擊連結以查看此特定回合的快照集。

貼齊連結 scala 的螢幕快照

編輯筆記本

以一個值退出筆記本。 您可以在筆記本中以互動方式或在管線中執行巢狀函數呼叫。

  • 當您以互動方式呼叫 exit() 函式筆記本時,Azure Synapse 會擲回例外狀況、略過執行中的子序列單元格,以及讓Spark會話保持運作。

  • 當您協調在 Synapse 管線中呼叫 exit() 函式的筆記本時,Azure Synapse 會傳回結束值、完成管線執行,以及停止 Spark 會話。

  • 當您在所參考的筆記本中呼叫函 exit() 式時,Azure Synapse 會停止所參考筆記本中的進一步執行,並繼續在呼叫 run() 函式的筆記本中執行下一個單元格。 例如:Notebook1 有三個儲存格,並呼叫第二個儲存格中的函 exit() 式。 Notebook2 有五個儲存格,並在第三個數據格中呼叫 run(notebook1) 。 當您執行 Notebook2 時,Notebook1 會在按下 exit() 函式時停止在第二個儲存格。 Notebook2 將繼續執行第四個儲存格和第五個儲存格。

mssparkutils.notebook.exit("value string")

例如:

Sample1 Notebook 位於 mssparkutils/folder/ 底下,並具有下列兩個單元格:

  • 單元格 1 定義 輸入 參數,並將預設值設定為 10。
  • 單元格 2 會結束筆記本,並輸入為結束值。

範例筆記本的螢幕快照

您可以使用預設值在另一個筆記本中執行 Sample1


val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1")
print(exitVal)

產生結果:

exitVal: String = Sample1 run success with input is 10
Sample1 run success with input is 10

您可以在另一個筆記本中執行 Sample1,並將輸入值設定為 20:

val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print(exitVal)

產生結果:

exitVal: String = Sample1 run success with input is 20
Sample1 run success with input is 20

您可以使用 MSSparkUtils Notebook 公用程式來執行筆記本,或使用 值結束筆記本。 執行下列命令,以取得可用方法的概觀:

mssparkutils.notebook.help()

取得結果:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

筆記本參考

參考筆記本並傳回其結束值。 您可以在筆記本中以互動方式或在管線中執行巢狀函數呼叫。 所參考的筆記本將會在筆記本呼叫此函式的 Spark 集區上執行。


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

例如:

mssparkutils.notebook.run("folder/Sample1", 90, list("input": 20))

執行完成之後,您會看到名為「檢視筆記本執行:筆記本名稱的快照連結顯示在數據格輸出中,您可以單擊連結以查看此特定回合的快照集。

編輯筆記本

以一個值退出筆記本。 您可以在筆記本中以互動方式或在管線中執行巢狀函數呼叫。

  • 當您以互動方式呼叫 exit() 函式筆記本時,Azure Synapse 會擲回例外狀況、略過執行中的子序列單元格,以及讓Spark會話保持運作。

  • 當您協調在 Synapse 管線中呼叫 exit() 函式的筆記本時,Azure Synapse 會傳回結束值、完成管線執行,以及停止 Spark 會話。

  • 當您在所參考的筆記本中呼叫函 exit() 式時,Azure Synapse 會停止所參考筆記本中的進一步執行,並繼續在呼叫 run() 函式的筆記本中執行下一個單元格。 例如:Notebook1 有三個儲存格,並呼叫第二個儲存格中的函 exit() 式。 Notebook2 有五個儲存格,並在第三個數據格中呼叫 run(notebook1) 。 當您執行 Notebook2 時,Notebook1 會在按下 exit() 函式時停止在第二個儲存格。 Notebook2 將繼續執行第四個儲存格和第五個儲存格。

mssparkutils.notebook.exit("value string")

例如:

Sample1 筆記本會在 folder/ 底下找到下列兩個儲存格:

  • 單元格 1 定義 輸入 參數,並將預設值設定為 10。
  • 單元格 2 會結束筆記本,並輸入為結束值。

範例筆記本的螢幕快照

您可以使用預設值在另一個筆記本中執行 Sample1


exitVal <- mssparkutils.notebook.run("folder/Sample1")
print (exitVal)

產生結果:

Sample1 run success with input is 10

您可以在另一個筆記本中執行 Sample1,並將輸入值設定為 20:

exitVal <- mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, list("input": 20))
print (exitVal)

產生結果:

Sample1 run success with input is 20

認證公用程式

您可以使用 MSSparkUtils 認證公用程式來取得連結服務的存取令牌,以及管理 Azure 金鑰保存庫 中的秘密。

執行下列命令,以取得可用方法的概觀:

mssparkutils.credentials.help()
mssparkutils.credentials.help()
Not supported.
mssparkutils.credentials.help()

取得結果:

getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName

注意

C# 目前不支援 getSecretWithLS(linkedService, secret) 。

getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName

取得權杖

傳回指定物件名稱(選擇性)的Microsoft Entra 標記。 下表列出所有可用的物件類型:

物件類型 要在 API 呼叫中使用的字串常值
Azure 儲存體 Storage
Azure Key Vault Vault
Azure 管理 AzureManagement
Azure SQL 資料倉儲 (專用與無伺服器) DW
Azure Synapse Synapse
Azure Data Lake Store DataLakeStore
Azure Data Factory ADF
Azure Data Explorer AzureDataExplorer
適用於 MySQL 的 Azure 資料庫 AzureOSSDB
適用於 MariaDB 的 Azure 資料庫 AzureOSSDB
適用於 PostgreSQL 的 Azure 資料庫 AzureOSSDB
mssparkutils.credentials.getToken('audience Key')
mssparkutils.credentials.getToken("audience Key")
Credentials.GetToken("audience Key")
mssparkutils.credentials.getToken('audience Key')

驗證令牌

如果 Token 尚未過期,則傳回 true。

mssparkutils.credentials.isValidToken('your token')
mssparkutils.credentials.isValidToken("your token")
Credentials.IsValidToken("your token")
mssparkutils.credentials.isValidToken('your token')

取得連結服務的 連接字串 或認證

傳回連結服務 連接字串 或認證。

mssparkutils.credentials.getConnectionStringOrCreds('linked service name')
mssparkutils.credentials.getConnectionStringOrCreds("linked service name")
Credentials.GetConnectionStringOrCreds("linked service name")
mssparkutils.credentials.getConnectionStringOrCreds('linked service name')

使用工作區身分識別取得秘密

使用工作區身分識別,傳回指定 Azure 金鑰保存庫 名稱、秘密名稱和鏈接服務名稱的 Azure 金鑰保存庫 秘密。 請確定您已適當地設定 Azure 金鑰保存庫存取權。

mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')
mssparkutils.credentials.getSecret("azure key vault name","secret name","linked service name")
Credentials.GetSecret("azure key vault name","secret name","linked service name")
mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')

使用使用者認證取得秘密

使用使用者認證傳回指定 Azure 金鑰保存庫 名稱、秘密名稱和連結服務名稱的 Azure 金鑰保存庫 秘密。

mssparkutils.credentials.getSecret('azure key vault name','secret name')
mssparkutils.credentials.getSecret("azure key vault name","secret name")
Credentials.GetSecret("azure key vault name","secret name")
mssparkutils.credentials.getSecret('azure key vault name','secret name')

使用工作區身分識別放置秘密

使用工作區身分識別,為指定的 Azure 金鑰保存庫 名稱、秘密名稱和鏈接服務名稱放置 Azure 金鑰保存庫 秘密。 請確定您已適當地設定 Azure 金鑰保存庫存取權。

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')

使用工作區身分識別放置秘密

使用工作區身分識別,為指定的 Azure 金鑰保存庫 名稱、秘密名稱和鏈接服務名稱放置 Azure 金鑰保存庫 秘密。 請確定您已適當地設定 Azure 金鑰保存庫存取權。

mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value","linked service name")

使用工作區身分識別放置秘密

使用工作區身分識別,為指定的 Azure 金鑰保存庫 名稱、秘密名稱和鏈接服務名稱放置 Azure 金鑰保存庫 秘密。 請確定您已適當地設定 Azure 金鑰保存庫存取權。

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')

使用使用者認證放置秘密

使用使用者認證,為指定的 Azure 金鑰保存庫 名稱、秘密名稱和鏈接服務名稱放置 Azure 金鑰保存庫 秘密。

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')

使用使用者認證放置秘密

使用使用者認證,為指定的 Azure 金鑰保存庫 名稱、秘密名稱和鏈接服務名稱放置 Azure 金鑰保存庫 秘密。

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')

使用使用者認證放置秘密

使用使用者認證,為指定的 Azure 金鑰保存庫 名稱、秘密名稱和鏈接服務名稱放置 Azure 金鑰保存庫 秘密。

mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value")

環境公用程式

執行下列命令以取得可用方法的概觀:

mssparkutils.env.help()
mssparkutils.env.help()
mssparkutils.env.help()
Env.Help()

取得結果:

getUserName(): returns user name
getUserId(): returns unique user id
getJobId(): returns job id
getWorkspaceName(): returns workspace name
getPoolName(): returns Spark pool name
getClusterId(): returns cluster id

取得用戶名稱

傳回目前的用戶名稱。

mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
Env.GetUserName()

取得使用者標識碼

傳回目前的使用者標識碼。

mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
Env.GetUserId()

取得作業標識碼

傳回作業標識碼。

mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
Env.GetJobId()

取得工作區名稱

傳回工作區名稱。

mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
Env.GetWorkspaceName()

取得集區名稱

傳回 Spark 集區名稱。

mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
Env.GetPoolName()

取得叢集 ID

傳回目前的叢集標識碼。

mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
Env.GetClusterId()

運行時間內容

Mssparkutils 運行時間公用程序公開 3 個運行時間屬性,您可以使用 mssparkutils 運行時間內容來取得如下所列的屬性:

  • Notebookname - 目前筆記本的名稱,一律會傳回互動式模式和管線模式的值。
  • Pipelinejobid - 管線執行標識符會傳回管線模式中的值,並在互動式模式中傳回空字串。
  • Activityrunid - 筆記本活動執行標識符,會在管線模式中傳回值,並在互動式模式中傳回空字串。

目前運行時間內容同時支援 Python 和 Scala。

mssparkutils.runtime.context
ctx <- mssparkutils.runtime.context()
for (key in ls(ctx)) {
    writeLines(paste(key, ctx[[key]], sep = "\t"))
}
%%spark
mssparkutils.runtime.context

工作階段管理

停止互動式會話

有時候,藉由在程式代碼中呼叫 API 來停止互動式會話,而不是手動按兩下 [停止] 按鈕。 針對這類情況,我們提供 API mssparkutils.session.stop() 以支援透過程式代碼停止互動式會話,其適用於 Scala 和 Python。

mssparkutils.session.stop()
mssparkutils.session.stop()
mssparkutils.session.stop()

mssparkutils.session.stop() API 會在背景異步停止目前的互動式會話,它會停止 Spark 工作階段,並釋放會話佔用的資源,使其可供相同集區中的其他會話使用。

注意

我們不建議在程式代碼中呼叫 sys.exit 像是 Scala 或 sys.exit() Python 中的語言內建 API,因為這類 API 只會終止解釋器程式,讓 Spark 會話保持運作,且不會釋放資源。

套件相依性

如果您想要在本機開發筆記本或作業,而且需要參考編譯/IDE 提示的相關套件,您可以使用下列套件。

下一步