Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Use as utilidades do notebook para executar um notebook, rodar vários notebooks simultaneamente ou encerrar um notebook retornando um valor. Execute o seguinte comando para obter uma visão geral dos métodos disponíveis:
notebookutils.notebook.help()
A tabela a seguir lista os métodos de execução e orquestração do notebook disponíveis:
| Método | Signature | Descrição |
|---|---|---|
run |
run(path: str, timeout_seconds: int = 90, arguments: dict = None, workspace: str = ""): str |
Executa um notebook e retorna o valor de saída. |
runMultiple |
runMultiple(dag: Any, config: dict = None): dict[str, dict[str, Any]] |
Executa vários notebooks simultaneamente com suporte para relações de dependência. |
validateDAG |
validateDAG(dag: Any): bool |
Valida se uma definição de DAG está estruturada corretamente. |
exit |
exit(value: str): None |
Sai do bloco de anotações atual com um valor. |
Para operações CRUD do notebook (criar, obter, atualizar, excluir, listar), consulte Gerenciar artefatos do notebook.
Observação
O parâmetro config em runMultiple() está disponível apenas no Python. Scala e R não dão suporte a esse parâmetro.
Observação
Os utilitários de notebook não são aplicáveis para definições de trabalho do Apache Spark (SJD).
Referenciar um notebook
O run() método faz referência a um notebook e retorna seu valor de saída. Você pode executar o aninhamento de chamadas de função em um notebook interativamente ou em um pipeline. O notebook que está sendo referenciado é executado no pool do Spark em que o notebook chama essa função.
notebookutils.notebook.run("notebook name", <timeout_seconds>, <arguments>, <workspace>)
Por exemplo:
notebookutils.notebook.run("Sample1", 90, {"input": 20 })
Valor de retorno
O método run() retorna a string exata passada para notebookutils.notebook.exit(value) no bloco de anotações filho. Se exit() não for chamado no bloco de anotações filho, uma cadeia de caracteres vazia ("") será retornada.
Os notebooks do Fabric também dão suporte à referência de notebooks em workspaces especificando a ID do workspace.
notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")
Abra o link de instantâneo na saída da célula para inspecionar a execução de referência. O instantâneo captura os resultados da execução e ajuda você a depurar o notebook referenciado.
Configurar os notebooks filho para receber parâmetros
Ao criar um notebook filho chamado através de run() ou runMultiple(), configure uma célula de parâmetro para que o notebook possa receber argumentos do pai:
- Crie uma célula de código com valores de parâmetro padrão.
- Marque a célula como uma célula de parâmetro selecionando Marcar célula como parâmetros na interface do usuário do notebook.
- Durante a execução, os valores da célula de parâmetro são substituídos pelos argumentos passados do pai.
# This cell should be marked as "parameters" cell
# Default values are overridden when the notebook is called
date = "2024-01-01"
region = "US"
Dica
Os valores de saída são sempre cadeias de caracteres. Se precisar de um valor numérico no notebook pai, converta o resultado após recuperá-lo (por exemplo, int(result)).
Considerações
- O notebook de referência entre workspaces é compatível com o runtime versão 1.2 e superior.
- Se você usar os arquivos sob Recurso do Notebook, utilize
notebookutils.nbResPathno notebook referenciado para assegurar que ele direcione para a mesma pasta da execução interativa. - A execução de referência permite que os blocos de anotações filho sejam executados somente se eles usarem a mesma casa de lago que o pai, herdar a casa do pai ou se nenhum deles definir um. A execução será bloqueada se o filho especificar um lakehouse diferente do bloco de anotações pai. Para ignorar essa verificação, defina
useRootDefaultLakehouse: Truenos argumentos. - Não chame
notebookutils.notebook.exit(value)dentro de um blocotry-catch. A chamada de saída, quando encapsulada no tratamento de exceção, não terá efeito.
A referência executa vários notebooks paralelamente
Use notebookutils.notebook.runMultiple() para executar vários blocos de anotações em paralelo ou em uma estrutura topológica predefinida. A API usa uma implementação multithread em uma sessão do Spark, o que significa que o notebook referenciado executa recursos de computação de compartilhamento.
Com notebookutils.notebook.runMultiple(), você pode:
Execute vários notebooks simultaneamente, sem aguardar a conclusão de cada um.
Especifique as dependências e a ordem de execução para seus notebooks usando um formato JSON simples.
Otimize o uso de recursos de computação do Spark e reduza o custo dos seus projetos do Fabric.
Exiba os Instantâneos de cada registro de execução do notebook na saída e depure/monitore as tarefas do notebook convenientemente.
Obtenha o valor de saída de cada atividade executiva e use-as em tarefas downstream.
Execute notebookutils.notebook.help("runMultiple") para exibir mais exemplos e detalhes de uso.
Executar uma lista simples de notebooks
O exemplo a seguir executa uma lista de notebooks em paralelo:
O resultado da execução do notebook raiz é o seguinte:
Valor de retorno
O runMultiple() método retorna um dicionário em que cada chave é o nome da atividade e cada valor é um dicionário com as seguintes chaves:
-
exitVal: a cadeia de caracteres retornada pela chamada do caderno deexit()anotações filho, ou uma cadeia de caracteres vazia seexit()não foi chamada. -
exception: um objeto de erro se a atividade falhou ouNonese foi bem-sucedida.
Executar notebooks com uma estrutura DAG
O exemplo a seguir executa notebooks em uma estrutura DAG usando notebookutils.notebook.runMultiple().
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "Process_1", # activity name, must be unique
"path": "NotebookSimple", # notebook item name
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
"workspace":"WorkspaceName" # both name and id are supported
},
{
"name": "Process_2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200},
"workspace":"id" # both name and id are supported
},
{
"name": "Process_1.1",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["Process_1"] # list of activity names that this activity depends on
}
],
"timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
"concurrency": 12 # max number of notebooks to run concurrently, default to 3x CPU cores, 0 means unlimited
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})
O resultado da execução do notebook raiz é o seguinte:
Referência de parâmetro DAG
A tabela a seguir descreve cada campo que você pode usar na definição de DAG:
| Campo | Nível | Obrigatório | Descrição |
|---|---|---|---|
activities |
Root | Sim | Uma lista de objetos de atividade que definem os notebooks a serem executados. |
timeoutInSeconds |
Root | No | Tempo limite máximo para todo o DAG. O padrão é 43200 (12 horas). |
concurrency |
Root | No | Número máximo de notebooks que podem ser executados simultaneamente. O padrão é 3 vezes a quantidade disponível de núcleos de CPU. Defina esse valor explicitamente se você precisar de um controle mais rígido ou usar 0 para simultaneidade ilimitada. |
name |
Atividade | Sim | Um nome exclusivo para a atividade. Usado para identificar resultados e definir dependências. |
path |
Atividade | Sim | O nome do item do bloco de anotações ou o caminho a ser executado. |
timeoutPerCellInSeconds |
Atividade | No | Tempo limite máximo para cada célula no caderno filho. O padrão é 90 segundos. |
args |
Atividade | No | Um dicionário de parâmetros a serem passados para o bloco de anotações filho. |
workspace |
Atividade | No | O nome do workspace ou o ID onde o notebook reside. Por padrão, o notebook filho é executado no mesmo workspace que o chamador. |
retry |
Atividade | No | Número de tentativas de repetição se a atividade falhar. O padrão é 0. |
retryIntervalInSeconds |
Atividade | No | Tempo de espera em segundos entre tentativas de repetição. O padrão é 0. |
dependencies |
Atividade | No | Uma lista de nomes de atividades que devem ser concluídas antes de esta atividade começar. |
Referenciar valores de saída entre atividades
Você pode referenciar o valor de saída de uma atividade de dependência no args campo usando a @activity() expressão. Esse padrão permite que você passe dados entre notebooks em um DAG.
DAG = {
"activities": [
{
"name": "Extract",
"path": "ExtractData",
"timeoutPerCellInSeconds": 120,
"args": {"source": "prod_db"}
},
{
"name": "Transform",
"path": "TransformData",
"timeoutPerCellInSeconds": 180,
"args": {
"data_path": "@activity('Extract').exitValue()"
},
"dependencies": ["Extract"]
}
]
}
results = notebookutils.notebook.runMultiple(DAG)
Dica
Use a @activity('activity_name').exitValue() expressão no args campo para passar resultados de uma atividade para outra dentro de um DAG.
Criar um DAG dinâmico
Você pode gerar estruturas DAG programaticamente para cenários como o processamento de fan-out em várias partições:
def create_fan_out_dag(partitions):
activities = []
for partition in partitions:
activities.append({
"name": f"Process_{partition}",
"path": "ProcessPartition",
"timeoutPerCellInSeconds": 180,
"args": {"partition": partition}
})
activities.append({
"name": "Aggregate",
"path": "AggregateResults",
"timeoutPerCellInSeconds": 120,
"dependencies": [f"Process_{p}" for p in partitions]
})
return {"activities": activities, "concurrency": 25}
partitions = ["2024-01", "2024-02", "2024-03", "2024-04"]
dag = create_fan_out_dag(partitions)
results = notebookutils.notebook.runMultiple(dag)
Validar um DAG
Use validateDAG() para verificar se a estrutura do DAG é válida antes da execução. Ele captura problemas como nomes de atividade duplicados, dependências ausentes e referências circulares.
Valor de retorno
O validateDAG() método retornará True se a estrutura da DAG for válida ou gerar uma exceção se a validação falhar.
Dica
Sempre chame validateDAG() antes runMultiple() nos fluxos de trabalho de produção para detectar erros estruturais antecipadamente.
Lidar com falhas do runMultiple
O runMultiple() método retorna um dicionário em que cada chave é o nome da atividade e cada valor contém um exitVal (cadeia de caracteres) e um exception (objeto de erro ou None). Você pode inspecionar resultados parciais mesmo quando algumas atividades falham:
from notebookutils.common.exceptions import RunMultipleFailedException
try:
results = notebookutils.notebook.runMultiple(DAG)
except RunMultipleFailedException as ex:
results = ex.result
for activity_name, result in results.items():
if result["exception"]:
print(f"{activity_name} failed: {result['exception']}")
else:
print(f"{activity_name} succeeded: {result['exitVal']}")
Considerações
- O grau de paralelismo da execução de vários notebooks é restrito ao recurso de computação total disponível de uma sessão do Spark.
- O número padrão de notebooks simultâneos é três vezes a contagem de núcleos de CPU disponíveis. Você pode personalizar esse valor, mas o paralelismo excessivo pode levar a problemas de estabilidade e desempenho devido ao alto uso de recursos de computação. Se surgirem problemas, considere separar notebooks em várias chamadas
runMultipleou reduzir a simultaneidade ajustando o campo de simultaneidade no parâmetro DAG. - O tempo limite padrão para todo o DAG é de 12 horas e o tempo limite padrão para cada célula em um notebook filho é de 90 segundos. Você pode alterar o tempo limite definindo os campos timeoutInSeconds e timeoutPerCellInSeconds no parâmetro DAG.
- Configure
retryeretryIntervalInSecondspara atividades que podem falhar devido a problemas transitórios, como tempos limite de rede ou indisponibilidade de serviço temporário. - Blocos de anotações paralelos compartilham recursos de computação em uma única sessão do Spark. Monitore a utilização de recursos para evitar a pressão de memória e a contenção da CPU.
Sair de um notebook
O exit() método finaliza um caderno com um valor. Você pode executar o aninhamento de chamadas de função em um notebook interativamente ou em um pipeline.
Quando você chama uma
exit()função de um caderno interativamente, o caderno do Fabric gera uma exceção, ignora a execução das células seguintes e mantém a sessão do Spark ativa.Quando você orquestra um notebook em um pipeline que chama uma
exit()função, a atividade do notebook devolve um valor de saída. Isso conclui a execução do pipeline e interrompe a sessão do Spark.Quando você chama uma função
exit()em um notebook que está sendo referenciado, o Fabric Spark interrompe a execução adicional do notebook referenciado e continua a executar as próximas células no notebook principal que chama a funçãorun(). Por exemplo: Notebook1 tem três células e chama uma funçãoexit()na segunda célula. O Notebook2 tem cinco células e chamarun(notebook1)na terceira célula. Quando você executa o Notebook2, o Notebook1 para na segunda célula ao atingir aexit()função. Notebook2 continua a executar a quarta célula e a quinta célula.
Comportamento de retorno
O exit() método não retorna um valor. Ele encerra o notebook atual e passa a cadeia de caracteres fornecida para o notebook ou pipeline de chamada.
Observação
A exit() função substitui a saída de célula atual. Para evitar perder a saída de outras instruções de código, chame notebookutils.notebook.exit() em uma célula separada.
Importante
Não chame notebookutils.notebook.exit() dentro de um try-catch bloco. A saída não terá efeito quando encapsulada no tratamento de exceção. A chamada exit() deve estar no topo do código para funcionar corretamente.
Por exemplo:
O notebook Sample1 tem as duas seguintes células:
A célula 1 define um parâmetro de entrada com o valor padrão definido como 10.
A célula 2 sai do notebook com entrada como valor de saída.
Você pode executar Sample1 em outro notebook com valores padrão:
exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)
Saída:
10
Você pode executar Sample1 em outro notebook e definir o valor de entrada como 20:
exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
Saída:
20