Partilhar via


NotebookUtils (antigo MSSparkUtils) para Fabric

O Notebook Utilities (NotebookUtils) é um pacote integrado para ajudá-lo a executar facilmente tarefas comuns no Fabric Notebook. Você pode usar NotebookUtils para trabalhar com sistemas de arquivos, para obter variáveis de ambiente, para encadear blocos de anotações e para trabalhar com segredos. O pacote NotebookUtils está disponível em notebooks PySpark (Python), Scala, SparkR e pipelines Fabric.

Nota

  • MsSparkUtils é oficialmente renomeado para NotebookUtils. O código existente permanece compatível com versões anteriores e não causa nenhuma alteração de quebra. É altamente recomendável atualizar para notebookutils para garantir suporte contínuo e acesso a novos recursos. O namespace mssparkutils será desativado no futuro.
  • NotebookUtils foi projetado para funcionar com o Spark 3.4 (Runtime v1.2) e superior. Todos os novos recursos e atualizações serão suportados exclusivamente com o namespace notebookutils a partir de agora.

Utilitários do sistema de arquivos

notebookutils.fs fornece utilitários para trabalhar com vários sistemas de arquivos, incluindo o Azure Data Lake Storage (ADLS) Gen2 e o Azure Blob Storage. Certifique-se de configurar o acesso ao Azure Data Lake Storage Gen2 e ao Armazenamento Blob do Azure adequadamente.

Execute os seguintes comandos para obter uma visão geral dos métodos disponíveis:

notebookutils.fs.help()

Realização

notebookutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
fastcp(from: String, to: String, recurse: Boolean = true): Boolean -> [Preview] Copies a file or directory via azcopy, possibly across FileSystems
mv(from: String, to: String, createPath: Boolean = false, overwrite: Boolean = false): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory
exists(file: String): Boolean -> Check if a file or directory exists
mount(source: String, mountPoint: String, extraConfigs: Map[String, Any]): Boolean -> Mounts the given remote storage directory at the given mount point
unmount(mountPoint: String): Boolean -> Deletes a mount point
mounts(): Array[MountPointInfo] -> Show information about what is mounted
getMountPath(mountPoint: String, scope: String = ""): String -> Gets the local path of the mount point

Use notebookutils.fs.help("methodName") for more info about a method.

O NotebookUtils funciona com o sistema de arquivos da mesma forma que as APIs do Spark. Tomemos como exemplo o uso de notebookutils.fs.mkdirs() e Fabric lakehouse:

Utilização Caminho relativo da raiz HDFS Caminho absoluto para o sistema de arquivos ABFS Caminho absoluto para o sistema de arquivos local no nó controlador
Casa do lago não padrão Não suportado notebookutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") notebookutils.fs.mkdirs("ficheiro:/<new_dir>")
Casa do lago padrão Diretório em 'Ficheiros' ou 'Tabelas': notebookutils.fs.mkdirs("Files/<new_dir>") notebookutils.fs.mkdirs("abfss://< container_name>@<storage_account_name.dfs.core.windows.net/>< new_dir>") notebookutils.fs.mkdirs("ficheiro:/<new_dir>")

Nota

  • Para o Lakehouse padrão, os caminhos de arquivo são montados em seu Bloco de Anotações com um tempo limite de cache de arquivos padrão de 120 segundos. Isso significa que os arquivos são armazenados em cache na pasta temporária local do Notebook por 120 segundos, mesmo que sejam removidos do Lakehouse. Se pretender alterar a regra de tempo limite, pode desligar os caminhos de ficheiro padrão do Lakehouse e montá-los novamente com um valor diferente de fileCacheTimeout .
  • Para configurações não padrão do Lakehouse, pode-se definir o parâmetro apropriado de fileCacheTimeout durante a montagem dos caminhos do Lakehouse. Definir o tempo limite como 0 garante que o arquivo mais recente seja buscado no servidor Lakehouse.

Listar ficheiros

Para listar o conteúdo de um diretório, use notebookutils.fs.ls('Seu caminho de diretório'). Por exemplo:

notebookutils.fs.ls("Files/tmp") # The relatvie path may work with different base path, details in below 
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>")  # The absolute path, like: ABFS file system
notebookutils.fs.ls("file:/tmp")  # The full path of the local file system of driver node

