次の方法で共有


mssparkutils を使用してリンク サービスでの資格情報をセキュリティで保護する

外部ソースからデータにアクセスすることは、よくあるパターンです。 外部データソースで匿名アクセスが許可されていない限り、資格情報、シークレット、または接続文字列を使用して接続をセキュリティで保護することがおそらく必要になります。

Azure Synapse Analytics では、リソース間の認証に既定で Microsoft Entra パススルーを使用します。 他の資格情報を使用してリソースに接続する必要がある場合は、mssparkutils を直接使用します。 mssparkutils は、リンク サービスまたは Azure Key Vault に格納されている SAS トークン、Microsoft Entra トークン、接続文字列、シークレットを取得するプロセスを簡略化します。

Microsoft Entra パススルーでは、Synapse または個別のサービス プリンシパルに割り当てられたアクセス許可ではなく、Microsoft Entra ID でユーザーとして割り当てられたアクセス許可が使用されます。 たとえば、Microsoft Entra パススルーを使用してストレージ アカウント内の BLOB にアクセスする場合は、そのストレージ アカウントに移動し、BLOB 共同作成者ロールを自分に割り当てる必要があります。

Azure Key Vault からシークレットを取得する場合は、Azure Key Vault にリンクされたサービスを作成することをお勧めします。 Synapse ワークスペースの管理サービス ID (MSI) に、Azure Key Vault に対するシークレット取得権限があることを確認してください。 Synapse は、Synapse ワークスペースの管理サービス ID を使用して Azure Key Vault に対して認証を行います。 リンクされたサービスを使用せずに Azure Key Vault に直接接続する場合は、ユーザーの Microsoft Entra 資格情報を使用して認証を行います。

詳細については、リンクされたサービスに関するページを参照してください。

使用法

トークンとシークレットに関する mssparkutils ヘルプ

この関数は、Synapse でのシークレットとトークン管理に関するヘルプ ドキュメントを表示します。

mssparkutils.credentials.help()
mssparkutils.credentials.help()
Console.WriteLine(TokenLibrary.help());

結果を取得します。

 getToken(audience: String, name: String): returns AAD token for a given audience, name (optional)
 isValidToken(token: String): returns true if token hasn't expired
 getConnectionStringOrCreds(linkedService: String): returns connection string or credentials for the linked service
 getFullConnectionString(linkedService: String): returns full connection string with credentials for the linked service
 getPropertiesAll(linkedService: String): returns all the properties of the linked service
 getSecret(akvName: String, secret: String, linkedService: String): returns AKV secret for a given AKV linked service, akvName, secret key using workspace MSI
 getSecret(akvName: String, secret: String): returns AKV secret for a given akvName, secret key using user credentials
 getSecretWithLS(linkedService: String, secret: String): returns AKV secret for a given linked service, secret key
 putSecret(akvName: String, secretName: String, secretValue: String): puts AKV secret for a given akvName, secretName
 putSecret(akvName: String, secretName: String, secretValue: String, linkedService: String): puts AKV secret for a given akvName, secretName
 putSecretWithLS(linkedService: String, secretName: String, secretValue: String): puts AKV secret for a given linked service, secretName

Azure Data Lake Storage Gen2 にアクセスする

ADLS Gen2 プライマリ ストレージ

プライマリの Azure Data Lake Storage のファイルへのアクセスでは、既定で認証に Microsoft Entra パススルーが使用されるため、mssparkutils を明示的に使用する必要はありません。 パススルー認証で使用される ID は、いくつかの要因によって異なります。 既定では、対話型ノートブックはユーザーの ID を使用して実行されますが、これはワークスペースの管理サービス ID (MSI) に変更できます。 ノートブックのバッチ ジョブと非対話型実行では、ワークスペース MSI が使用されます。

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")
display(df.limit(10))
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')
display(df.limit(10))

ADLS Gen2 ストレージ (リンクされたサービスを使用する場合)

Azure Data Lake Storage Gen2 に接続する際に、Azure Synapse Analytics によって、リンク サービスが統合されたエクスペリエンスが提供されます。 リンク サービスは、アカウント キーサービス プリンシパルマネージド ID、または資格情報を使用して認証するように構成できます。

リンクされたサービスの認証方法がアカウント キーに設定されている場合、リンクされたサービスは指定されたストレージ アカウント キーを使用して認証を行い、SAS キーを要求し、LinkedServiceBasedSASProvider を使用してそれを自動的にストレージ要求に適用します。

Synapse では、ユーザーが特定のストレージ アカウントにリンク サービスを設定できます。 これにより、1 つの Spark アプリケーション/クエリで複数のストレージ アカウントからデータの読み取り/書き込みを行うことが可能になります。 使用されるストレージ アカウントごとに spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName を設定すると、Synapse により、特定の読み取りまたは書き込み操作にどのリンク サービスを使用するかが判断されます。 ただし、が単一ストレージ アカウントのみを処理する Spark ジョブの場合は、ストレージ アカウント名を省略して spark.storage.synapse.linkedServiceName を使用できます。

