Compartilhar via


NotebookUtils (antigo MSSparkUtils) para Fabric

Notebook Utilities (NotebookUtils) é um pacote interno que ajuda a executar facilmente tarefas comuns no Fabric Notebook. Você pode usar o NotebookUtils para trabalhar com sistemas de arquivos, obter variáveis de ambiente, encadear notebooks e trabalhar com segredos. O pacote NotebookUtils está disponível no PySpark (Python) Scala, notebooks do SparkR e pipelines do Fabric.

Note

  • O MsSparkUtils é renomeado oficialmente para NotebookUtils. O código existente permanece compatível com versões anteriores e não causa alterações significativas. É altamente recomendável atualizar para notebookutils para garantir o suporte contínuo e o acesso a novos recursos. O namespace mssparkutils será desativado no futuro.
  • O NotebookUtils foi projetado para funcionar com o Spark 3.4 (Runtime v1.2) e versões posteriores. Todos os novos recursos e atualizações têm suporte exclusivo com o namespace notebookutils daqui para frente.

Utilitários do sistema de arquivos

Notebookutils.fs fornece utilitários para trabalhar com vários sistemas de arquivos, incluindo o Azure Data Lake Storage (ADLS) Gen2 e o Armazenamento de Blobs do Azure. Configure o acesso ao Azure Data Lake Storage Gen2 e ao Armazenamento de Blobs do Azure adequadamente.

Execute os seguintes comandos para obter uma visão geral dos métodos disponíveis:

notebookutils.fs.help()

Output

notebookutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
fastcp(from: String, to: String, recurse: Boolean = true): Boolean -> Copies a file or directory via azcopy, possibly across FileSystems
mv(from: String, to: String, createPath: Boolean = false, overwrite: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use notebookutils.fs.help("methodName") for more info about a method.

O NotebookUtils funciona com o sistema de arquivos da mesma forma que as APIs do Spark. Use notebookutils.fs.mkdirs() e Fabric lakehouse, por exemplo:

Usage Caminho relativo da raiz do HDFS Caminho absoluto para o sistema de arquivos ABFS Caminho absoluto para o sistema de arquivos local no nó do driver
Lakehouse não padrão Sem suporte notebookutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") notebookutils.fs.mkdirs("file:/<new_dir>")
Lakehouse padrão Diretório em "Arquivos" ou "Tabelas": notebookutils.fs.mkdirs("Files/<new_dir>") notebookutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") notebookutils.fs.mkdirs("file:/<new_dir>")
  • Para o Lakehouse padrão, os caminhos de arquivo são montados em seu Notebook com um tempo limite de cache de arquivo padrão de 120 segundos. Isso significa que os arquivos são armazenados em cache na pasta temporária local do Notebook por 120 segundos, mesmo que sejam removidos do Lakehouse. Se você quiser alterar a regra de tempo limite, poderá desmontar os caminhos de arquivo padrão do Lakehouse e montá-los novamente com um valor fileCacheTimeout diferente.

  • Para configurações não padrão do Lakehouse, você pode definir o parâmetro fileCacheTimeout apropriado durante a montagem dos caminhos lakehouse. Definir o tempo limite como 0 garante que o arquivo mais recente seja buscado do servidor Lakehouse.

Listar arquivos

Para listar o conteúdo de um diretório, use notebookutils.fs.ls("O caminho do seu diretório"). Por exemplo:

notebookutils.fs.ls("Files/tmp") # The relatvie path may work with different base path, details in below 
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # The absolute path, like: ABFS file system
notebookutils.fs.ls("file:/tmp")  # The full path of the local file system of driver node

A API notebookutils.fs.ls() se comporta de forma diferente ao usar o caminho relativo, dependendo do tipo de notebook.

  • Em um notebook do Spark: o caminho relativo se refere ao caminho ABFSS padrão do Lakehouse. Por exemplo, notebookutils.fs.ls("Files") aponta para o diretório Files no Lakehouse padrão.

    Por exemplo:

    notebookutils.fs.ls("Files/sample_datasets/public_holidays.parquet")
    
  • Em um notebook do Python: o caminho relativo está relacionado ao diretório de trabalho do sistema de arquivos local, que por padrão é /home/trusted-service-user/work. Portanto, você deve usar o caminho completo em vez de um caminho relativo notebookutils.fs.ls("/lakehouse/default/Files") para acessar o diretório Files no Lakehouse padrão.

    Por exemplo:

    notebookutils.fs.ls("/lakehouse/default/Files/sample_datasets/public_holidays.parquet")
    

Exibir propriedades de arquivo

Retorna as propriedades do arquivo, incluindo o nome, o caminho e o tamanho do arquivo e se ele é um diretório e um arquivo.

files = notebookutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Criar um diretório

Cria o diretório especificado se ele não existir e todos os diretórios pai necessários.

notebookutils.fs.mkdirs('new directory name')  
notebookutils.fs.mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
notebookutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

Copiar arquivo

Esse método copia um arquivo ou diretório e dá suporte à atividade de cópia em sistemas de arquivos. Configuramos recurse=True para copiar todos os arquivos e diretórios recursivamente.

notebookutils.fs.cp('source file or directory', 'destination file or directory', recurse=True)

Note

Devido às limitações de atalho do OneLake, quando você precisa usar notebookutils.fs.cp() para copiar dados do atalho do tipo S3/GCS, é recomendável usar um caminho montado em vez de um caminho abfss.

Arquivo de cópia com desempenho

Esse método oferece uma abordagem mais eficiente para copiar ou mover arquivos, principalmente ao lidar com grandes volumes de dados. Para melhorar o desempenho no Fabric, é aconselhável utilizar fastcp como substituto do método tradicional cp.

notebookutils.fs.fastcp('source file or directory', 'destination file or directory', recurse=True)

Considerations:

  • notebookutils.fs.fastcp() não dá suporte à cópia de arquivos no OneLake entre regiões. Nesse caso, você pode usar notebookutils.fs.cp().
  • Devido às limitações de atalho do OneLake, quando você precisa usar notebookutils.fs.fastcp() para copiar dados do atalho do tipo S3/GCS, é recomendável usar um caminho montado em vez de um caminho abfss.

Visualizar o conteúdo do arquivo

Este método retorna até os primeiros "maxBytes" bytes do arquivo fornecido como uma string codificada em UTF-8.

notebookutils.fs.head('file path', maxBytes to read)

Mover arquivo

Esse método move um arquivo ou diretório e dá suporte a movimentações entre sistemas de arquivos.

notebookutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
notebookutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.

Gravar arquivo

Esse método grava a cadeia de caracteres fornecida em um arquivo, codificada em UTF-8.

notebookutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Acrescentar conteúdo a um arquivo

Esse método acrescenta a cadeia de caracteres especificada a um arquivo, codificada em UTF-8.

notebookutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Considerations:

  • notebookutils.fs.append() e notebookutils.fs.put() não dão suporte à gravação simultânea no mesmo arquivo devido à falta de garantias de atomicidade.
  • Ao usar a API notebookutils.fs.append em um loop for para gravar no mesmo arquivo, recomendamos adicionar uma instrução sleep em torno de 0,5 s ~ 1 s entre as gravações recorrentes. Essa recomendação ocorre porque a operação de notebookutils.fs.append interna da API flush é assíncrona, portanto, um pequeno atraso ajuda a garantir a integridade dos dados.

Excluir arquivo ou diretório

Esse método remove um arquivo ou diretório. Definimos recurse=True para remover todos os arquivos e diretórios recursivamente.

notebookutils.fs.rm('file path', recurse=True) 

Diretório de montagem/desmontagem

Obtenha mais informações sobre o uso detalhado em Montagem e desmontagem de arquivos.

Utilitários de bloco de anotações

Use os Utilitários de Notebook para executar um notebook ou sair de um notebook com um valor. Execute o seguinte comando para obter uma visão geral dos métodos disponíveis:

notebookutils.notebook.help()

Output:


The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map, workspace: String): String -> This method runs a notebook and returns its exit value.
runMultiple(DAG: Any): Map[String, MsNotebookRunResult] -> Runs multiple notebooks concurrently with support for dependency relationships.
validateDAG(DAG: Any): Boolean -> This method check if the DAG is correctly defined.

Below methods are only support Fabric Notebook.
create(name: String, description: String = "", content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = ""): Artifact -> Create a new Notebook.
get(name: String, workspaceId: String = ""): Artifact -> Get a Notebook by name or id.
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact -> Update a Artifact by name.
delete(name: String, workspaceId: String = ""): Boolean -> Delete a Notebook by name.
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact] -> List all Notebooks in the workspace.
updateDefinition(name: String, content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = "") -> Update the definition of a Notebook.

Use notebookutils.notebook.help("methodName") for more info about a method.

Note

Os utilitários de notebook não são aplicáveis para definições de trabalho do Apache Spark (SJD).