A API notebookutils.fs.ls() comporta-se de forma diferente ao usar o caminho relativo, dependendo do tipo de notebook.

  • Num caderno do Spark: O caminho relativo refere-se ao caminho ABFSS padrão do Lakehouse. Por exemplo, notebookutils.fs.ls("Files") aponta para o diretório Files no Lakehouse padrão.

    Por exemplo:

    notebookutils.fs.ls("Files/sample_datasets/public_holidays.parquet")
    
  • Em um notebook Python: O caminho relativo é relativo ao diretório de trabalho do sistema de arquivos local, que por padrão é /home/trusted-service-user/work. Portanto, você deve usar o caminho completo em vez de um caminho relativo notebookutils.fs.ls("/lakehouse/default/Files") para acessar o diretório Files no Lakehouse padrão.

    Por exemplo:

    notebookutils.fs.ls("/lakehouse/default/Files/sample_datasets/public_holidays.parquet")
    

Ver propriedades do ficheiro

Esse método retorna as propriedades do arquivo, incluindo nome do arquivo, caminho do arquivo, tamanho do arquivo e se é um diretório e um arquivo.

files = notebookutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size)

Criar novo diretório

Esse método cria o diretório determinado, se ele não existir, e cria todos os diretórios pai necessários.

notebookutils.fs.mkdirs('new directory name')  
notebookutils.fs.mkdirs("Files/<new_dir>")  # works with the default lakehouse files using relative path 
notebookutils.fs.ls("abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<new_dir>")  # based on ABFS file system 
notebookutils.fs.ls("file:/<new_dir>")  # based on local file system of driver node 

Copiar ficheiro

Esse método copia um arquivo ou diretório e oferece suporte à atividade de cópia entre sistemas de arquivos.

notebookutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Nota

Devido às limitações de do atalhodo OneLake, quando for necessário usar notebookutils.fs.cp() para copiar dados de um atalho do tipo S3/GCS, é recomendável utilizar um caminho montado em vez de um caminho abfss.

Cópia de ficheiro eficiente

Este método oferece uma abordagem mais eficiente para copiar ou mover arquivos, particularmente ao lidar com grandes volumes de dados. Para melhorar o desempenho no tecido, é aconselhável utilizar fastcp como um substituto para o método tradicional cp .

Nota

  • notebookutils.fs.fastcp() não suporta a cópia de arquivos no OneLake entre regiões. Neste caso, você pode usar notebookutils.fs.cp() em vez disso.
  • Devido às limitações de do atalhodo OneLake, quando for necessário usar notebookutils.fs.fastcp() para copiar dados de um atalho do tipo S3/GCS, é recomendável utilizar um caminho montado em vez de um caminho abfss.
notebookutils.fs.fastcp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively

Visualizar conteúdo do arquivo

Este método retorna até os primeiros bytes 'maxBytes' do arquivo fornecido como uma String codificada em UTF-8.

notebookutils.fs.head('file path', maxBytes to read)

Mover o ficheiro

Esse método move um arquivo ou diretório e oferece suporte a movimentações entre sistemas de arquivos.

notebookutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
notebookutils.fs.mv('source file or directory', 'destination directory', True, True) # Set the third parameter to True to firstly create the parent directory if it does not exist. Set the last parameter to True to overwrite the updates.

Gravar arquivo

Este método grava a seqüência dada em um arquivo, codificado em UTF-8.

notebookutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Acrescentar conteúdo a um ficheiro

Este método acrescenta a cadeia de caracteres dada a um arquivo, codificado em UTF-8.

notebookutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Nota

  • notebookutils.fs.append() e notebookutils.fs.put() não suportam a escrita simultânea no mesmo ficheiro devido à falta de garantias de atomicidade.
  • Ao usar a API notebookutils.fs.append em um loop de for para gravar no mesmo arquivo, recomendamos adicionar uma instrução sleep em torno de 0,5 s ~ 1 s entre as gravações recorrentes. Essa recomendação ocorre porque a operação de notebookutils.fs.append interna da API flush é assíncrona, portanto, um pequeno atraso ajuda a garantir a integridade dos dados.

Excluir arquivo ou diretório

Esse método remove um arquivo ou diretório.

notebookutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Diretório de montagem/desmontagem

Encontre mais informações sobre o uso detalhado em Montagem e desmontagem de arquivos.

