Introdução aos utilitários do Microsoft Spark
Os utilitários do Microsoft Spark (MSSparkUtils) são um pacote interno para ajudar você a executar tarefas comuns com facilidade. Você pode usar o MSSparkUtils para trabalhar com sistemas de arquivos, obter variáveis de ambiente, encadear notebooks e trabalhar com segredos. MSSparkUtils estão disponíveis em PySpark (Python)
, Scala
, .NET Spark (C#)
e R (Preview)
notebooks e pipelines Synapse.
Pré-requisitos
Configurar o acesso ao Azure Data Lake Storage Gen2
Os notebooks do Synapse usam a passagem do Microsoft Entra para acessar as contas do ADLS Gen2. Você precisa ser um Colaborador de dados de blob de armazenamento para acessar a conta de ADLS Gen2 (ou a pasta).
Os pipelines do Synapse usam a MSI (identidade do serviço gerenciado) para acessar as contas de armazenamento. Para usar o MSSparkUtils em suas atividades de pipeline, sua identidade do workspace precisa ser Colaborador de dados de blob de armazenamento para acessar a conta do ADLS Gen2 (ou a pasta).
Siga estas etapas para garantir que sua ID do Microsoft Entra e a MSI do workspace tenham acesso à conta do ADLS Gen2:
Abra o portal do Azure e a conta de armazenamento que você deseja acessar. Você pode navegar até o contêiner específico que deseja acessar.
Selecione Controle de acesso (IAM) no painel esquerdo.
Selecione Adicionar>Adicionar atribuição de função para abrir a página Adicionar atribuição de função.
Atribua a função a seguir. Para ver as etapas detalhadas, confira Atribuir funções do Azure usando o portal do Azure.
Configuração Valor Função Colaborador de dados de blob de armazenamento Atribuir acesso a USER e MANAGEDIDENTITY Membros sua conta do Microsoft Entra e sua identidade de workspace Observação
O nome da identidade gerenciada também é o nome do workspace.
Selecione Salvar.
Você pode acessar os dados no ADLS Gen2 com o Spark do Azure Synapse por meio da seguinte URL:
abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>
Configurar o acesso ao Armazenamento de Blobs do Azure
O Synapse usa a SAS (assinatura de acesso compartilhado) para acessar o Armazenamento de Blobs do Azure. Para evitar a exposição de chaves SAS no código, é recomendável criar um serviço vinculado no workspace do Azure Synapse para a conta do Armazenamento de Blobs do Azure que você deseja acessar.
Siga estas etapas para adicionar um novo serviço vinculado para uma conta do Armazenamento de Blobs do Azure:
- Abra o Azure Synapse Studio.
- Selecione Gerenciar no painel esquerdo e selecione Serviços vinculados em Conexões externas.
- Pesquise Armazenamento de Blobs do Azure no painel de Novo serviço vinculado à direita.
- Selecione Continuar.
- Selecione a conta do Armazenamento de Blobs do Azure para acessar e configurar o nome do serviço vinculado. Sugira usar a Chave de conta para o Método de autenticação.
- Selecione Testar conectividade para validar se as configurações estão corretas.
- Selecione Criar primeiro e clique em Publicar tudo para salvar suas alterações.
Você pode acessar dados no Armazenamento de Blobs do Azure com o Spark do Azure Synapse por meio da seguinte URL:
wasb[s]://<container_name>@<storage_account_name>.blob.core.windows.net/<path>
Veja um exemplo de código:
from pyspark.sql import SparkSession
# Azure storage access info
blob_account_name = 'Your account name' # replace with your blob name
blob_container_name = 'Your container name' # replace with your container name
blob_relative_path = 'Your path' # replace with your relative folder path
linked_service_name = 'Your linked service name' # replace with your linked service name
blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)
# Allow SPARK to access from Blob remotely
wasb_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)
print('Remote blob path: ' + wasb_path)
val blob_account_name = "" // replace with your blob name
val blob_container_name = "" //replace with your container name
val blob_relative_path = "/" //replace with your relative folder path
val linked_service_name = "" //replace with your linked service name
val blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)
val wasbs_path = f"wasbs://$blob_container_name@$blob_account_name.blob.core.windows.net/$blob_relative_path"
spark.conf.set(f"fs.azure.sas.$blob_container_name.$blob_account_name.blob.core.windows.net",blob_sas_token)
var blob_account_name = ""; // replace with your blob name
var blob_container_name = ""; // replace with your container name
var blob_relative_path = ""; // replace with your relative folder path
var linked_service_name = ""; // replace with your linked service name
var blob_sas_token = Credentials.GetConnectionStringOrCreds(linked_service_name);
spark.Conf().Set($"fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net", blob_sas_token);
var wasbs_path = $"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}";
Console.WriteLine(wasbs_path);
# Azure storage access info
blob_account_name <- 'Your account name' # replace with your blob name
blob_container_name <- 'Your container name' # replace with your container name
blob_relative_path <- 'Your path' # replace with your relative folder path
linked_service_name <- 'Your linked service name' # replace with your linked service name
blob_sas_token <- mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)
# Allow SPARK to access from Blob remotely
sparkR.session()
wasb_path <- sprintf('wasbs://%s@%s.blob.core.windows.net/%s',blob_container_name, blob_account_name, blob_relative_path)
sparkR.session(sprintf('fs.azure.sas.%s.%s.blob.core.windows.net',blob_container_name, blob_account_name), blob_sas_token)
print( paste('Remote blob path: ',wasb_path))
Configurar o acesso ao Azure Key Vault
Você pode adicionar um Azure Key Vault como um serviço vinculado para gerenciar suas credenciais no Azure Synapse. Siga estas etapas para adicionar um Azure Key Vault como um serviço vinculado do Azure Synapse:
Abra o Azure Synapse Studio.
Selecione Gerenciar no painel esquerdo e selecione Serviços vinculados em Conexões externas.
Pesquise Azure Key Vault no painel de Novo serviço vinculado à direita.
Selecione a conta do Azure Key Vault para acessar e configurar o nome do serviço vinculado.
Selecione Testar conectividade para validar se as configurações estão corretas.
Selecione Criar primeiro e clique em Publicar tudo para salvar suas alterações.
Os notebooks do Synapse usam a passagem do Microsoft Entra para acessar o Azure Key Vault. Os pipelines do Azure Synapse usam a identidade do workspace (MSI) para acessar o Azure Key Vault. Para garantir que seu código funcione tanto no notebook quanto no pipeline do Synapse, recomendamos conceder permissão de acesso secreto para sua conta do Microsoft Entra e a identidade do workspace.
Siga estas etapas para conceder acesso secreto à sua identidade de espaço de trabalho:
- Abra o portal do Azure e o Azure Key Vault que você deseja acessar.
- Selecione as Políticas de acesso no painel esquerdo.
- Selecione Adicionar Política de Acesso:
- Escolha Gerenciamento de Certificados, Chaves e Segredos como modelo de configuração.
- Selecione sua conta do Microsoft Entra e sua identidade do workspace (igual ao nome do workspace) na entidade de segurança selecionada ou verifique se ela já está atribuída.
- Selecione Selecionar e Adicionar.
- Selecione o botão Salvar para confirmar as alterações.
Utilitários do sistema de arquivos
mssparkutils.fs
fornece utilitários para trabalhar com vários sistemas de arquivos, incluindo o ADLS Gen2 (Azure Data Lake Storage Gen2) e o Armazenamento de Blobs do Azure. Configure o acesso ao Azure Data Lake Storage Gen2 e ao Armazenamento de Blobs do Azure adequadamente.
Execute os seguintes comandos para obter uma visão geral dos métodos disponíveis:
from notebookutils import mssparkutils
mssparkutils.fs.help()
mssparkutils.fs.help()
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Notebook.MSSparkUtils;
FS.Help()
library(notebookutils)
mssparkutils.fs.help()
Resulta em:
mssparkutils.fs provides utilities for working with various FileSystems.
Below is overview about the available methods:
cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(src: String, dest: String, create_path: Boolean = False, overwrite: Boolean = False): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
Use mssparkutils.fs.help("methodName") for more info about a method.
Listar arquivos
Liste o conteúdo de um diretório.
mssparkutils.fs.ls('Your directory path')
mssparkutils.fs.ls("Your directory path")
FS.Ls("Your directory path")
mssparkutils.fs.ls("Your directory path")
Exibir propriedades de arquivo
Retorna as propriedades do arquivo, incluindo o nome do arquivo, o caminho do arquivo, o tamanho do arquivo, a hora de modificação do arquivo e se ele é um diretório e um arquivo.
files = mssparkutils.fs.ls('Your directory path')
for file in files:
print(file.name, file.isDir, file.isFile, file.path, file.size, file.modifyTime)
val files = mssparkutils.fs.ls("/")
files.foreach{
file => println(file.name,file.isDir,file.isFile,file.size,file.modifyTime)
}
var Files = FS.Ls("/");
foreach(var File in Files) {
Console.WriteLine(File.Name+" "+File.IsDir+" "+File.IsFile+" "+File.Size);
}
files <- mssparkutils.fs.ls("/")
for (file in files) {
writeLines(paste(file$name, file$isDir, file$isFile, file$size, file$modifyTime))
}
Criar um diretório
Cria o diretório especificado se ele não existir e todos os diretórios pai necessários.
mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs.mkdirs("new directory name")
FS.Mkdirs("new directory name")
mssparkutils.fs.mkdirs("new directory name")
Copiar arquivo
Copia um arquivo ou diretório. Oferece suporte à cópia em sistemas de arquivos.
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
FS.Cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)
Arquivo de cópia com desempenho
Esse método fornece uma maneira mais rápida de copiar ou mover arquivos, especialmente grandes volumes de dados.
mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True) # Set the third parameter as True to copy all files and directories recursively
Observação
O método só dá suporte ao Azure Synapse Runtime para Apache Spark 3.3 e ao Azure Synapse Runtime para Apache Spark 3.4.
Visualizar o conteúdo do arquivo
Retorna até os primeiros “maxBytes” bytes do arquivo fornecido como uma cadeia de caracteres codificada em UTF-8.
mssparkutils.fs.head('file path', maxBytes to read)
mssparkutils.fs.head("file path", maxBytes to read)
FS.Head("file path", maxBytes to read)
mssparkutils.fs.head('file path', maxBytes to read)
Mover arquivo
Move um arquivo ou diretório. Oferece suporte à movimentação entre sistemas de arquivos.
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv("source file or directory", "destination directory", true) // Set the last parameter as True to firstly create the parent directory if it does not exist
FS.Mv("source file or directory", "destination directory", true)
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
Gravar arquivo
Grava a cadeia de caracteres fornecida em um arquivo, codificada em UTF-8.
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
FS.Put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
Acrescentar conteúdo a um arquivo
Acrescenta a cadeia de caracteres especificada a um arquivo, codificada em UTF-8.
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path","content to append",true) // Set the last parameter as True to create the file if it does not exist
FS.Append("file path", "content to append", true) // Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
Observação
mssparkutils.fs.append()
e mssparkutils.fs.put()
não suportam gravação simultânea no mesmo arquivo devido à falta de garantias de atomicidade.
Excluir arquivo ou diretório
Remove um arquivo ou um diretório.
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
FS.Rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
Utilitários de notebook
Sem suporte.
Use os Utilitários de Notebook MSSparkUtils para executar ou sair de um notebook com um valor. Execute o seguinte comando para obter uma visão geral dos métodos disponíveis:
mssparkutils.notebook.help()
Obtenha resultados:
The notebook module.
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
Observação
Os utilitários de notebook não são aplicáveis para definições de trabalho do Apache Spark (SJD).
Referenciar um notebook
Referencia um notebook e retorna seu valor de saída. Você pode executar o aninhamento de chamadas de função em um notebook interativamente ou em um pipeline. O notebook que está sendo referenciado será executado no pool do Spark em que o notebook chama essa função.
mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)
Por exemplo:
mssparkutils.notebook.run("folder/Sample1", 90, {"input": 20 })
Após a conclusão da execução, você verá um link de instantâneo chamado 'Exibir execução do notebook: Nome do Notebook' mostrado na saída da célula. Clique no link para ver o instantâneo para esta execução específica.
A referência executa vários notebooks paralelamente
O método mssparkutils.notebook.runMultiple()
permite executar vários notebooks paralelamente ou com uma estrutura topológica predefinida. A API está usando um mecanismo de implementação de vários threads em uma sessão do Spark, o que significa que os recursos de computação são compartilhados pelas execuções do notebook de referência.
Com mssparkutils.notebook.runMultiple()
, você pode:
Execute vários notebooks simultaneamente, sem aguardar a conclusão de cada um.
Especifique as dependências e a ordem de execução para seus notebooks usando um formato JSON simples.
Otimize o uso dos recursos de computação do Spark e reduza o custo de seus projetos do Synapse.
Exiba os Instantâneos de cada registro de execução do notebook na saída e depure/monitore as tarefas do notebook convenientemente.
Obtenha o valor de saída de cada atividade executiva e use-as em tarefas downstream.
Você também pode tentar executar o mssparkutils.notebook.help("runMultiple") para localizar o exemplo e o uso detalhado.
Este é um exemplo simples de como executar uma lista de notebooks paralelamente usando este método:
mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])
O resultado da execução do notebook raiz é o seguinte:
Veja a seguir um exemplo de execução de notebooks com estrutura topológica usando mssparkutils.notebook.runMultiple()
. Use esse método para orquestrar facilmente notebooks por meio de uma experiência de código.
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "NotebookSimple", # activity name, must be unique
"path": "NotebookSimple", # notebook path
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
},
{
"name": "NotebookSimple2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200}
},
{
"name": "NotebookSimple2.2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
}
]
}
mssparkutils.notebook.runMultiple(DAG)
Observação
Sair de um notebook
Sai de um notebook com um valor. Você pode executar o aninhamento de chamadas de função em um notebook interativamente ou em um pipeline.
Quando você chama uma função exit() de um notebook interativamente, o Azure Synapse gera uma exceção, ignora as células de subsequência em execução e mantém a sessão do Spark ativa.
Quando você orquestra um notebook que chama uma função
exit()
em um pipeline do Synapse, o Azure Synapse retorna um valor de saída, conclui a execução de pipeline e interrompe a sessão do Spark.Quando você chama uma função
exit()
em um notebook que está sendo referenciado, o Azure Synapse interrompe a execução adicional no notebook em questão e continua executando as próximas células no notebook que chamam a funçãorun()
. Por exemplo: Notebook1 tem três células e chama uma funçãoexit()
na segunda célula. O Notebook2 tem cinco células e chamarun(notebook1)
na terceira célula. Quando você executa Notebook2, Notebook1 é interrompido na segunda célula quando ocorre a funçãoexit()
. Notebook2 continuará a executar a quarta célula e a quinta célula.
mssparkutils.notebook.exit("value string")
Por exemplo:
O notebook Sample1 fica em folder/ com estas duas células:
- A célula 1 define um parâmetro de entrada com valor padrão 10.
- A célula 2 sai do notebook com entrada como valor de saída.
Você pode executar Sample1 em outro notebook com valores padrão:
exitVal = mssparkutils.notebook.run("folder/Sample1")
print (exitVal)
Resulta em:
Sample1 run success with input is 10
Você pode executar Sample1 em outro notebook e definir o valor de entrada como 20:
exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print (exitVal)
Resulta em:
Sample1 run success with input is 20
Use os Utilitários de Notebook MSSparkUtils para executar ou sair de um notebook com um valor. Execute o seguinte comando para obter uma visão geral dos métodos disponíveis:
mssparkutils.notebook.help()
Obtenha resultados:
The notebook module.
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
Referenciar um notebook
Referencia um notebook e retorna seu valor de saída. Você pode executar o aninhamento de chamadas de função em um notebook interativamente ou em um pipeline. O notebook que está sendo referenciado será executado no pool do Spark em que o notebook chama essa função.
mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)
Por exemplo:
mssparkutils.notebook.run("folder/Sample1", 90, Map("input" -> 20))
Após a conclusão da execução, você verá um link de instantâneo chamado 'Exibir execução do notebook: Nome do Notebook' mostrado na saída da célula. Clique no link para ver o instantâneo para esta execução específica.
Sair de um notebook
Sai de um notebook com um valor. Você pode executar o aninhamento de chamadas de função em um notebook interativamente ou em um pipeline.
Quando você chama uma função
exit()
em um notebook interativamente, o Azure Synapse gera uma exceção, ignora as células de subsequência em execução e mantém a sessão do Spark ativa.Quando você orquestra um notebook que chama uma função
exit()
em um pipeline do Synapse, o Azure Synapse retorna um valor de saída, conclui a execução de pipeline e interrompe a sessão do Spark.Quando você chama uma função
exit()
em um notebook que está sendo referenciado, o Azure Synapse interrompe a execução adicional no notebook em questão e continua executando as próximas células no notebook que chamam a funçãorun()
. Por exemplo: Notebook1 tem três células e chama uma funçãoexit()
na segunda célula. O Notebook2 tem cinco células e chamarun(notebook1)
na terceira célula. Quando você executa Notebook2, Notebook1 é interrompido na segunda célula quando ocorre a funçãoexit()
. Notebook2 continuará a executar a quarta célula e a quinta célula.
mssparkutils.notebook.exit("value string")
Por exemplo:
O notebook Sample1 fica em mssparkutils/folder/ com estas duas células:
- A célula 1 define um parâmetro de entrada com valor padrão 10.
- A célula 2 sai do notebook com entrada como valor de saída.
Você pode executar Sample1 em outro notebook com valores padrão:
val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1")
print(exitVal)
Resulta em:
exitVal: String = Sample1 run success with input is 10
Sample1 run success with input is 10
Você pode executar Sample1 em outro notebook e definir o valor de entrada como 20:
val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print(exitVal)
Resulta em:
exitVal: String = Sample1 run success with input is 20
Sample1 run success with input is 20
Use os Utilitários de Notebook MSSparkUtils para executar ou sair de um notebook com um valor. Execute o seguinte comando para obter uma visão geral dos métodos disponíveis:
mssparkutils.notebook.help()
Obtenha resultados:
The notebook module.
exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.
Referenciar um notebook
Referencia um notebook e retorna seu valor de saída. Você pode executar o aninhamento de chamadas de função em um notebook interativamente ou em um pipeline. O notebook que está sendo referenciado será executado no pool do Spark em que o notebook chama essa função.
mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)
Por exemplo:
mssparkutils.notebook.run("folder/Sample1", 90, list("input": 20))
Após a conclusão da execução, você verá um link de instantâneo chamado 'Exibir execução do notebook: Nome do Notebook' mostrado na saída da célula. Clique no link para ver o instantâneo para esta execução específica.
Sair de um notebook
Sai de um notebook com um valor. Você pode executar o aninhamento de chamadas de função em um notebook interativamente ou em um pipeline.
Quando você chama uma função
exit()
em um notebook interativamente, o Azure Synapse gera uma exceção, ignora as células de subsequência em execução e mantém a sessão do Spark ativa.Quando você orquestra um notebook que chama uma função
exit()
em um pipeline do Synapse, o Azure Synapse retorna um valor de saída, conclui a execução de pipeline e interrompe a sessão do Spark.Quando você chama uma função
exit()
em um notebook que está sendo referenciado, o Azure Synapse interrompe a execução adicional no notebook em questão e continua executando as próximas células no notebook que chamam a funçãorun()
. Por exemplo: Notebook1 tem três células e chama uma funçãoexit()
na segunda célula. O Notebook2 tem cinco células e chamarun(notebook1)
na terceira célula. Quando você executa Notebook2, Notebook1 é interrompido na segunda célula quando ocorre a funçãoexit()
. Notebook2 continuará a executar a quarta célula e a quinta célula.
mssparkutils.notebook.exit("value string")
Por exemplo:
O notebook Sample1 fica em folder/ com estas duas células:
- A célula 1 define um parâmetro de entrada com valor padrão 10.
- A célula 2 sai do notebook com entrada como valor de saída.
Você pode executar Sample1 em outro notebook com valores padrão:
exitVal <- mssparkutils.notebook.run("folder/Sample1")
print (exitVal)
Resulta em:
Sample1 run success with input is 10
Você pode executar Sample1 em outro notebook e definir o valor de entrada como 20:
exitVal <- mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, list("input": 20))
print (exitVal)
Resulta em:
Sample1 run success with input is 20
Utilitários de credenciais
Você pode usar os Utilitários de credenciais do MSSparkUtils para obter os tokens de acesso de serviços vinculados e gerenciar segredos no Azure Key Vault.
Execute o seguinte comando para obter uma visão geral dos métodos disponíveis:
mssparkutils.credentials.help()
mssparkutils.credentials.help()
Not supported.
mssparkutils.credentials.help()
Obter resultado:
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
Observação
Atualmente, não há suporte para getSecretWithLS(linkedService, secret) no C#.
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
Obter o token
Retorna o token do Microsoft Entra para um determinado público-alvo, nome (opcional). A tabela abaixo lista todos os tipos de público disponíveis:
Tipo de público | Literal de cadeia de caracteres a ser usado na chamada à API |
---|---|
Armazenamento do Azure | Storage |
Cofre de Chave do Azure | Vault |
Gerenciamento do Azure | AzureManagement |
SQL do Azure Data Warehouse (dedicado e sem servidor) | DW |
Azure Synapse | Synapse |
Repositório Azure Data Lake | DataLakeStore |
Fábrica de dados do Azure | ADF |
Azure Data Explorer | AzureDataExplorer |
Banco de Dados do Azure para MySQL | AzureOSSDB |
Banco de Dados do Azure para MariaDB | AzureOSSDB |
Banco de Dados do Azure para PostgreSQL | AzureOSSDB |
mssparkutils.credentials.getToken('audience Key')
mssparkutils.credentials.getToken("audience Key")
Credentials.GetToken("audience Key")
mssparkutils.credentials.getToken('audience Key')
Validar token
Retorna true se o token não tiver expirado.
mssparkutils.credentials.isValidToken('your token')
mssparkutils.credentials.isValidToken("your token")
Credentials.IsValidToken("your token")
mssparkutils.credentials.isValidToken('your token')
Obter cadeia de conexão ou credenciais para o serviço vinculado
Retorna a cadeia de conexão ou credenciais para o serviço vinculado.
mssparkutils.credentials.getConnectionStringOrCreds('linked service name')
mssparkutils.credentials.getConnectionStringOrCreds("linked service name")
Credentials.GetConnectionStringOrCreds("linked service name")
mssparkutils.credentials.getConnectionStringOrCreds('linked service name')
Obter o segredo usando a identidade do workspace
Retorna o segredo do Azure Key Vault para um determinado nome do Azure Key Vault, nome do segredo e nome do serviço vinculado usando a identidade do workspace. Configure o acesso ao Azure Key Vault adequadamente.
mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')
mssparkutils.credentials.getSecret("azure key vault name","secret name","linked service name")
Credentials.GetSecret("azure key vault name","secret name","linked service name")
mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')
Obter segredo usando as credenciais do usuário
Retorna o segredo do Azure Key Vault para um determinado nome do Azure Key Vault, nome do segredo e nome do serviço vinculado usando as credenciais do usuário.
mssparkutils.credentials.getSecret('azure key vault name','secret name')
mssparkutils.credentials.getSecret("azure key vault name","secret name")
Credentials.GetSecret("azure key vault name","secret name")
mssparkutils.credentials.getSecret('azure key vault name','secret name')
Colocar o segredo usando a identidade do workspace
Coloca o segredo do Azure Key Vault para um determinado nome do Azure Key Vault, nome do segredo e nome do serviço vinculado usando a identidade do workspace. Configure o acesso ao Azure Key Vault adequadamente.
mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')
Colocar o segredo usando a identidade do workspace
Coloca o segredo do Azure Key Vault para um determinado nome do Azure Key Vault, nome do segredo e nome do serviço vinculado usando a identidade do workspace. Configure o acesso ao Azure Key Vault adequadamente.
mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value","linked service name")
Colocar o segredo usando a identidade do workspace
Coloca o segredo do Azure Key Vault para um determinado nome do Azure Key Vault, nome do segredo e nome do serviço vinculado usando a identidade do workspace. Configure o acesso ao Azure Key Vault adequadamente.
mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')
Colocar o segredo usando as credenciais do usuário
Coloca o segredo do Azure Key Vault para um determinado nome do Azure Key Vault, nome do segredo e nome do serviço vinculado usando as credenciais do usuário.
mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')
Colocar o segredo usando as credenciais do usuário
Coloca o segredo do Azure Key Vault para um determinado nome do Azure Key Vault, nome do segredo e nome do serviço vinculado usando as credenciais do usuário.
mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')
Colocar o segredo usando as credenciais do usuário
Coloca o segredo do Azure Key Vault para um determinado nome do Azure Key Vault, nome do segredo e nome do serviço vinculado usando as credenciais do usuário.
mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value")
Utilitários de ambiente
Execute os seguintes comandos para obter uma visão geral dos métodos disponíveis:
mssparkutils.env.help()
mssparkutils.env.help()
mssparkutils.env.help()
Env.Help()
Obter resultado:
getUserName(): returns user name
getUserId(): returns unique user id
getJobId(): returns job id
getWorkspaceName(): returns workspace name
getPoolName(): returns Spark pool name
getClusterId(): returns cluster id
Obter nome de usuário
Retorna o nome de usuário atual.
mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
Env.GetUserName()
Obter ID de usuário
Retorna a ID de usuário atual.
mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
Env.GetUserId()
Obter ID do trabalho
Retorna a ID do trabalho.
mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
Env.GetJobId()
Obter o nome do workspace
Retorna o nome do workspace.
mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
Env.GetWorkspaceName()
Obter nome do pool
Retorna o nome do pool do Spark.
mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
Env.GetPoolName()
Obter a ID do cluster
Retorna a ID do cluster atual.
mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
Env.GetClusterId()
Contexto de runtime
Os utilitários de runtime Mssparkutils expuseram 3 propriedades de runtime. Você pode usar o contexto de runtime Mssparkutils para obter as propriedades listadas como abaixo:
- Notebookname – Nome do notebook atual, sempre retornará um valor para os modos interativo e de pipeline.
- Pipelinejobid – ID de execução de pipeline, retornará valor no modo de pipeline e retornará uma cadeia de caracteres vazia no modo interativo.
- Activityrunid – ID da execução de atividade do notebook, retornará valor no modo de pipeline e retornará uma cadeia de caracteres vazia no modo interativo.
Atualmente, o contexto de runtime oferece suporte a Python e Scala.
mssparkutils.runtime.context
ctx <- mssparkutils.runtime.context()
for (key in ls(ctx)) {
writeLines(paste(key, ctx[[key]], sep = "\t"))
}
%%spark
mssparkutils.runtime.context
Gerenciamento da sessão
Interromper uma sessão interativa
Em vez de clicar manualmente no botão Parar, às vezes é mais conveniente interromper uma sessão interativa chamando uma API no código. Para esses casos, fornecemos uma API mssparkutils.session.stop()
para dar suporte à interrupção da sessão interativa por meio do código, ela está disponível para Scala e Python.
mssparkutils.session.stop()
mssparkutils.session.stop()
mssparkutils.session.stop()
mssparkutils.session.stop()
A API interromperá a sessão interativa atual de modo assíncrono em segundo plano, interromperá a sessão do Spark e liberará os recursos ocupados pela sessão para que eles estejam disponíveis para outras sessões no mesmo pool.
Observação
Não é recomendável chamar APIs internas de linguagem, como sys.exit
no Scala ou sys.exit()
no Python em seu código, porque essas APIs simplesmente encerram o processo do interpretador, deixando a sessão do Spark ativa e os recursos não liberados.
Dependências do pacote
Se você deseja desenvolver notebooks ou trabalhos localmente e precisa fazer referência aos pacotes relevantes para dicas de compilação/IDE, você pode usar os seguintes pacotes.