Referenciar um notebook

Esse método faz referência a um notebook e retorna seu valor de saída. Você pode executar o aninhamento de chamadas de função em um notebook interativamente ou em um pipeline. O notebook que está sendo referenciado é executado no pool do Spark em que o notebook chama essa função.

notebookutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)

Por exemplo:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

O bloco de anotações do Fabric também dá suporte à referência de notebooks em vários workspaces especificando a ID do workspace.

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Você pode abrir o link de instantâneo da execução de referência na saída da célula. O instantâneo captura os resultados da execução de código e permite que você depure facilmente uma execução de referência.

Captura de tela do resultado de execução de referência.

Captura de tela de um exemplo de instantâneo.

Considerations:

  • O notebook de referência entre workspaces é compatível com o runtime versão 1.2 e superior.
  • Se você usar os arquivos em Recurso do Notebook, use notebookutils.nbResPath no notebook referenciado para garantir que ele aponte para a mesma pasta que a execução interativa.
  • A execução de referência permite que os blocos de anotações filho sejam executados somente se eles usarem a mesma casa de lago que o pai, herdar a casa do pai ou se nenhum deles definir um. A execução será bloqueada se o filho especificar um lakehouse diferente para o bloco de anotações pai. Para ignorar essa verificação, defina useRootDefaultLakehouse: True.

A referência executa vários notebooks paralelamente

O método notebookutils.notebook.runMultiple() permite executar vários notebooks paralelamente ou com uma estrutura topológica predefinida. A API está usando um mecanismo de implementação de vários threads em uma sessão do Spark, o que significa que o notebook de referência compartilha os recursos de computação.

Com notebookutils.notebook.runMultiple(), você pode:

  • Execute vários notebooks simultaneamente, sem aguardar a conclusão de cada um.

  • Especifique as dependências e a ordem de execução para seus notebooks usando um formato JSON simples.

  • Otimize o uso de recursos de computação do Spark e reduza o custo dos seus projetos do Fabric.

  • Exiba os Instantâneos de cada registro de execução do notebook na saída e depure/monitore as tarefas do notebook convenientemente.

  • Obtenha o valor de saída de cada atividade executiva e use-as em tarefas downstream.

Você também pode tentar executar o notebookutils.notebook.help("runMultiple") para localizar o exemplo e o uso detalhado.

Este é um exemplo simples de como executar uma lista de notebooks paralelamente usando este método:

notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

O resultado da execução do notebook raiz é o seguinte:

Captura de tela de referência de uma lista de notebooks.

Aqui está um exemplo de execução de blocos de anotações com estrutura topológica usando notebookutils.notebook.runMultiple(). Use esse método para orquestrar facilmente notebooks por meio de uma experiência de código.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

O resultado da execução do notebook raiz é o seguinte:

Captura de tela de referência de uma lista de notebooks com parâmetros.

Também oferecemos um método para verificar se o DAG está definido corretamente.

notebookutils.notebook.validateDAG(DAG)

Considerations:

  • O grau de paralelismo da execução de vários notebooks é restrito ao recurso de computação total disponível de uma sessão do Spark.
  • O número padrão de notebooks simultâneos é 50 para o notebook Spark, enquanto o padrão é 25 para o Python Notebook. Você pode personalizar esse valor, mas o paralelismo excessivo pode levar a problemas de estabilidade e desempenho devido ao alto uso de recursos de computação. Se surgirem problemas, considere separar blocos de anotações em várias runMultiple chamadas ou reduzir a simultaneidade ajustando o campo de simultaneidade no parâmetro DAG.
  • O tempo limite padrão para todo o DAG é de 12 horas, e o tempo limite padrão para cada célula no notebook secundário é de 90 segundos. Você pode alterar o tempo limite definindo os campos timeoutInSeconds e timeoutPerCellInSeconds no parâmetro DAG.

Sair de um notebook

Esse método sai de um notebook com um valor. Você pode executar o aninhamento de chamadas de função em um notebook interativamente ou em um pipeline.

  • Quando você chama uma função exit() de um notebook interativamente, o bloco de anotações do Fabric gera uma exceção, ignora a execução de células subsequentes e mantém a sessão spark ativa.

  • Quando você orquestra um notebook em um pipeline que chama uma função exit(), a atividade do notebook retorna com um valor de saída. Isso conclui a execução do pipeline e interrompe a sessão do Spark.

  • Quando você chama uma função exit() em um notebook que está sendo referenciado, o Fabric Spark interromperá a execução adicional do notebook referenciado e continuará executando as próximas células no notebook principal que chama a função run( ). Por exemplo: o Notebook1 tem três células e chama uma função exit() na segunda célula. O Notebook2 tem cinco células e chamadas executadas(notebook1) na terceira célula. Quando você executa o Notebook2, o Notebook1 para na segunda célula ao atingir a função exit( ). Notebook2 continua a executar a quarta célula e a quinta célula.