Utilitários para notebook

Use os Utilitários de Bloco de Anotações para executar um bloco de anotações ou sair de um bloco de anotações com um valor. Execute o seguinte comando para obter uma visão geral dos métodos disponíveis:

notebookutils.notebook.help()

Saída:


The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map, workspace: String): String -> This method runs a notebook and returns its exit value.
runMultiple(DAG: Any): Map[String, MsNotebookRunResult] -> [Preview] Runs multiple notebooks concurrently with support for dependency relationships.
validateDAG(DAG: Any): Boolean -> [Preview] This method check if the DAG is correctly defined.

[Preview] Below methods are only support Fabric Notebook.
create(name: String, description: String = "", content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = ""): Artifact -> Create a new Notebook.
get(name: String, workspaceId: String = ""): Artifact -> Get a Notebook by name or id.
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact -> Update a Artifact by name.
delete(name: String, workspaceId: String = ""): Boolean -> Delete a Notebook by name.
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact] -> List all Notebooks in the workspace.
updateDefinition(name: String, content: String = "", defaultLakehouse: String = "", defaultLakehouseWorkspace: String = "", workspaceId: String = "") -> Update the definition of a Notebook.

Use notebookutils.notebook.help("methodName") for more info about a method.

Nota

Os utilitários de notebook não são aplicáveis para definições de trabalho do Apache Spark (SJD).

Referenciar um bloco de notas

Esse método faz referência a um bloco de anotações e retorna seu valor de saída. Você pode executar chamadas de função de aninhamento em um bloco de anotações interativamente ou em um pipeline. O notebook que está a ser referenciado é executado no pool Spark do notebook que invoca esta função.

notebookutils.notebook.run("notebook name", <timeoutSeconds>, <parameterMap>, <workspaceId>)

Por exemplo:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

O caderno no Fabric também oferece suporte à referência de cadernos em vários espaços de trabalho ao especificar a ID do espaço de trabalho.

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Você pode abrir o link de instantâneo da execução de referência na saída da célula. O instantâneo captura os resultados da execução do código e permite fazer debug facilmente de uma execução de referência específica.

Captura de ecrã do resultado da execução de referência.

Captura de ecrã de um exemplo de instantâneo.

Nota

  • O bloco de anotações de referência entre espaços de trabalho é suportado pelas versões 1.2 e superiores do runtime.
  • Se utilizares os ficheiros em Recursos do Caderno, usa notebookutils.nbResPath no caderno referenciado para garantir que ele aponte para a mesma pasta da execução interativa.

Referência: executar vários cadernos em paralelo

Importante

Este recurso está em pré-visualização.

O método notebookutils.notebook.runMultiple() permite executar vários blocos de anotações em paralelo ou com uma estrutura topológica predefinida. A API está a usar um mecanismo de implementação multithread dentro de uma sessão Spark, o que significa que as execuções dos notebooks de referência partilham os recursos de computação.

Com notebookutils.notebook.runMultiple(), pode:

  • Execute vários notebooks simultaneamente, sem precisar esperar que cada um termine.

  • Especifique as dependências e a ordem de execução de seus blocos de anotações, usando um formato JSON simples.

  • Otimize o uso dos recursos de computação do Spark e reduza o custo de seus projetos do Fabric.

  • Visualize as capturas de ecrã de cada registro de execução do notebook na saída e depure/monitore convenientemente as tarefas do notebook.

  • Obtenha o valor de saída de cada atividade executiva e use-o em tarefas a jusante.

Você também pode tentar executar o notebookutils.notebook.help("runMultiple") para encontrar o exemplo e o uso detalhado.

Aqui está um exemplo simples de execução de uma lista de blocos de anotações em paralelo usando esse método:


notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

O resultado da execução do bloco de anotações raiz é o seguinte:

Captura de ecrã de uma lista de blocos de notas de referência.

Aqui está um exemplo de execução de blocos de anotações com estrutura topológica usando notebookutils.notebook.runMultiple(). Use esse método para orquestrar facilmente blocos de anotações por meio de uma experiência de código.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 50 # max number of notebooks to run concurrently, default to 50
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

O resultado da execução do bloco de anotações raiz é o seguinte:

Captura de ecrã referenciando uma lista de blocos de notas com parâmetros.

Também fornecemos um método para verificar se o DAG está definido corretamente.

