你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

通过使用 mssparkutils 的链接服务保护凭据

访问来自外部源的数据是一种常见的模式。 除非外部数据源允许匿名访问,否则很可能需要使用凭据、机密或连接字符串来保护连接。

默认情况下,Azure Synapse Analytics 使用 Microsoft Entra 传递进行资源之间的身份验证。 如果需要使用其他凭据连接到资源,请直接使用 mssparkutils。 mssparkutils 包简化了检索存储在链接服务或 Azure 密钥保管库中的 SAS 令牌、Microsoft Entra 令牌、连接字符串和机密的过程。

Microsoft Entra 传递使用你作为 Microsoft Entra ID 中的用户获得的权限,而不是使用分配给 Synapse 或单独服务主体的权限。 例如,若要使用 Microsoft Entra 传递来访问存储帐户中的 Blob,则应转到该存储帐户,将 Blob 参与者角色分配给自己。

从 Azure Key Vault 检索机密时,建议为 Azure Key Vault 创建链接服务。 确保 Synapse 工作区托管服务标识 (MSI) 对你的 Azure Key Vault 具有机密获取特权。 Synapse 将使用 Synapse 工作区托管服务标识向 Azure Key Vault 进行身份验证。 如果没有链接服务而直接连接到 Azure Key Vault,将使用用户 Microsoft Entra 凭据进行身份验证。

有关详细信息,请参阅链接服务

使用情况

令牌和机密的 mssparkutils 帮助

此函数显示 Synapse 中机密和令牌管理的帮助文档。

mssparkutils.credentials.help()
mssparkutils.credentials.help()
Console.WriteLine(TokenLibrary.help());

获取结果:

 getToken(audience: String, name: String): returns AAD token for a given audience, name (optional)
 isValidToken(token: String): returns true if token hasn't expired
 getConnectionStringOrCreds(linkedService: String): returns connection string or credentials for the linked service
 getFullConnectionString(linkedService: String): returns full connection string with credentials for the linked service
 getPropertiesAll(linkedService: String): returns all the properties of the linked service
 getSecret(akvName: String, secret: String, linkedService: String): returns AKV secret for a given AKV linked service, akvName, secret key using workspace MSI
 getSecret(akvName: String, secret: String): returns AKV secret for a given akvName, secret key using user credentials
 getSecretWithLS(linkedService: String, secret: String): returns AKV secret for a given linked service, secret key
 putSecret(akvName: String, secretName: String, secretValue: String): puts AKV secret for a given akvName, secretName
 putSecret(akvName: String, secretName: String, secretValue: String, linkedService: String): puts AKV secret for a given akvName, secretName
 putSecretWithLS(linkedService: String, secretName: String, secretValue: String): puts AKV secret for a given linked service, secretName

访问 Azure Data Lake Storage Gen2

ADLS Gen2 主存储

访问 Azure Data Lake Storage 中的文件时,默认使用 Microsoft Entra 传递进行身份验证,不需要显式使用 mssparkutils。 直通身份验证中使用的标识因几个因素而异。 默认情况下,将会使用用户的标识来执行交互式笔记本,但可以将其更改为工作区托管服务标识 (MSI)。 批处理作业和笔记本的非交互式执行使用工作区 MSI。

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")
display(df.limit(10))
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')
display(df.limit(10))

ADLS Gen2 存储与链接服务

在连接到 Azure Data Lake Storage Gen2 时,Azure Synapse Analytics 可提供集成的链接服务体验。 可以将链接服务配置为使用帐户密钥服务主体托管标识凭据进行身份验证。

当链接服务身份验证方法设置为“帐户密钥”时,链接服务将使用提供的存储帐户密钥进行身份验证,请求 SAS 密钥,并使用 LinkedServiceBasedSASProvider 将其自动应用于存储请求。

Synapse 支持用户为特定存储帐户设置链接服务。 这样,就可以从单个 Spark 应用程序/查询中的多个存储帐户读取/写入数据。 为要使用的每个存储帐户设置 spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName 后,Synapse 可确定要用于特定读取/写入操作的链接服务。 但是,如果 Spark 作业仅处理单个存储帐户,则只需省略存储帐户名称,并使用 spark.storage.synapse.linkedServiceName

