共用方式為


使用 mssparkutils 保護具有連結服務的認證

從外部來源存取數據是常見的模式。 除非外部數據源允許匿名存取,否則您可能需要使用認證、秘密或 連接字串 來保護連線。

Azure Synapse Analytics 預設會使用 Microsoft Entra 傳遞在資源之間進行驗證。 如果您需要使用其他認證連線到資源,請直接使用 mssparkutils。 mssparkutils 套件可簡化擷取 SAS 令牌、Microsoft Entra 令牌、連接字串,以及儲存在連結服務或 Azure 金鑰保存庫 中的秘密的程式。

Microsoft Entra 傳遞會使用在 Microsoft Entra 識別符中以使用者身分指派給您的許可權,而不是指派給 Synapse 或個別服務主體的許可權。 例如,如果您想要使用 Microsoft Entra 傳遞來存取記憶體帳戶中的 Blob,則您應該移至該記憶體帳戶,並將 Blob 參與者角色指派給自己。

從 Azure 金鑰保存庫 擷取秘密時,建議您建立 Azure 金鑰保存庫 的連結服務。 請確定 Synapse 工作區受控服務識別 (MSI) 具有 Azure 金鑰保存庫 的秘密取得許可權。 Synapse 會使用 Synapse 工作區受控服務識別向 Azure 金鑰保存庫 進行驗證。 如果您直接連線到沒有連結服務的 Azure 金鑰保存庫,您將使用使用者 Microsoft Entra 認證進行驗證。

如需詳細資訊,請參閱 鏈接的服務

使用方式

令牌和秘密的 mssparkutils 說明

此函式會顯示 Synapse 中秘密和令牌管理的說明檔。

mssparkutils.credentials.help()
mssparkutils.credentials.help()
Console.WriteLine(TokenLibrary.help());

取得結果:

 getToken(audience: String, name: String): returns AAD token for a given audience, name (optional)
 isValidToken(token: String): returns true if token hasn't expired
 getConnectionStringOrCreds(linkedService: String): returns connection string or credentials for the linked service
 getFullConnectionString(linkedService: String): returns full connection string with credentials for the linked service
 getPropertiesAll(linkedService: String): returns all the properties of the linked service
 getSecret(akvName: String, secret: String, linkedService: String): returns AKV secret for a given AKV linked service, akvName, secret key using workspace MSI
 getSecret(akvName: String, secret: String): returns AKV secret for a given akvName, secret key using user credentials
 getSecretWithLS(linkedService: String, secret: String): returns AKV secret for a given linked service, secret key
 putSecret(akvName: String, secretName: String, secretValue: String): puts AKV secret for a given akvName, secretName
 putSecret(akvName: String, secretName: String, secretValue: String, linkedService: String): puts AKV secret for a given akvName, secretName
 putSecretWithLS(linkedService: String, secretName: String, secretValue: String): puts AKV secret for a given linked service, secretName

存取 Azure Data Lake Storage Gen2

ADLS Gen2 主要 儲存體

從主要 Azure Data Lake 存取檔案 儲存體 預設會使用 Microsoft Entra 傳遞進行驗證,而且不需要明確使用 mssparkutils。 傳遞驗證中使用的身分識別會根據幾個因素而有所不同。 根據預設,互動式筆記本會使用使用者的身分識別來執行,但可以變更為工作區受控服務識別 (MSI)。 批次作業和筆記本的非互動式執行會使用工作區 MSI。

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")
display(df.limit(10))
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')
display(df.limit(10))

具有連結服務的 ADLS Gen2 記憶體

Azure Synapse Analytics 會在連線到 Azure Data Lake 儲存體 Gen2 時提供整合式連結服務體驗。 鏈接的服務可以設定為使用帳戶密鑰、服務主體受控識別認證進行驗證。

當連結服務驗證方法設定為帳戶密鑰時,鏈接服務會使用提供的記憶體帳戶密鑰進行驗證、要求 SAS 金鑰,並使用 LinkedServiceBasedSASProvider 自動將它套用至記憶體要求