notebookutils.notebook.validateDAG(DAG)

Nota

  • O grau de paralelismo da execução de vários blocos de anotações é restrito ao recurso de computação total disponível de uma sessão do Spark.
  • O limite superior para atividades de bloco de anotações ou blocos de anotações simultâneos é 50. Exceder esse limite pode levar a problemas de estabilidade e desempenho devido ao alto uso de recursos de computação. Se surgirem problemas, considere separar os blocos de anotações em várias runMultiple chamadas ou reduzir a simultaneidade ajustando o campo de simultaneidade no parâmetro DAG.
  • O tempo limite padrão para todo o DAG é de 12 horas e o tempo limite padrão para cada célula no bloco de anotações infantil é de 90 segundos. Você pode alterar o tempo limite definindo os campos timeoutInSeconds e timeoutPerCellInSeconds no parâmetro DAG.

Sair de um bloco de notas

Esse método sai de um bloco de anotações com um valor. Você pode executar chamadas de função de aninhamento em um bloco de anotações interativamente ou em um pipeline.

  • Quando chama uma função exit() de um notebook interativamente, o notebook Fabric lança uma exceção, ignora a execução das células subsequentes e mantém a sessão Spark viva.

  • Quando se orquestra um notebook num pipeline que chama uma função exit(), a atividade do notebook devolve com um valor de saída. Isso conclui a execução do pipeline e interrompe a sessão do Spark.

  • Quando chamas uma função exit() num bloco de notas que está a ser referenciado, o Fabric Spark interrompe a execução subsequente do bloco de notas referenciado e continua a executar as próximas células no bloco de notas principal que chama a função run(). Por exemplo: Notebook1 tem três células e chama uma função exit() na segunda célula. O Notebook2 tem cinco células e as chamadas são executadas (notebook1) na terceira célula. Quando você executa o Notebook2, o Notebook1 para na segunda célula ao pressionar a função exit( ). O Notebook2 continua a executar a sua quarta célula e a quinta célula.

notebookutils.notebook.exit("value string")

Nota

A função exit() substitui a saída da célula atual. Para evitar perder a saída de outras instruções de código, chame notebookutils.notebook.exit() em uma célula separada.

Por exemplo:

Exemplo1 bloco de notas com as seguintes duas células:

  • A célula 1 define um parâmetro de entrada com o valor padrão definido como 10.

  • A célula 2 sai do caderno com input como valor de saída.

Captura de tela mostrando um bloco de anotações de exemplo da função de saída.

Você pode executar o Sample1 em outro bloco de anotações com valores padrão:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

Saída:

Notebook is executed successfully with exit value 10

Você pode executar o Sample1 em outro bloco de anotações e definir o valor de entrada como 20:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Saída:

Notebook is executed successfully with exit value 20

Gerenciar artefatos de bloco de anotações

notebookutils.notebook fornece utilitários especializados para gerenciar itens do Notebook programaticamente. Essas APIs podem ajudá-lo a criar, obter, atualizar e excluir itens do Bloco de Anotações facilmente.

Para utilizar esses métodos de forma eficaz, considere os seguintes exemplos de uso:

Criando um bloco de notas

with open("/path/to/notebook.ipynb", "r") as f:
    content = f.read()

artifact = notebookutils.notebook.create("artifact_name", "description", "content", "default_lakehouse_name", "default_lakehouse_workspace_id", "optional_workspace_id")

Obter o conteúdo de um Bloco de Notas

artifact = notebookutils.notebook.get("artifact_name", "optional_workspace_id")

Atualizar um bloco de notas

updated_artifact = notebookutils.notebook.update("old_name", "new_name", "optional_description", "optional_workspace_id")
updated_artifact_definition = notebookutils.notebook.updateDefinition("artifact_name",  "content", "default_lakehouse_name", "default_Lakehouse_Workspace_name", "optional_workspace_id")

Eliminar um Bloco de Notas

is_deleted = notebookutils.notebook.delete("artifact_name", "optional_workspace_id")

Listando blocos de anotações em um espaço de trabalho

artifacts_list = notebookutils.notebook.list("optional_workspace_id")

Utilitários de Funções de Dados do Utilizador (UDF)