notebookutils.notebook.exit("value string")

Note

A função exit() substitui a saída da célula atual. Para evitar perder a saída de outras instruções de código, chame notebookutils.notebook.exit() em uma célula separada.

Por exemplo:

Bloco de anotações de exemplo1 com duas células a seguir:

  • A célula 1 define um parâmetro de entrada com o valor padrão definido como 10.

  • A célula 2 sai do notebook com a entrada como valor de saída.

Captura de tela mostrando um notebook de exemplo da função de saída.

Você pode executar o Sample1 em outro notebook com valores padrão:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

Output:

Notebook is executed successfully with exit value 10

Você pode executar o Sample1 em outro bloco de anotações e definir o valor de entrada como 20:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Output:

Notebook is executed successfully with exit value 20

Gerenciar artefatos de notebook

notebookutils.notebook fornece utilitários especializados para gerenciar itens do Notebook programaticamente. Essas APIs ajudam a criar, obter, atualizar e excluir itens do Notebook com facilidade.

Para utilizar esses métodos de forma eficaz, considere os seguintes exemplos de uso:

Criar um Caderno

with open("/path/to/notebook.ipynb", "r") as f:
    content = f.read()

artifact = notebookutils.notebook.create("artifact_name", "description", "content", "default_lakehouse_name", "default_lakehouse_workspace_id", "optional_workspace_id")

Obter conteúdo de um Notebook

artifact = notebookutils.notebook.get("artifact_name", "optional_workspace_id")

Atualizar um Notebook

updated_artifact = notebookutils.notebook.update("old_name", "new_name", "optional_description", "optional_workspace_id")
updated_artifact_definition = notebookutils.notebook.updateDefinition("artifact_name",  "content", "default_lakehouse_name", "default_Lakehouse_Workspace_name", "optional_workspace_id")

Excluindo um notebook

is_deleted = notebookutils.notebook.delete("artifact_name", "optional_workspace_id")

Listando notebooks em um workspace

artifacts_list = notebookutils.notebook.list("optional_workspace_id")

Utilitários de função de dados do usuário (UDF)

notebookutils.udf fornece utilitários projetados para integrar o código do Notebook com Funções de Dados do Usuário (UDFs). Esses utilitários permitem que você acesse funções de um item UDF dentro do mesmo espaço de trabalho ou em espaços de trabalho diferentes. Você pode então invocar funções dentro de um item UDF conforme necessário.

Aqui estão alguns exemplos de como usar os utilitários UDF:

# Get functions from a UDF item
myFunctions = notebookutils.udf.getFunctions('UDFItemName')
# Or from another workspace
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId')

# Display function and item details
display(myFunctions.functionDetails)
display(myFunctions.itemDetails)

# Invoke a function
myFunctions.functionName('value1', 'value2')
# Or with named parameters
myFunctions.functionName(parameter1='value1', parameter2='value2')

Recuperar funções de uma UDF

myFunctions = notebookutils.udf.getFunctions('UDFItemName')
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId')
var myFunctions = notebookutils.udf.getFunctions("UDFItemName")
var myFunctions = notebookutils.udf.getFunctions("UDFItemName", "workspaceId")
myFunctions <- notebookutils.udf.getFunctions("UDFItemName")
myFunctions <- notebookutils.udf.getFunctions("UDFItemName", "workspaceId")

Invocar uma função

myFunctions.functionName('value1', 'value2'...)
val res = myFunctions.functionName('value1', 'value2'...)
myFunctions$functionName('value1', 'value2'...)

Exibir detalhes de um item UDF

display([myFunctions.itemDetails])
display(Array(myFunctions.itemDetails))
myFunctions$itemDetails()

Exibir detalhes da função para um UDF

display(myFunctions.functionDetails)
display(myFunctions.functionDetails)
myFunctions$functionDetails()

Utilitários de credenciais

Você pode usar os Utilitários de credenciais para obter tokens de acesso e gerenciar segredos em um Azure Key Vault.