val sc = spark.sparkContext
val source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(s"spark.storage.synapse.$source_full_storage_account_name.linkedServiceName", "<LINKED SERVICE NAME>")
sc.hadoopConfiguration.set(s"fs.azure.account.auth.type.$source_full_storage_account_name", "SAS")
sc.hadoopConfiguration.set(s"fs.azure.sas.token.provider.type.$source_full_storage_account_name", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
# Set the required configs
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<lINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.auth.type.{source_full_storage_account_name}", "SAS")
sc._jsc.hadoopConfiguration().set(f"fs.azure.sas.token.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

# Python code
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<DIRECTORY PATH>')

df.show()

如果将链接服务身份验证方法设置为“托管标识”或“服务主体”,则链接服务会将托管标识或服务主体令牌用于 LinkedServiceBasedTokenProvider 提供程序。

val sc = spark.sparkContext
val source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(s"spark.storage.synapse.$source_full_storage_account_name.linkedServiceName", "<LINKED SERVICE NAME>")
sc.hadoopConfiguration.set(s"fs.azure.account.oauth.provider.type.$source_full_storage_account_name", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider") 
val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
# Python code
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<LINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.oauth.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<DIRECTORY PATH>')

df.show()

ADLS Gen2 存储(无链接服务)

使用 SAS 密钥直接连接到 ADLS Gen2 存储。 使用 ConfBasedSASProvider 并为 spark.storage.synapse.sas 配置设置提供 SAS 密钥。 可以在容器级别、帐户级别或全局级别设置 SAS 令牌。 不建议在全局级别设置 SAS 密钥,因为作业将无法从多个存储帐户读取/写入。

每个存储容器的 SAS 配置

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<CONTAINER>.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<CONTAINER>.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

每个存储帐户的 SAS 配置

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

所有存储帐户的 SAS 配置

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

ADLS Gen2 存储与 Azure Key Vault

使用存储在 Azure Key Vault 机密中的 SAS 令牌连接到 ADLS Gen2 存储。

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.AkvBasedSASProvider")
spark.conf.set("spark.storage.synapse.akv", "<AZURE KEY VAULT NAME>")
spark.conf.set("spark.storage.akv.secret", "<SECRET KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.AkvBasedSASProvider")
spark.conf.set("spark.storage.synapse.akv", "<AZURE KEY VAULT NAME>")
spark.conf.set("spark.storage.akv.secret", "<SECRET KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

其他链接服务的 TokenLibrary

若要连接到其他链接服务,可以直接调用 TokenLibrary。

getConnectionString()

若要检索连接字符串,请使用 getConnectionString 函数并传入链接服务名称

%%spark
// retrieve connectionstring from mssparkutils

mssparkutils.credentials.getFullConnectionString("<LINKED SERVICE NAME>")
%%pyspark
# retrieve connectionstring from mssparkutils

mssparkutils.credentials.getFullConnectionString("<LINKED SERVICE NAME>")
%%csharp
// retrieve connectionstring from TokenLibrary

using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Utils;

string connectionString = TokenLibrary.GetConnectionString(<LINKED SERVICE NAME>);
Console.WriteLine(connectionString);

getPropertiesAll()

getPropertiesAll 是 Scala 和 Python 中提供的帮助程序函数,用于获取链接服务的所有属性

%%pyspark
import json
# retrieve connectionstring from mssparkutils

json.loads(mssparkutils.credentials.getPropertiesAll("<LINKED SERVICE NAME>"))

输出类似于

{
    'AuthType': 'Key',
    'AuthKey': '[REDACTED]',
    'Id': None,
    'Type': 'AzureBlobStorage',
    'Endpoint': 'https://storageaccount.blob.core.windows.net/',
    'Database': None
}

GetSecret()

若要检索存储在 Azure Key Vault 中的机密,建议在 Synapse 工作区内创建到 Azure Key Vault 的链接服务。 需要向 Synapse 工作区托管服务标识授予对 Azure Key Vault 的 GET Secrets 权限。 链接服务将使用托管服务标识来连接到 Azure Key Vault 服务,以检索机密。 否则,直接连接到 Azure Key Vault 将使用用户的 Microsoft Entra 凭据。 在这种情况下,需要向用户授予 Azure Key Vault 中的“获取机密”权限。

在政府云中,请提供 keyvault 的完全限定域名。

mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>" [, <LINKED SERVICE NAME>])

要从 Azure 密钥保管库检索机密,请使用 mssparkutils.credentials.getSecret() 函数。


mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>")

mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>")
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Utils;

string connectionString = TokenLibrary.GetSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>");
Console.WriteLine(connectionString);

Spark 运行时支持的链接服务连接

虽然 Azure Synapse Analytics 支持来自管道和其他 Azure 产品的各种链接服务连接,但 Spark 运行时并不支持所有这些服务连接。 下面是支持的链接服务列表:

  • Azure Blob 存储
  • Azure AI 服务
  • Azure Cosmos DB
  • Azure 数据资源管理器
  • Azure Database for MySQL
  • Azure Database for PostgreSQL
  • Azure Data Lake Store (Gen1)
  • Azure Key Vault
  • Azure 机器学习
  • Azure Purview
  • Azure SQL 数据库
  • Azure SQL Data Warehouse(专用和无服务器)
  • Azure 存储

mssparkutils.credentials.getToken()

如果需要使用 OAuth 持有者令牌直接访问服务,则可以使用 getToken 方法。 支持以下资源:

服务名称 要在 API 调用中使用的字符串文本
Azure 存储 Storage
Azure Key Vault Vault
Azure 管理 AzureManagement
Azure SQL Data Warehouse(专用和无服务器) DW
Azure Synapse Synapse
Azure Data Lake Store DataLakeStore
Azure 数据工厂 ADF
Azure 数据资源管理器 AzureDataExplorer
Azure Database for MySQL AzureOSSDB
Azure Database for MariaDB AzureOSSDB
Azure Database for PostgreSQL AzureOSSDB

不支持从 Spark 运行时进行链接服务访问

Spark 运行时不支持以下访问链接服务的方法:

  • 将参数传递给参数化链接服务
  • 具有用户分配的托管标识 (UAMI) 的连接
  • 密钥保管库资源不支持系统分配的托管标识
  • 对于 Azure Cosmos DB 连接,仅支持基于密钥的访问。 不支持基于令牌的访问。

运行笔记本或 Spark 作业时,使用链接服务获取令牌/机密的请求可能会失败,并显示指示“BadRequest”的错误消息。 这通常是由链接服务的配置问题引起的。 如果看到此错误消息,请检查链接服务的配置。 如有任何问题,请通过 Azure 门户联系 Microsoft Azure 支持部门。