notebookutils.udf fornece utilitários projetados para integrar o código do Notebook com as UDFs (User Data Functions). Esses utilitários permitem que você acesse funções de um item UDF dentro do mesmo espaço de trabalho ou em espaços de trabalho diferentes. Em seguida, você pode invocar funções dentro de um item UDF conforme necessário.

Aqui está uma visão geral dos métodos disponíveis:

# Get functions
myFunctions = notebookutils.udf.getFunctions('UDFItemName') # Get functions from UDF within the same workspace
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId') # Get functions from UDF across different workspace

# Additional helper method to return all functions, their respective parameters, and types.
display(myFunctions.functionDetails)
display(myFunctions.itemDetails)

# Invoke the function
myFunctions.functionName('value1', 'value2')
myFunctions.functionName(parameter1='value1', parameter2='value2'...) # Another way to invoke the function

Recuperar funções de um UDF

myFunctions = notebookutils.udf.getFunctions('UDFItemName')
myFunctions = notebookutils.udf.getFunctions('UDFItemName', 'workspaceId')
var myFunctions = notebookutils.udf.getFunctions("UDFItemName")
var myFunctions = notebookutils.udf.getFunctions("UDFItemName", "workspaceId")
myFunctions <- notebookutils.udf.getFunctions("UDFItemName")
myFunctions <- notebookutils.udf.getFunctions("UDFItemName", "workspaceId")

Invocar uma função

myFunctions.functionName('value1', 'value2'...)
val res = myFunctions.functionName('value1', 'value2'...)
myFunctions$functionName('value1', 'value2'...)

Exibir detalhes de um item UDF

display([myFunctions.itemDetails])
display(Array(myFunctions.itemDetails))
myFunctions$itemDetails()

Exibir detalhes da função para um UDF

display(myFunctions.functionDetails)
display(myFunctions.functionDetails)
myFunctions$functionDetails()

Ferramentas de credenciais

Você pode usar os Utilitários de Credenciais para obter tokens de acesso e gerenciar segredos em um Cofre de Chaves do Azure.

Execute o seguinte comando para obter uma visão geral dos métodos disponíveis:

notebookutils.credentials.help()

Saída:

Help on module notebookutils.credentials in notebookutils:

NAME
    notebookutils.credentials - Utility for credentials operations in Fabric

FUNCTIONS
    getSecret(akvName, secret) -> str
        Gets a secret from the given Azure Key Vault.
        :param akvName: The name of the Azure Key Vault.
        :param secret: The name of the secret.
        :return: The secret value.
    
    getToken(audience) -> str
        Gets a token for the given audience.
        :param audience: The audience for the token.
        :return: The token.
    
    help(method_name=None)
        Provides help for the notebookutils.credentials module or the specified method.
        
        Examples:
        notebookutils.credentials.help()
        notebookutils.credentials.help("getToken")
        :param method_name: The name of the method to get help with.

DATA
    creds = <notebookutils.notebookutils.handlers.CredsHandler.CredsHandler...

FILE
    /home/trusted-service-user/cluster-env/trident_env/lib/python3.10/site-packages/notebookutils/credentials.py

Obter token

getToken retorna um token Microsoft Entra para um determinado público e nome (opcional). A lista a seguir mostra as chaves de audiência atualmente disponíveis:

  • Storage Audience Resource: "armazenamento"
  • Recurso do Power BI: "pbi"
  • Azure Key Vault Resource: "keyvault"
  • Synapse RTA KQL DB Recurso: "kusto"

Execute o seguinte comando para obter o token:

notebookutils.credentials.getToken('audience Key')

Obter segredo usando credenciais de usuário

getSecret retorna um segredo do Azure Key Vault para um determinado endpoint e nome do segredo usando credenciais de usuário.

notebookutils.credentials.getSecret('https://<name>.vault.azure.net/', 'secret name')

Montagem e desmontagem de ficheiros

O Fabric suporta os seguintes cenários de montagem no pacote Microsoft Spark Utilities. Você pode usar as APIs mount, unmount, getMountPath() e mounts() para anexar armazenamento remoto (ADLS Gen2) a todos os nós de trabalho (nó de driver e nós de trabalho). Depois que o ponto de montagem de armazenamento estiver instalado, use a API de arquivo local para acessar os dados como se estivessem armazenados no sistema de arquivos local.

Como montar uma conta ADLS Gen2