Execute o seguinte comando para obter uma visão geral dos métodos disponíveis:

notebookutils.credentials.help()

Output:

Help on module notebookutils.credentials in notebookutils:

NAME
    notebookutils.credentials - Utility for credentials operations in Fabric

FUNCTIONS
    getSecret(akvName, secret) -> str
        Gets a secret from the given Azure Key Vault.
        :param akvName: The name of the Azure Key Vault.
        :param secret: The name of the secret.
        :return: The secret value.
    
    getToken(audience) -> str
        Gets a token for the given audience.
        :param audience: The audience for the token.
        :return: The token.
    
    help(method_name=None)
        Provides help for the notebookutils.credentials module or the specified method.
        
        Examples:
        notebookutils.credentials.help()
        notebookutils.credentials.help("getToken")
        :param method_name: The name of the method to get help with.

DATA
    creds = <notebookutils.notebookutils.handlers.CredsHandler.CredsHandler...

FILE
    /home/trusted-service-user/cluster-env/trident_env/lib/python3.10/site-packages/notebookutils/credentials.py

Obter o token

getToken retorna um token do Microsoft Entra para um determinado público e nome (opcional). A lista a seguir mostra as chaves de público disponíveis no momento:

  • Recurso de Audiência de Armazenamento: "armazenamento"
  • Recurso do Power BI: "pbi"
  • Recurso do Azure Key Vault: "keyvault"
  • Recurso de BD KQL do Synapse RTA: "kusto"

Execute o seguinte comando para obter o token:

notebookutils.credentials.getToken('audience Key')

Considerations:

  • Escopos de token com 'pbi' como público-alvo podem mudar ao longo do tempo. No momento, há suporte para os escopos a seguir.

  • Quando você chama notebookutils.credentials.getToken("pbi"), o token retornado terá escopo limitado se o notebook estiver em execução em uma entidade de serviço. O token não tem o escopo de serviço completo do Fabric. Se o notebook estiver em execução sob a identidade do usuário, o token ainda terá o escopo completo do serviço Fabric, mas isso poderá mudar com melhorias de segurança. Para garantir que o token tenha o escopo de serviço completo do Fabric, use a autenticação MSAL em vez da API notebookutils.credentials.getToken . Para obter mais informações, consulte Authenticate with Microsoft Entra ID.

  • Veja a seguir a lista de escopos que o token tem ao chamar notebookutils.credentials.getToken com a chave de audiência pbi na identidade da entidade de serviço:

    • Lakehouse.ReadWrite.All
    • MLExperiment.ReadWrite.All
    • MLModel.ReadWrite.All
    • Notebook.ReadWrite.All
    • SparkJobDefinition.ReadWrite.All
    • Workspace.ReadWrite.All
    • Dataset.ReadWrite.All

Obter segredo

getSecret retorna um segredo do Azure Key Vault para um determinado ponto de extremidade e nome de segredo do Azure Key Vault usando credenciais de usuário.

notebookutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

Montagem e desmontagem de arquivos

O Fabric dá suporte aos seguintes cenários de montagem no pacote Microsoft Spark Utilities. Você pode usar as APIs mount, unmount, getMountPath()e mounts() para anexar o armazenamento remoto (ADLS Gen2) a todos os nós de trabalho (nós de driver e nós de trabalho). Depois que o ponto de montagem do armazenamento estiver configurado, use uma API de arquivo local para acessar dados como se estivessem armazenados no sistema de arquivos local.

Como montar uma conta do ADLS Gen2

O exemplo a seguir ilustra como montar o Azure Data Lake Storage Gen2. A montagem do Armazenamento de Blobs e do Compartilhamento de Arquivos do Azure funciona da mesma forma.

Este exemplo pressupõe que você tenha uma conta do Data Lake Storage Gen2 chamada storegen2, que tem um contêiner chamado mycontainer que você deseja montar para /testar na sessão spark do notebook.

Captura de tela mostrando onde selecionar um contêiner para montar.

Para montar o contêiner chamado mycontainer, o notebookutils primeiro precisa verificar se você tem permissão para acessar o contêiner. Atualmente, o Fabric dá suporte a dois métodos de autenticação para a operação de montagem de gatilho: accountKey e sastoken.

Montagem por meio de token de assinatura de acesso compartilhado ou chave de conta

O NotebookUtils suporta a passagem explícita de uma chave de acesso à conta ou de um token de Assinatura de Acesso Compartilhado (SAS) como um parâmetro para realizar a montagem do destino.