Synapse 可讓使用者設定特定記憶體帳戶的連結服務。 這可讓您從 單一 Spark 應用程式/查詢中的多個記憶體帳戶 讀取/寫入數據。 設定 spark.storage.synapse.{針對將使用的每個記憶體帳戶,source_full_storage_account_name}.linkedServiceName,Synapse 會找出要用於特定讀取/寫入作業的連結服務。 不過,如果我們的 Spark 作業只處理單一記憶體帳戶,我們只要省略記憶體帳戶名稱並使用 spark.storage.synapse.linkedServiceName

val sc = spark.sparkContext
val source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(s"spark.storage.synapse.$source_full_storage_account_name.linkedServiceName", "<LINKED SERVICE NAME>")
sc.hadoopConfiguration.set(s"fs.azure.account.auth.type.$source_full_storage_account_name", "SAS")
sc.hadoopConfiguration.set(s"fs.azure.sas.token.provider.type.$source_full_storage_account_name", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
# Set the required configs
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<lINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.auth.type.{source_full_storage_account_name}", "SAS")
sc._jsc.hadoopConfiguration().set(f"fs.azure.sas.token.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

# Python code
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<DIRECTORY PATH>')

df.show()

當鏈接的服務驗證方法設定為 受控識別 或服務 主體時,鏈接服務會搭配 LinkedServiceBasedTokenProvider 提供者使用受控識別或服務主體令牌。

val sc = spark.sparkContext
val source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(s"spark.storage.synapse.$source_full_storage_account_name.linkedServiceName", "<LINKED SERVICE NAME>")
sc.hadoopConfiguration.set(s"fs.azure.account.oauth.provider.type.$source_full_storage_account_name", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider") 
val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
# Python code
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<LINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.oauth.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<DIRECTORY PATH>')

df.show()

沒有連結服務的 ADLS Gen2 記憶體

使用 SAS 金鑰直接 連線 至 ADLS Gen2 記憶體。 使用並提供 ConfBasedSASProvider SAS 金鑰給組 spark.storage.synapse.sas 態設定。 SAS 令牌可以在容器層級、帳戶層級或全域設定。 不建議在全域層級設定SAS金鑰,因為作業將無法從多個記憶體帳戶讀取/寫入。

每個記憶體容器的SAS設定

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<CONTAINER>.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<CONTAINER>.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

每個記憶體帳戶的SAS設定

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

所有記憶體帳戶的SAS設定

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

使用 Azure 金鑰保存庫 的 ADLS Gen2 記憶體

使用儲存在 Azure 金鑰保存庫 秘密中的 SAS 令牌,連線 至 ADLS Gen2 記憶體。

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.AkvBasedSASProvider")
spark.conf.set("spark.storage.synapse.akv", "<AZURE KEY VAULT NAME>")
spark.conf.set("spark.storage.akv.secret", "<SECRET KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.AkvBasedSASProvider")
spark.conf.set("spark.storage.synapse.akv", "<AZURE KEY VAULT NAME>")
spark.conf.set("spark.storage.akv.secret", "<SECRET KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

其他鏈接服務的 TokenLibrary

若要連線到其他鏈接服務,您可以直接呼叫 TokenLibrary。

get 連線 ionString()

若要擷取 連接字串,請使用 get 連線 ionString 函式並傳入連結的服務名稱

%%spark
// retrieve connectionstring from mssparkutils

mssparkutils.credentials.getFullConnectionString("<LINKED SERVICE NAME>")
%%pyspark
# retrieve connectionstring from mssparkutils

mssparkutils.credentials.getFullConnectionString("<LINKED SERVICE NAME>")
%%csharp
// retrieve connectionstring from TokenLibrary

using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Utils;

string connectionString = TokenLibrary.GetConnectionString(<LINKED SERVICE NAME>);
Console.WriteLine(connectionString);

getPropertiesAll()

getPropertiesAll 是 Scala 和 Python 中提供的協助程式函式,可取得連結服務的所有屬性

%%pyspark
import json
# retrieve connectionstring from mssparkutils

json.loads(mssparkutils.credentials.getPropertiesAll("<LINKED SERVICE NAME>"))

輸出看起來會像這樣

{
    'AuthType': 'Key',
    'AuthKey': '[REDACTED]',
    'Id': None,
    'Type': 'AzureBlobStorage',
    'Endpoint': 'https://storageaccount.blob.core.windows.net/',
    'Database': None
}

GetSecret()

若要從 Azure 金鑰保存庫 擷取儲存的秘密,建議您在 Synapse 工作區內建立 Azure 金鑰保存庫 的鏈接服務。 Synapse 工作區受控服務識別必須獲得 Azure 金鑰保存庫 的 GET 秘密許可權。 鏈接的服務會使用受控服務識別來連線到 Azure 金鑰保存庫 服務以擷取秘密。 否則,直接連線到 Azure 金鑰保存庫 會使用使用者的 Microsoft Entra 認證。 在此情況下,用戶必須獲得 Azure 金鑰保存庫 中的取得秘密許可權。

在政府雲端中,請提供keyvault的完整功能變數名稱。

mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>" [, <LINKED SERVICE NAME>])

若要從 Azure 金鑰保存庫 擷取秘密,請使用 mssparkutils.credentials.getSecret() 函式。


mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>")

mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>")
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Utils;

string connectionString = TokenLibrary.GetSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>");
Console.WriteLine(connectionString);

Spark 運行時間支援的連結服務連線

雖然 Azure Synapse Analytics 支援各種不同的連結服務連線(來自管線和其他 Azure 產品),但並非所有連結服務連線都受 Spark 運行時間支援。 以下是支援的連結服務清單:

  • Azure Blob 儲存體
  • Azure AI 服務
  • Azure Cosmos DB
  • Azure Data Explorer
  • 適用於 MySQL 的 Azure 資料庫
  • 適用於 PostgreSQL 的 Azure 資料庫
  • Azure Data Lake Store (Gen1)
  • Azure Key Vault
  • Azure Machine Learning
  • Azure Purview
  • Azure SQL Database
  • Azure SQL 數據倉儲 (專用和無伺服器)
  • Azure 儲存體

mssparkutils.credentials.getToken()

當您需要 OAuth 持有人令牌來直接存取服務時,您可以使用 getToken 方法。 支援下列資源:

服務名稱 要用於 API 呼叫的字串常值
Azure 儲存體 Storage
Azure Key Vault Vault
Azure 管理 AzureManagement
Azure SQL 數據倉儲 (專用和無伺服器) DW
Azure Synapse Synapse
Azure Data Lake Store DataLakeStore
Azure Data Factory ADF
Azure 資料總管 AzureDataExplorer
適用於 MySQL 的 Azure 資料庫 AzureOSSDB
適用於 MariaDB 的 Azure 資料庫 AzureOSSDB
適用於 PostgreSQL 的 Azure 資料庫 AzureOSSDB

Spark 運行時間不支援的連結服務存取

Spark 運行時間不支援下列存取連結服務的方法:

  • 將自變數傳遞至參數化連結服務
  • 具有使用者指派受控識別的 連線 集 (UAMI)
  • Keyvault 資源不支持系統指派的受控識別
  • 針對 Azure Cosmos DB 連線,僅支援金鑰型存取。 不支援令牌型存取。

執行筆記本或 Spark 作業時,使用連結服務取得令牌/秘密的要求可能會失敗,並出現指出 『BadRequest』 的錯誤訊息。 這通常是因為連結服務的設定問題所造成。 如果您看到此錯誤訊息,請檢查連結服務的設定。 如果您有任何問題,請在 Azure 入口網站 連絡 Microsoft Azure 支援服務