O exemplo a seguir ilustra como montar o Azure Data Lake Storage Gen2. A montagem do armazenamento Blob funciona de forma semelhante.

Este exemplo pressupõe que tenha uma conta do Data Lake Storage Gen2 chamada storegen2, que tem um contêiner chamado mycontainer que deseja montar em /test na sessão do Spark do notebook.

Captura de tela mostrando onde selecionar um contêiner para montar.

Para montar o contêiner chamado mycontainer, notebookutils primeiro precisa verificar se você tem permissão para acessar o contêiner. Atualmente, o Fabric suporta dois métodos de autenticação para a operação de montagem do gatilho: accountKey e sastoken.

Montagem através de token de assinatura de acesso partilhado ou chave de conta

O NotebookUtils suporta a passagem explícita de uma chave de conta ou token de assinatura de acesso compartilhado (SAS) como parâmetro para a montagem do destino.

Por motivos de segurança, recomendamos que você armazene chaves de conta ou tokens SAS no Cofre de Chaves do Azure (como mostra a captura de tela a seguir). Em seguida, você pode recuperá-los usando a API notebookutils.credentials.getSecret . Para obter mais informações sobre o Azure Key Vault, consulte Sobre as chaves de conta de armazenamento gerenciado do Azure Key Vault.

Captura de tela mostrando onde os segredos são armazenados em um Cofre de Chaves do Azure.

Código de exemplo para o método accountKey :

# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
accountKey = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"accountKey":accountKey}
)

Código de exemplo para sastoken:

# get access token for keyvault resource
# you can also use full audience here like https://vault.azure.net
sasToken = notebookutils.credentials.getSecret("<vaultURI>", "<secretName>")
notebookutils.fs.mount(  
    "abfss://mycontainer@<accountname>.dfs.core.windows.net",  
    "/test",  
    {"sasToken":sasToken}
)

Parâmetros de montagem:

  • fileCacheTimeout: Os blobs são armazenados em cache na pasta temporária local por 120 segundos por padrão. Durante esse tempo, o blobfuse não verifica se o arquivo está atualizado ou não. O parâmetro pode ser definido para alterar o tempo limite padrão. Quando vários clientes modificam arquivos ao mesmo tempo, para evitar inconsistências entre arquivos locais e remotos, recomendamos encurtar o tempo de cache, ou até mesmo alterá-lo para 0, e sempre obter os arquivos mais recentes do servidor.
  • Tempo limite: O tempo limite da operação de montagem é de 120 segundos por padrão. O parâmetro pode ser definido para alterar o tempo limite padrão. Quando há muitos executores ou quando a montagem atinge o tempo limite, recomendamos aumentar o valor.

Você pode usar estes parâmetros assim:

notebookutils.fs.mount(
   "abfss://mycontainer@<accountname>.dfs.core.windows.net",
   "/test",
   {"fileCacheTimeout": 120, "timeout": 120}
)

Nota

Por motivos de segurança, é aconselhável evitar a incorporação de credenciais diretamente no código. Para proteger ainda mais as suas credenciais, quaisquer segredos exibidos nos resultados dos blocos de notas são redigidos. Para obter mais informações, consulte Redação secreta.

Como montar uma casa no lago

Código de exemplo para montar uma casa de lago em /<mount_name>:

notebookutils.fs.mount( 
 "abfss://<workspace_name>@onelake.dfs.fabric.microsoft.com/<lakehouse_name>.Lakehouse", 
 "/<mount_name>"
)

Acessar arquivos sob o ponto de montagem usando a API notebookutils fs

O principal objetivo da operação de montagem é permitir que os clientes acessem os dados armazenados em uma conta de armazenamento remoto com uma API de sistema de arquivos local. Você também pode acessar os dados usando a API notebookutils fs com um caminho montado como parâmetro. Este formato de caminho é um pouco diferente.

Suponha que você montou o contêiner do Data Lake Storage Gen2 mycontainer em /test usando a API de montagem. Quando você acessa os dados com uma API de sistema de arquivos local, o formato de caminho é assim:

/synfs/notebook/{sessionId}/test/{filename}

Quando você quiser acessar os dados usando a API notebookutils fs, recomendamos usar getMountPath() para obter o caminho preciso:

path = notebookutils.fs.getMountPath("/test")
  • Listar diretórios:

    notebookutils.fs.ls(f"file://{notebookutils.fs.getMountPath('/test')}")
    
  • Leia o conteúdo do arquivo:

    notebookutils.fs.head(f"file://{notebookutils.fs.getMountPath('/test')}/myFile.txt")
    
  • Crie um diretório:

    notebookutils.fs.mkdirs(f"file://{notebookutils.fs.getMountPath('/test')}/newdir")
    

Aceda aos ficheiros no ponto de montagem através do caminho local

Você pode facilmente ler e gravar os arquivos no ponto de montagem usando o sistema de arquivos padrão. Aqui está um exemplo de Python:

#File read
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "r") as f:
    print(f.read())
#File write
with open(notebookutils.fs.getMountPath('/test2') + "/myFile.txt", "w") as f:
    print(f.write("dummy data"))

Como verificar os pontos de montagem existentes

Você pode usar a API notebookutils.fs.mounts() para verificar todas as informações de ponto de montagem existentes:

notebookutils.fs.mounts()

Como desmontar o ponto de montagem

Use o código a seguir para desmontar o ponto de montagem (/test neste exemplo):

notebookutils.fs.unmount("/test")

Limitações conhecidas

  • A montagem atual é uma configuração ao nível do trabalho; recomendamos que use a API de montagens para verificar se um ponto de montagem existe ou não está disponível.

  • O mecanismo de desmontagem não é aplicado automaticamente. Quando a execução do aplicativo terminar, para desmontar o ponto de montagem e liberar o espaço em disco, você precisará chamar explicitamente uma API de desmontagem em seu código. Caso contrário, o ponto de montagem ainda existirá no nó após a conclusão da execução do aplicativo.

  • Não há suporte para a montagem de uma conta de armazenamento ADLS Gen1.

Utilitários Lakehouse

notebookutils.lakehouse fornece utilitários de forma otimizada para gerenciar itens Lakehouse. Esses utilitários permitem que você crie, obtenha, atualize e exclua artefatos do Lakehouse sem esforço.

Visão geral dos métodos

Aqui está uma visão geral dos métodos disponíveis fornecidos por notebookutils.lakehouse:

# Create a new Lakehouse artifact
create(name: String, description: String = "", definition: ItemDefinition = null, workspaceId: String = ""): Artifact

# Retrieve a Lakehouse artifact
get(name: String, workspaceId: String = ""): Artifact

# Get a Lakehouse artifact with properties
getWithProperties(name: String, workspaceId: String = ""): Artifact

# Update an existing Lakehouse artifact
update(name: String, newName: String, description: String = "", workspaceId: String = ""): Artifact

# Delete a Lakehouse artifact
delete(name: String, workspaceId: String = ""): Boolean 

# List all Lakehouse artifacts
list(workspaceId: String = "", maxResults: Int = 1000): Array[Artifact]

# List all tables in a Lakehouse artifact
listTables(lakehouse: String, workspaceId: String = "", maxResults: Int = 1000): Array[Table] 

# Starts a load table operation in a Lakehouse artifact
loadTable(loadOption: collection.Map[String, Any], table: String, lakehouse: String, workspaceId: String = ""): Array[Table] 

Exemplos de utilização

Para utilizar esses métodos de forma eficaz, considere os seguintes exemplos de uso:

Criando uma Lakehouse

artifact = notebookutils.lakehouse.create("artifact_name", "Description of the artifact", "optional_workspace_id")

Obter uma Lakehouse

artifact = notebookutils.lakehouse.get("artifact_name", "optional_workspace_id")
artifact = notebookutils.lakehouse.getWithProperties("artifact_name", "optional_workspace_id")

Atualizando uma Lakehouse

updated_artifact = notebookutils.lakehouse.update("old_name", "new_name", "Updated description", "optional_workspace_id")

Excluindo uma Lakehouse

is_deleted = notebookutils.lakehouse.delete("artifact_name", "optional_workspace_id")

Listando Lakehouses em um espaço de trabalho

artifacts_list = notebookutils.lakehouse.list("optional_workspace_id")

Listando todas as tabelas em Lakehouse

artifacts_tables_list = notebookutils.lakehouse.listTables("artifact_name", "optional_workspace_id")

Iniciando uma operação de carregamento de tabela num Lakehouse