Por motivos de segurança, recomendamos que você armazene chaves de conta ou tokens SAS no Azure Key Vault (como mostra a captura de tela de exemplo a seguir). Em seguida, você pode recuperá-los usando a API notebookutils.credentials.getSecret . Para obter mais informações, consulte Sobre as chaves de conta de armazenamento gerenciadas do Azure Key Vault.

Captura de tela mostrando onde os segredos ficam armazenados em um Azure Key Vault.

Código de exemplo para o método accountKey :

# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

Código de exemplo para sastoken:

# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

Parâmetros de montagem:

  • fileCacheTimeout: os blobs são armazenados em cache na pasta temporária local por 120 segundos por padrão. Durante esse tempo, o blobfuse não verifica se o arquivo está atualizado ou não. O parâmetro pode ser definido para alterar o tempo limite padrão. Quando vários clientes modificam arquivos ao mesmo tempo, para evitar inconsistências entre arquivos locais e remotos, recomendamos reduzir o tempo de cache ou até mesmo alterá-los para 0 e sempre obter os arquivos mais recentes do servidor.
  • tempo limite: o tempo limite da operação de montagem é de 120 segundos por padrão. O parâmetro pode ser definido para alterar o tempo limite padrão. Quando há muitos executores ou quando a montagem atinge o tempo limite, recomendamos aumentar o valor.

Você pode usar esses parâmetros como este:

notebookutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

Note

Por motivos de segurança, é aconselhável evitar a inclusão de credenciais diretamente no código. Para proteger ainda mais suas credenciais, todos os segredos exibidos nas saídas do notebook serão editados. Para obter mais informações, consulte a redação secreta.

Como montar um lakehouse

Código de exemplo para montar uma lakehouse em /<mount_name>:

notebookutils.fs.mount( 
 "abfss://<workspace_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_name>.Lakehouse", 
 "/<mount_name>"
)

Acessar arquivos no ponto de montagem usando a API notebookutils fs

A principal finalidade da operação de montagem é permitir o acesso dos clientes aos dados armazenados em uma conta de armazenamento remoto por meio da API do sistema de arquivos local. Você também pode acessar os dados usando a API notebookutils fs com um caminho montado como parâmetro. Esse formato de caminho é um pouco diferente.

Suponha que você tenha montado o mycontainer de contêiner do Data Lake Storage Gen2 para /testar usando a API de montagem. Quando você acessa os dados usando a API do sistema de arquivos local, o formato do caminho tem a seguinte aparência:

/synfs/notebook/{sessionId}/test/{filename}

Quando você deseja acessar os dados usando a API notebookutils fs, recomendamos usar getMountPath() para obter o caminho preciso:

path = notebookutils.fs.getMountPath("/test")
  • Listar diretórios:

    notebookutils.fs.ls(f"file://{notebookutils.fs.getMountPath('/test')}")
    
  • Ler conteúdo do arquivo:

    notebookutils.fs.head(f"file://{notebookutils.fs.getMountPath('/test')}/myFile.txt")
    
  • Criar um diretório:

    notebookutils.fs.mkdirs(f"file://{notebookutils.fs.getMountPath('/test')}/newdir")
    

Acessar arquivos no ponto de montagem por meio do caminho local

Você pode ler e gravar facilmente os arquivos no ponto de montagem usando o sistema de arquivos padrão. Aqui está um exemplo do Python:

#File read
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

Como verificar pontos de montagem existentes

Você pode usar a API notebookutils.fs.mounts() para verificar todas as informações de ponto de montagem existentes:

notebookutils.fs.mounts()

Como desmontar o ponto de montagem

Use o código a seguir para desmontar o ponto de montagem (/teste neste exemplo):

notebookutils.fs.unmount("/test")

Limitações conhecidas

  • A montagem atual é uma configuração de nível de trabalho; Recomendamos que você use a API de montagem para verificar se um ponto de montagem existe ou não está disponível.

  • O mecanismo de desmontagem não é aplicado automaticamente. Quando a execução do aplicativo for concluída, para desmontar o ponto de montagem e liberar o espaço em disco, você precisará chamar explicitamente uma API de desmontagem em seu código. Caso contrário, o ponto de montagem ainda existirá no nó após a conclusão da execução do aplicativo.

  • Não há suporte para a montagem de uma conta de armazenamento do ADLS Gen1.

Utilitários lakehouse

