Proteja credenciais com serviços vinculados usando o mssparkutils

O acesso a dados de fontes externas é um padrão comum. A menos que a fonte de dados externa permita acesso anônimo, é provável que você precise proteger sua conexão com uma credencial, segredo ou cadeia de conexão.

O Azure Synapse Analytics usa a passagem do Microsoft Entra por padrão para autenticação entre recursos. Se você precisar se conectar a um recurso usando outras credenciais, use o mssparkutils diretamente. O pacote mssparkutils simplifica o processo de recuperação de tokens SAS, tokens Microsoft Entra, cadeias de conexão e segredos armazenados em um serviço vinculado ou de um Cofre de Chaves do Azure.

A passagem do Microsoft Entra usa permissões atribuídas a você como um usuário na ID do Microsoft Entra, em vez de permissões atribuídas ao Synapse ou a uma entidade de serviço separada. Por exemplo, se você quiser usar a passagem do Microsoft Entra para acessar um blob em uma conta de armazenamento, então você deve ir para essa conta de armazenamento e atribuir a função de contribuidor de blob a si mesmo.

Ao recuperar segredos do Cofre da Chave do Azure, recomendamos a criação de um serviço vinculado ao seu Cofre da Chave do Azure. Verifique se a identidade de serviço gerenciado (MSI) do espaço de trabalho Synapse tem privilégios de Obter Segredo em seu Cofre de Chaves do Azure. O Synapse será autenticado no Azure Key Vault usando a identidade do serviço gerenciado do espaço de trabalho Synapse. Se você se conectar diretamente ao Cofre da Chave do Azure sem um serviço vinculado, será autenticado usando sua credencial do usuário Microsoft Entra.

Para obter mais informações, consulte Serviços vinculados.

Utilização

Ajuda do mssparkutils para tokens e segredos

Esta função exibe a documentação de ajuda para o gerenciamento de segredos e tokens no Synapse.

mssparkutils.credentials.help()
mssparkutils.credentials.help()
Console.WriteLine(TokenLibrary.help());

Obter resultado:

 getToken(audience: String, name: String): returns AAD token for a given audience, name (optional)
 isValidToken(token: String): returns true if token hasn't expired
 getConnectionStringOrCreds(linkedService: String): returns connection string or credentials for the linked service
 getFullConnectionString(linkedService: String): returns full connection string with credentials for the linked service
 getPropertiesAll(linkedService: String): returns all the properties of the linked service
 getSecret(akvName: String, secret: String, linkedService: String): returns AKV secret for a given AKV linked service, akvName, secret key using workspace MSI
 getSecret(akvName: String, secret: String): returns AKV secret for a given akvName, secret key using user credentials
 getSecretWithLS(linkedService: String, secret: String): returns AKV secret for a given linked service, secret key
 putSecret(akvName: String, secretName: String, secretValue: String): puts AKV secret for a given akvName, secretName
 putSecret(akvName: String, secretName: String, secretValue: String, linkedService: String): puts AKV secret for a given akvName, secretName
 putSecretWithLS(linkedService: String, secretName: String, secretValue: String): puts AKV secret for a given linked service, secretName

Aceder ao Azure Data Lake Storage Gen2

Armazenamento primário ADLS Gen2

O acesso a arquivos do Armazenamento do Azure Data Lake primário usa a passagem do Microsoft Entra para autenticação por padrão e não requer o uso explícito do mssparkutils. A identidade usada na autenticação de passagem difere com base em alguns fatores. Por padrão, os blocos de anotações interativos são executados usando a identidade do usuário, mas ela pode ser alterada para a identidade de serviço gerenciado (MSI) do espaço de trabalho. Trabalhos em lote e execuções não interativas do bloco de anotações usam o MSI do espaço de trabalho.

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")
display(df.limit(10))
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')
display(df.limit(10))

Armazenamento ADLS Gen2 com serviços vinculados

O Azure Synapse Analytics fornece uma experiência de serviços vinculados integrada ao se conectar ao Azure Data Lake Storage Gen2. Os serviços vinculados podem ser configurados para autenticação usando uma Chave de Conta, Entidade de Serviço, Identidade Gerenciada ou Credencial.

Quando o método de autenticação do serviço vinculado é definido como Chave de Conta, o serviço vinculado se autentica usando a chave de conta de armazenamento fornecida, solicita uma chave SAS e a aplica automaticamente à solicitação de armazenamento usando o LinkedServiceBasedSASProvider.