notebookutils.lakehouse.loadTable(
    {
        "relativePath": "Files/myFile.csv",
        "pathType": "File",
        "mode": "Overwrite",
        "recursive": False,
        "formatOptions": {
            "format": "Csv",
            "header": True,
            "delimiter": ","
        }
    }, "table_name", "artifact_name", "optional_workspace_id")

Informações adicionais

Para obter informações mais detalhadas sobre cada método e seus parâmetros, utilize a notebookutils.lakehouse.help("methodName") função.

Utilitários de ambiente de execução

Mostrar as informações de contexto da sessão

Com notebookutils.runtime.context, pode-se obter as informações de contexto da sessão ao vivo atual, incluindo o nome do notebook, lakehouse padrão, informações do espaço de trabalho, se é uma execução de pipeline, etc.

notebookutils.runtime.context

A tabela abaixo descreve as propriedades.

Parâmetro Explicação
currentNotebookName O nome do caderno atual
currentNotebookId O identificador exclusivo do caderno atual
currentWorkspaceName O nome do espaço de trabalho atual
currentWorkspaceId A ID do espaço de trabalho atual
defaultLakehouseName O nome de exibição da casa do lago padrão, se definido
defaultLakehouseId O ID da casa do lago padrão, se definido
defaultLakehouseWorkspaceName O nome do espaço de trabalho do lakehouse predefinido, caso esteja definido
defaultLakehouseWorkspaceId O ID do espaço de trabalho do lakehouse padrão, se estiver definido
currentRunId Em uma execução de referência, o ID de execução atual
parentRunId Numa execução de referência com execuções aninhadas, esta é a ID da execução principal.
rootRunId Em uma execução de referência com execuções aninhadas, este é o ID de execução raiz
isForPipeline Se o processo é destinado a um pipeline
isReferenceRun Se a execução atual é uma execução de referência
referenceTreePath A estrutura de árvore das execuções de referência aninhadas, utilizada exclusivamente para a hierarquia de instantâneos na página de monitorização L2
rootNotebookId (Apenas em execução de referência) O ID do bloco de notas raiz numa execução de referência.
rootNotebookName (Apenas numa execução de referência) O nome do caderno raiz numa execução de referência.
rootWorkspaceId (Apenas em execução de referência) O ID do espaço de trabalho do notebook raiz em uma execução de referência.
rootWorkspaceName (Apenas numa execução de referência) O nome do espaço de trabalho do notebook raiz numa execução de referência.
activityId O ID do trabalho Livy para a atividade atual
hcRepId O ID do REPL no modo de alta concorrência
clusterId A identidade do cluster Synapse Spark
poolName O nome da piscina Spark que está sendo usada
environmentId O ID do ambiente onde o trabalho está sendo executado
environmentWorkspaceId A ID do espaço de trabalho do ambiente
userId O ID de usuário do usuário atual
userName O nome de usuário do usuário atual

Gestão de sessões

Parar uma sessão interativa

Em vez de clicar manualmente no botão parar, às vezes é mais conveniente parar uma sessão interativa chamando uma API no código. Para esses casos, fornecemos uma API notebookutils.session.stop() para suportar a interrupção da sessão interativa via código, está disponível para Scala e PySpark.

notebookutils.session.stop()

notebookutils.session.stop() API interrompe a sessão interativa atual de forma assíncrona em segundo plano. Ele também interrompe a sessão do Spark e libera recursos ocupados pela sessão, para que fiquem disponíveis para outras sessões no mesmo pool.

Reinicie o interpretador Python

O utilitário notebookutils.session fornece uma maneira de reiniciar o interpretador Python.

notebookutils.session.restartPython()

Nota

  • No caso de execução de referência ao bloco de anotações, restartPython() apenas reinicia o interpretador Python do bloco de anotações atual em referência.
  • Em casos raros, o comando pode falhar devido ao mecanismo de reflexão do Spark, adicionar nova tentativa pode atenuar o problema.

Problema conhecido

  • Ao usar a versão runtime acima de 1.2 e executar notebookutils.help(), as APIs listadas fabricClient, PBIClient não são suportadas por enquanto, mas estarão disponíveis no futuro. Além disso, a API de credenciais não é suportada em blocos de anotações Scala por enquanto.

  • O bloco de anotações Python não suporta as APIs stop, reiniciarPython ao usar o utilitário notebookutils.session para gerenciamento de sessão.