notebookutils.lakehouse fornece ferramentas personalizadas para gerenciar itens do Lakehouse. Esses utilitários permitem criar, obter, atualizar e excluir artefatos do Lakehouse facilmente.

Visão geral dos métodos

Aqui está uma visão geral dos métodos disponíveis fornecidos pelo notebookutils.lakehouse:

# Create a new Lakehouse artifact
create(name: String, description: String = "", definition: ItemDefinition = null, workspaceId: String = ""): Artifact

# Create Lakehouse with Schema Support
create(name: String, description: String = "", definition: {"enableSchemas": True}): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Get a Lakehouse artifact with properties
getWithProperties(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean 

# List all Lakehouse artifacts
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact]

# List all tables in a Lakehouse artifact
listTables(lakehouse: String, workspaceId: String = "", maxResults: Int = 1000): Array[Table] 

# Starts a load table operation in a Lakehouse artifact
loadTable(loadOption: collection.Map[String, Any], table: String, lakehouse: String, workspaceId: String = ""): Array[Table] 

Exemplos de uso

Para utilizar esses métodos de forma eficaz, considere os seguintes exemplos de uso:

Criar um Lakehouse

artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")
# Create Lakehouse with Schema Support
artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", {"enableSchemas": True})

Obter um Lakehouse

artifact = notebookutils.lakehouse.get("artifact_name", "optional_workspace_id")
artifact = notebookutils.lakehouse.getWithProperties("artifact_name", "optional_workspace_id")

Atualizar um Lakehouse

updated_artifact = notebookutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

Excluir um Lakehouse

is_deleted = notebookutils.lakehouse.delete("artifact_name", "optional_workspace_id")

Listar os Lakehouses em um workspace

artifacts_list = notebookutils.lakehouse.list("optional_workspace_id")

Listar todas as tabelas em um Lakehouse

artifacts_tables_list = notebookutils.lakehouse.listTables("artifact_name", "optional_workspace_id")

Iniciar uma operação de carregamento de tabela em um Lakehouse

notebookutils.lakehouse.loadTable(
    {
        "relativePath": "Files/myFile.csv",
        "pathType": "File",
        "mode": "Overwrite",
        "recursive": False,
        "formatOptions": {
            "format": "Csv",
            "header": True,
            "delimiter": ","
        }
    }, "table_name", "artifact_name", "optional_workspace_id")

Informações adicionais

Para obter informações mais detalhadas sobre cada método e seus parâmetros, utilize a função notebookutils.lakehouse.help("methodName").

Utilitários de runtime

Mostrar as informações de contexto da sessão

Com notebookutils.runtime.context você pode obter as informações de contexto da sessão ao vivo atual, incluindo o nome do notebook, o lakehouse padrão, as informações do workspace, se se trata de uma execução de pipeline, etc.

notebookutils.runtime.context

A tabela a seguir descreve as propriedades.

Parameter Explanation
currentNotebookName O nome do notebook atual
currentNotebookId A ID exclusiva do notebook atual
currentWorkspaceName O nome do espaço de trabalho atual
currentWorkspaceId O identificador do espaço de trabalho atual
defaultLakehouseName O nome de exibição do lakehouse padrão, se definido
defaultLakehouseId O ID do lakehouse padrão, se definido
defaultLakehouseWorkspaceName O nome do workspace do lakehouse padrão, caso esteja definido
defaultLakehouseWorkspaceId O ID do workspace do lakehouse padrão, se definido
currentRunId Em uma execução de referência, a ID de execução atual
parentRunId Em uma execução de referência com execuções aninhadas, essa ID é a ID de execução pai
rootRunId Em uma execução de referência com execuções aninhadas, essa ID é a ID da execução raiz
isForPipeline Se a execução é para um pipeline
isReferenceRun Se a execução atual é uma execução de referência
referenceTreePath A estrutura de árvore de execuções de referência aninhadas, usada apenas para a hierarquia de instantâneos na página de monitoramento L2
rootNotebookId (Somente na execução de referência) O ID do notebook principal em uma execução de referência.
rootNotebookName (Somente na execução de referência) O nome do notebook raiz em uma execução de referência.
rootWorkspaceId (Somente na execução de referência) A ID do workspace do notebook raiz em uma execução de referência.
rootWorkspaceName (Somente na execução de referência) O nome do workspace do notebook raiz em uma execução de referência.
activityId A ID do trabalho do Livy para a atividade atual
hcRepId O ID do REPL no modo de alta concorrência
clusterId A identidade do cluster Spark do Synapse
poolName O nome do pool do Spark que está sendo usado
environmentId A ID do ambiente em que o trabalho está sendo executado
environmentWorkspaceId O ID do ambiente de trabalho
userId A ID do usuário atual
userName O nome de usuário do usuário atual