Synapse permite que os usuários definam o serviço vinculado para uma conta de armazenamento específica. Isso torna possível ler/gravar dados de várias contas de armazenamento em um único aplicativo/consulta spark. Uma vez que definimos spark.storage.synapse.{ source_full_storage_account_name}.linkedServiceName para cada conta de armazenamento que será usada, o Synapse descobre qual serviço vinculado usar para uma determinada operação de leitura/gravação. No entanto, se nosso trabalho de faísca lida apenas com uma única conta de armazenamento, podemos simplesmente omitir o nome da conta de armazenamento e usar spark.storage.synapse.linkedServiceName

val sc = spark.sparkContext
val source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(s"spark.storage.synapse.$source_full_storage_account_name.linkedServiceName", "<LINKED SERVICE NAME>")
sc.hadoopConfiguration.set(s"fs.azure.account.auth.type.$source_full_storage_account_name", "SAS")
sc.hadoopConfiguration.set(s"fs.azure.sas.token.provider.type.$source_full_storage_account_name", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
# Set the required configs
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<lINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.auth.type.{source_full_storage_account_name}", "SAS")
sc._jsc.hadoopConfiguration().set(f"fs.azure.sas.token.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")

# Python code
df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<DIRECTORY PATH>')

df.show()

Quando o método de autenticação do serviço vinculado é definido como Identidade Gerenciada ou Entidade de Serviço, o serviço vinculado usará a Identidade Gerenciada ou o token da Entidade de Serviço com o provedor LinkedServiceBasedTokenProvider.

val sc = spark.sparkContext
val source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(s"spark.storage.synapse.$source_full_storage_account_name.linkedServiceName", "<LINKED SERVICE NAME>")
sc.hadoopConfiguration.set(s"fs.azure.account.oauth.provider.type.$source_full_storage_account_name", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider") 
val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
# Python code
source_full_storage_account_name = "teststorage.dfs.core.windows.net"
spark.conf.set(f"spark.storage.synapse.{source_full_storage_account_name}.linkedServiceName", "<LINKED SERVICE NAME>")
sc._jsc.hadoopConfiguration().set(f"fs.azure.account.oauth.provider.type.{source_full_storage_account_name}", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<DIRECTORY PATH>')

df.show()

Armazenamento ADLS Gen2 sem serviços vinculados

Conecte-se ao armazenamento ADLS Gen2 diretamente usando uma chave SAS. Use o ConfBasedSASProvider e forneça a chave SAS para a spark.storage.synapse.sas definição de configuração. Os tokens SAS podem ser definidos no nível do contêiner, da conta ou global. Não recomendamos a configuração de chaves SAS em nível global, pois o trabalho não poderá ler/gravar de mais de uma conta de armazenamento.

Configuração SAS por contêiner de armazenamento

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<CONTAINER>.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<CONTAINER>.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

Configuração SAS por conta de armazenamento

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<ACCOUNT>.dfs.core.windows.net", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.<ACCOUNT>.dfs.core.windows.net.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

Configuração SAS de todas as contas de armazenamento

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.sas", "<SAS KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark

sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.ConfBasedSASProvider")
spark.conf.set("spark.storage.synapse.sas", "<SAS KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

Armazenamento ADLS Gen2 com o Azure Key Vault

Conecte-se ao armazenamento ADLS Gen2 usando um token SAS armazenado no segredo do Cofre da Chave do Azure.

%%spark
sc.hadoopConfiguration.set("fs.azure.account.auth.type", "SAS")
sc.hadoopConfiguration.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.AkvBasedSASProvider")
spark.conf.set("spark.storage.synapse.akv", "<AZURE KEY VAULT NAME>")
spark.conf.set("spark.storage.akv.secret", "<SECRET KEY>")

val df = spark.read.csv("abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>")

display(df.limit(10))
%%pyspark
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type", "SAS")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.AkvBasedSASProvider")
spark.conf.set("spark.storage.synapse.akv", "<AZURE KEY VAULT NAME>")
spark.conf.set("spark.storage.akv.secret", "<SECRET KEY>")

df = spark.read.csv('abfss://<CONTAINER>@<ACCOUNT>.dfs.core.windows.net/<FILE PATH>')

display(df.limit(10))

TokenLibrary para outros serviços vinculados

Para se conectar a outros serviços vinculados, você pode fazer uma chamada direta para a TokenLibrary.

getConnectionString()

Para recuperar a cadeia de conexão, use a função getConnectionString e passe o nome do serviço vinculado.

%%spark
// retrieve connectionstring from mssparkutils

mssparkutils.credentials.getFullConnectionString("<LINKED SERVICE NAME>")
%%pyspark
# retrieve connectionstring from mssparkutils

mssparkutils.credentials.getFullConnectionString("<LINKED SERVICE NAME>")
%%csharp
// retrieve connectionstring from TokenLibrary

using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Utils;

string connectionString = TokenLibrary.GetConnectionString(<LINKED SERVICE NAME>);
Console.WriteLine(connectionString);

getPropertiesAll()

O getPropertiesAll é uma função auxiliar disponível em Scala e Python para obter todas as propriedades de um serviço vinculado

%%pyspark
import json
# retrieve connectionstring from mssparkutils

json.loads(mssparkutils.credentials.getPropertiesAll("<LINKED SERVICE NAME>"))

A saída terá a seguinte aparência

{
    'AuthType': 'Key',
    'AuthKey': '[REDACTED]',
    'Id': None,
    'Type': 'AzureBlobStorage',
    'Endpoint': 'https://storageaccount.blob.core.windows.net/',
    'Database': None
}

GetSecret()

Para recuperar um segredo armazenado do Cofre de Chaves do Azure, recomendamos que você crie um serviço vinculado ao Cofre de Chaves do Azure dentro do espaço de trabalho Sinapse. A identidade do serviço gerenciado do espaço de trabalho Synapse precisará receber permissão GET Secrets para o Cofre da Chave do Azure. O serviço vinculado usará a identidade do serviço gerenciado para se conectar ao serviço Azure Key Vault para recuperar o segredo. Caso contrário, conectar-se diretamente ao Cofre de Chaves do Azure usará a credencial Microsoft Entra do usuário. Nesse caso, o usuário precisará receber as permissões Obter Segredo no Cofre da Chave do Azure.

Em nuvens governamentais, forneça o nome de domínio totalmente qualificado do keyvault.

mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>" [, <LINKED SERVICE NAME>])

Para recuperar um segredo do Cofre de Chaves do Azure, use a função mssparkutils.credentials.getSecret().


mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>")

mssparkutils.credentials.getSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>")
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Utils;

string connectionString = TokenLibrary.GetSecret("<AZURE KEY VAULT NAME>", "<SECRET KEY>", "<LINKED SERVICE NAME>");
Console.WriteLine(connectionString);

Conexões de serviço vinculado suportadas pelo tempo de execução do Spark

Embora o Azure Synapse Analytics ofereça suporte a uma variedade de conexões de serviço vinculado (de pipelines e outros produtos do Azure), nem todas elas são suportadas pelo tempo de execução do Spark. Aqui está a lista de serviços vinculados suportados:

  • Armazenamento de Blobs do Azure
  • Serviços de IA do Azure
  • Azure Cosmos DB
  • Azure Data Explorer
  • Base de Dados do Azure para MySQL
  • Base de Dados do Azure para PostgreSQL
  • Repositório Azure Data Lake (Gen1)
  • Azure Key Vault
  • Azure Machine Learning
  • Azure Purview
  • Base de Dados SQL do Azure
  • Azure SQL Data Warehouse (dedicado e sem servidor)
  • Armazenamento do Azure

mssparkutils.credentials.getToken()

Quando você precisa de um token de portador OAuth para acessar serviços diretamente, você pode usar o getToken método. Os seguintes recursos são suportados:

Nome do Serviço Literal de cadeia de caracteres a ser usado na chamada de API
Armazenamento do Azure Storage
Azure Key Vault Vault
Gestão do Azure AzureManagement
Azure SQL Data Warehouse (dedicado e sem servidor) DW
Azure Synapse Synapse
Azure Data Lake Store DataLakeStore
Azure Data Factory ADF
Azure Data Explorer AzureDataExplorer
Base de Dados do Azure para MySQL AzureOSSDB
Azure Database for MariaDB AzureOSSDB
Base de Dados do Azure para PostgreSQL AzureOSSDB

Acesso ao serviço vinculado sem suporte a partir do tempo de execução do Spark

Os seguintes métodos de acesso aos serviços vinculados não são suportados a partir do tempo de execução do Spark:

  • Passando argumentos para o serviço vinculado parametrizado
  • Conexões com identidades gerenciadas atribuídas pelo usuário (UAMI)
  • As identidades gerenciadas atribuídas pelo sistema não são suportadas no recurso Keyvault
  • Para conexões do Azure Cosmos DB, o acesso baseado em chave sozinho é suportado. O acesso baseado em token não é suportado.

Ao executar um bloco de anotações ou um trabalho do Spark, as solicitações para obter um token/segredo usando um serviço vinculado podem falhar com uma mensagem de erro que indica 'BadRequest'. Isso geralmente é causado por um problema de configuração com o serviço vinculado. Se você vir essa mensagem de erro, verifique a configuração do seu serviço vinculado. Se você tiver alguma dúvida, entre em contato com o Suporte do Microsoft Azure no portal do Azure.