Note

既定の ABFS ストレージ コンテナーの認証方法を変更することはできません。

val sc = spark.sparkContext
val source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(s"spark.storage.synapse.$source_full_storage_account_name.linkedServiceName", "<LINKED SERVICE NAME>")
sc.hadoopConfiguration.set(s"fs.azure.account.auth.type.$source_full_storage_account_name", "SAS")
sc.hadoopConfiguration.set(s"fs.azure.sas.token.provider.type.$source_full_storage_account_name", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
# Set the required configs
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<lINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.auth.type.{source_full_storage_account_name}", "SAS")
sc._jsc.hadoopConfiguration().set(f"fs.azure.sas.token.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

# Python code
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<DIRECTORY PATH>')

df.show()

リンクされたサービスの認証方法がマネージド ID またはサービス プリンシパルに設定されている場合、リンクされたサービスは LinkedServiceBasedTokenProvider プロバイダーと共にマネージド ID またはサービス プリンシパルのトークンを使用します。

val sc = spark.sparkContext
val source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(s"spark.storage.synapse.$source_full_storage_account_name.linkedServiceName", "<LINKED SERVICE NAME>")
sc.hadoopConfiguration.set(s"fs.azure.account.oauth.provider.type.$source_full_storage_account_name", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider") 
val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
# Python code
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<LINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.oauth.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<DIRECTORY PATH>')

df.show()

Spark 構成を使用して認証設定を設定する

spark ステートメントを実行するのではなく、spark 構成を使って認証設定を指定することもできます。 すべての spark 構成にはプレフィックス spark. を付ける必要があります。また、すべての hadoop 構成にはプレフィックス spark.hadoop. を付ける必要があります。

Spark 構成の名前 構成値
spark.storage.synapse.teststorage.dfs.core.windows.net.linkedServiceName LINKED SERVICE NAME
spark.hadoop.fs.azure.account.oauth.provider.type.teststorage.dfs.core.windows.net microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider

リンクされたサービスを使用しない ADLS Gen2 ストレージ

SAS キーを使用して ADLS Gen2 ストレージに直接接続します。 ConfBasedSASProvider を使用し、SAS キーを spark.storage.synapse.sas 構成設定に指定します。 SAS トークンは、コンテナー レベル、アカウント レベル、またはグローバルで設定できます。 グローバル レベルでの SAS キーの設定はお勧めしません。ジョブが複数のストレージ アカウントから読み取り/書き込みできなくなるためです。

ストレージ コンテナーごとの SAS 構成

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<CONTAINER>.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<CONTAINER>.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

ストレージ アカウントごとの SAS 構成

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

すべてのストレージ アカウントの SAS 構成

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

MSAL を使用してトークンを取得する (カスタムのアプリ資格情報を使用)

ABFS ストレージ ドライバーが認証に MSAL を直接使用するように構成されている場合、プロバイダーはトークンをキャッシュしません。 これにより、信頼性の問題が発生する可能性があります。 Synapse Spark の一部である ClientCredsTokenProvider を使用することをお勧めします。

%%spark
val source_full_storage_account_name = "teststorage.dfs.core.windows.net"
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.$source_full_storage_account_name", "<Entra AppId>")
spark.conf.set("fs.azure.account.oauth2.client.secret.$source_full_storage_account_name", "<Entra app secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint.$source_full_storage_account_name", "https://login.microsoftonline.com/<tenantid>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{source_full_storage_account_name}.linkedServiceName", "<Entra AppId>")
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{source_full_storage_account_name}.linkedServiceName", "<Entra app secret>")
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{source_full_storage_account_name}.linkedServiceName", "https://login.microsoftonline.com/<tenantid>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')
display(df.limit(10))

SAS トークンを使用する ADLS Gen2 ストレージ (Azure Key Vault から)

Azure Key Vault シークレットに格納されている SAS トークンを使用して ADLS Gen2 ストレージに接続します。

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.AkvBasedSASProvider")
spark.conf.set("spark.storage.synapse.akv", "<AZURE KEY VAULT NAME>")
spark.conf.set("spark.storage.akv.secret", "<SECRET KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.AkvBasedSASProvider")
spark.conf.set("spark.storage.synapse.akv", "<AZURE KEY VAULT NAME>")
spark.conf.set("spark.storage.akv.secret", "<SECRET KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

その他のリンクされたサービスの TokenLibrary

他のリンクされたサービスに接続するには、TokenLibrary を直接呼び出すことができます。

getConnectionString()

接続文字列を取得するには、getConnectionString 関数を使用し、リンクされたサービスの名前を渡します。

%%spark
// retrieve connectionstring from mssparkutils

mssparkutils.credentials.getFullConnectionString("<LINKED SERVICE NAME>")
%%pyspark
# retrieve connectionstring from mssparkutils

mssparkutils.credentials.getFullConnectionString("<LINKED SERVICE NAME>")
%%csharp
// retrieve connectionstring from TokenLibrary

using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Utils;

string connectionString = TokenLibrary.GetConnectionString(<LINKED SERVICE NAME>);
Console.WriteLine(connectionString);

getPropertiesAll()

getPropertiesAll は、リンク サービスのすべてのプロパティを取得するために Scala と Python で使用できるヘルパー関数です

%%pyspark
import json
# retrieve connectionstring from mssparkutils

json.loads(mssparkutils.credentials.getPropertiesAll("<LINKED SERVICE NAME>"))

出力は次のようになります

{
    'AuthType': 'Key',
    'AuthKey': '[REDACTED]',
    'Id': None,
    'Type': 'AzureBlobStorage',
    'Endpoint': 'https://storageaccount.blob.core.windows.net/',
    'Database': None
}

GetSecret()

Azure Key Vault に格納されているシークレットを取得する場合は、Synapse ワークスペース内で Azure Key Vault へのリンクされたサービスを作成することをお勧めします。 Azure Key Vault に対するシークレット取得アクセス許可を Synapse ワークスペースの管理サービス ID に付与する必要があります。 リンクされたサービスは、管理サービス ID を使用して Azure Key Vault サービスに接続して、シークレットを取得します。 それ以外の場合は、Azure Key Vault に直接接続すると、ユーザーの Microsoft Entra 資格情報が使用されます。 この場合、ユーザーに Azure Key Vault に対するシークレット取得アクセス許可を付与する必要があります。

政府機関向けクラウドでは、キー コンテナーの完全修飾ドメイン名を指定してください。

mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>" [, <LINKED SERVICE NAME>])

Azure Key Vault からシークレットを取得するには、mssparkutils.credentials.getSecret() 関数を使用します。


mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>")

mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>")
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Utils;

string connectionString = TokenLibrary.GetSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>");
Console.WriteLine(connectionString);

Spark ランタイムでサポートされているリンク サービス接続

Azure Synapse Analytics では、(パイプラインや他の Azure 製品からの) さまざまなリンク サービス接続がサポートされていますが、それらのすべてが Spark ランタイムでサポートされているわけではありません。 サポートされているリンク サービスの一覧を次に示します。

  • Azure Blob Storage
  • Azure AI サービス
  • Azure Cosmos DB
  • Azure Data Explorer
  • Azure Database for MySQL
  • Azure Database for PostgreSQL
  • Azure Data Lake Store (Gen1)
  • Azure Key Vault
  • Azure Machine Learning
  • Azure Purview
  • Azure SQL データベース
  • Azure SQL Data Warehouse (専用およびサーバーレス)
  • Azure Storage

mssparkutils.credentials.getToken()

サービスに直接アクセスするために OAuth ベアラー トークンが必要な場合は、getToken メソッドを使用できます。 次のリソースがサポートされています。

サービス名 API 呼び出しで使用される文字列リテラル
Azure Storage Storage
Azure Key Vault Vault
Azure Management AzureManagement
Azure SQL Data Warehouse (Dedicated and Serverless) DW
Azure Synapse Synapse
Azure Data Lake Store DataLakeStore
Azure Data Factory ADF
Azure Data Explorer AzureDataExplorer
Azure Database for MySQL AzureOSSDB
Azure Database for MariaDB AzureOSSDB
Azure Database for PostgreSQL AzureOSSDB

Spark ランタイムでサポートされていないリンク サービス アクセス

リンク サービスにアクセスする次のメソッドは、Spark ランタイムでサポートされていません。

  • パラメーター化されたリンク サービスに引数を渡す
  • ユーザー割り当てマネージド ID (UAMI) を使用する接続
  • Notebook または SparkJobDefinition がマネージド ID として実行される場合に、Keyvault リソースにベアラー トークンを取得する
    • 別の方法として、アクセス トークンを取得するのではなく、Keyvault にリンクされるサービスを作成し、ノートブックまたはバッチ ジョブからシークレットを取得することもできます
  • Azure Cosmos DB 接続では、キー ベースのアクセスのみサポートされます。 トークン ベースのアクセスはサポートされていません。

ノートブックまたは Spark ジョブの実行中に、リンク サービスを使用してトークン/シークレットを取得する要求が失敗し、"BadRequest" を示すエラー メッセージが表示される場合があります。 これは、多くの場合、リンク サービスの構成の問題が原因です。 このエラー メッセージが表示された場合は、リンク サービスの構成をチェックしてください。 ご質問がある場合は、Azure portal で Microsoft Azure サポートにお問い合わせください。