Gerenciamento de sessão

Parar uma sessão interativa

Em vez de clicar manualmente no botão parar, às vezes é mais conveniente interromper uma sessão interativa chamando uma API no código. Para esses casos, fornecemos uma notebookutils.session.stop() de API para dar suporte à interrupção da sessão interativa por meio do código, ela está disponível para Scala e PySpark.

notebookutils.session.stop()

notebookutils.session.stop() API interrompe a sessão interativa atual de forma assíncrona em segundo plano. Ele também interrompe a sessão do Spark e libera os recursos ocupados pela sessão, portanto, eles estão disponíveis para outras sessões no mesmo pool.

Reiniciar o interpretador do Python

O utilitário notebookutils.session fornece uma maneira de reiniciar o interpretador do Python.

notebookutils.session.restartPython()

Considerations:

  • No caso de execução de referência do notebook, restartPython() apenas reinicia o interpretador de Python do notebook que está sendo referenciado.
  • Em casos raros, o comando pode falhar devido ao mecanismo de reflexão do Spark; adicionar uma tentativa pode atenuar o problema.

Utilitários de biblioteca de variáveis

Note

"Utilitários da Biblioteca de Variáveis" em Notebooks está em Versão Prévia.

As bibliotecas de variáveis permitem que você evite embutir valores fixos no código do notebook. Você pode atualizar os valores na biblioteca em vez de modificar o código. O notebook faz referência à biblioteca de variáveis para recuperar esses valores. Essa abordagem simplifica a reutilização de código entre equipes e projetos utilizando uma biblioteca gerenciada centralmente.

Execute os seguintes comandos para obter uma visão geral dos métodos disponíveis:

notebookutils.variableLibrary.help()

Output

[Preview] notebookutils.variableLibrary is a utility to Variable Library.

Below is overview about the available methods:

get(variableReference: String): String
-> Run the variable value with type.
getLibrary(variableLibraryName: String): VariableLibrary
-> Get the variable library.
Use notebookutils.variableLibrary.help("methodName") for more info about a method.

Definir a variável em sua Biblioteca de Variáveis

Defina as variáveis primeiro antes de usar notebookutils.variableLibrary.

Captura de tela da lista de variáveis na biblioteca de variáveis.

Recuperar a biblioteca de variáveis do Notebook

samplevl = notebookutils.variableLibrary.getLibrary("sampleVL")

samplevl.test_int
samplevl.test_str
val samplevl = notebookutils.variableLibrary.getLibrary("sampleVL")

samplevl.test_int
samplevl.test_str
samplevl <- notebookutils.variableLibrary.getLibrary("sampleVL")

samplevl.test_int
samplevl.test_str

Exemplo para usar dinamicamente a variável.

samplevl = notebookutils.variableLibrary.getLibrary("sampleVL")

file_path = f"abfss://{samplevl.Workspace_name}@onelake.dfs.fabric.microsoft.com/{samplevl.Lakehouse_name}.Lakehouse/Files/<FileName>.csv" 
df = spark.read.format("csv").option("header","true").load(file_path) 

display(df) 

Acessar uma única variável por referência

notebookutils.variableLibrary.get("$(/**/samplevl/test_int)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_str)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_bool)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_int)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_str)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_bool)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_int)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_str)")
notebookutils.variableLibrary.get("$(/**/samplevl/test_bool)")

Note

  • A notebookutils.variableLibrary API dá suporte apenas ao acesso a bibliotecas de variáveis no mesmo workspace.
  • Não há suporte para a recuperação de bibliotecas de variáveis em espaços de trabalho em notebooks filho durante uma execução de referência.
  • O código do notebook faz referência às variáveis definidas no conjunto de valores ativos da Biblioteca de Variáveis.

Problemas conhecidos

  • Ao usar a versão de runtime acima da 1.2 e executar notebookutils.help(), as APIs de FabricClient, PBIClient listadas por enquanto, estarão disponíveis posteriormente. Além disso, a API de Credenciais não tem suporte em notebooks Scala por enquanto.

  • O notebook Python não dá suporte às APIs stop, restartPython ao usar o utilitário notebookutils.session para gerenciamento de sessão.

  • Atualmente, não há suporte para SPN para utilitários de biblioteca de variáveis.