Compartilhar via


Execução e orquestração de notebooks pelo NotebookUtils

Use as utilidades do notebook para executar um notebook, rodar vários notebooks simultaneamente ou encerrar um notebook retornando um valor. Execute o seguinte comando para obter uma visão geral dos métodos disponíveis:

notebookutils.notebook.help()

A tabela a seguir lista os métodos de execução e orquestração do notebook disponíveis:

Método Signature Descrição
run run(path: str, timeout_seconds: int = 90, arguments: dict = None, workspace: str = ""): str Executa um notebook e retorna o valor de saída.
runMultiple runMultiple(dag: Any, config: dict = None): dict[str, dict[str, Any]] Executa vários notebooks simultaneamente com suporte para relações de dependência.
validateDAG validateDAG(dag: Any): bool Valida se uma definição de DAG está estruturada corretamente.
exit exit(value: str): None Sai do bloco de anotações atual com um valor.

Para operações CRUD do notebook (criar, obter, atualizar, excluir, listar), consulte Gerenciar artefatos do notebook.

Observação

O parâmetro config em runMultiple() está disponível apenas no Python. Scala e R não dão suporte a esse parâmetro.

Observação

Os utilitários de notebook não são aplicáveis para definições de trabalho do Apache Spark (SJD).

Referenciar um notebook

O run() método faz referência a um notebook e retorna seu valor de saída. Você pode executar o aninhamento de chamadas de função em um notebook interativamente ou em um pipeline. O notebook que está sendo referenciado é executado no pool do Spark em que o notebook chama essa função.

notebookutils.notebook.run("notebook name", <timeout_seconds>, <arguments>, <workspace>)

Por exemplo:

notebookutils.notebook.run("Sample1", 90, {"input": 20 })

Valor de retorno

O método run() retorna a string exata passada para notebookutils.notebook.exit(value) no bloco de anotações filho. Se exit() não for chamado no bloco de anotações filho, uma cadeia de caracteres vazia ("") será retornada.

Os notebooks do Fabric também dão suporte à referência de notebooks em workspaces especificando a ID do workspace.

notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")

Abra o link de instantâneo na saída da célula para inspecionar a execução de referência. O instantâneo captura os resultados da execução e ajuda você a depurar o notebook referenciado.

Captura de tela do resultado de execução de referência.

Captura de tela de um exemplo de instantâneo.

Configurar os notebooks filho para receber parâmetros

Ao criar um notebook filho chamado através de run() ou runMultiple(), configure uma célula de parâmetro para que o notebook possa receber argumentos do pai:

  1. Crie uma célula de código com valores de parâmetro padrão.
  2. Marque a célula como uma célula de parâmetro selecionando Marcar célula como parâmetros na interface do usuário do notebook.
  3. Durante a execução, os valores da célula de parâmetro são substituídos pelos argumentos passados do pai.
# This cell should be marked as "parameters" cell
# Default values are overridden when the notebook is called
date = "2024-01-01"
region = "US"

Dica

Os valores de saída são sempre cadeias de caracteres. Se precisar de um valor numérico no notebook pai, converta o resultado após recuperá-lo (por exemplo, int(result)).

Considerações

  • O notebook de referência entre workspaces é compatível com o runtime versão 1.2 e superior.
  • Se você usar os arquivos sob Recurso do Notebook, utilize notebookutils.nbResPath no notebook referenciado para assegurar que ele direcione para a mesma pasta da execução interativa.
  • A execução de referência permite que os blocos de anotações filho sejam executados somente se eles usarem a mesma casa de lago que o pai, herdar a casa do pai ou se nenhum deles definir um. A execução será bloqueada se o filho especificar um lakehouse diferente do bloco de anotações pai. Para ignorar essa verificação, defina useRootDefaultLakehouse: True nos argumentos.
  • Não chame notebookutils.notebook.exit(value) dentro de um bloco try-catch. A chamada de saída, quando encapsulada no tratamento de exceção, não terá efeito.

A referência executa vários notebooks paralelamente

Use notebookutils.notebook.runMultiple() para executar vários blocos de anotações em paralelo ou em uma estrutura topológica predefinida. A API usa uma implementação multithread em uma sessão do Spark, o que significa que o notebook referenciado executa recursos de computação de compartilhamento.

Com notebookutils.notebook.runMultiple(), você pode:

  • Execute vários notebooks simultaneamente, sem aguardar a conclusão de cada um.

  • Especifique as dependências e a ordem de execução para seus notebooks usando um formato JSON simples.

  • Otimize o uso de recursos de computação do Spark e reduza o custo dos seus projetos do Fabric.

  • Exiba os Instantâneos de cada registro de execução do notebook na saída e depure/monitore as tarefas do notebook convenientemente.

  • Obtenha o valor de saída de cada atividade executiva e use-as em tarefas downstream.

Execute notebookutils.notebook.help("runMultiple") para exibir mais exemplos e detalhes de uso.

Executar uma lista simples de notebooks

O exemplo a seguir executa uma lista de notebooks em paralelo:

notebookutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

O resultado da execução do notebook raiz é o seguinte:

Captura de tela de referência de uma lista de notebooks.

Valor de retorno

O runMultiple() método retorna um dicionário em que cada chave é o nome da atividade e cada valor é um dicionário com as seguintes chaves:

  • exitVal: a cadeia de caracteres retornada pela chamada do caderno de exit() anotações filho, ou uma cadeia de caracteres vazia se exit() não foi chamada.
  • exception: um objeto de erro se a atividade falhou ou None se foi bem-sucedida.

Executar notebooks com uma estrutura DAG

O exemplo a seguir executa notebooks em uma estrutura DAG usando notebookutils.notebook.runMultiple().

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "Process_1", # activity name, must be unique
            "path": "NotebookSimple", # notebook item name
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
            "workspace":"WorkspaceName" # both name and id are supported
        },
        {
            "name": "Process_2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200},
            "workspace":"id" # both name and id are supported
        },
        {
            "name": "Process_1.1",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["Process_1"] # list of activity names that this activity depends on
        }
    ],
    "timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
    "concurrency": 12 # max number of notebooks to run concurrently, default to 3x CPU cores, 0 means unlimited
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})

O resultado da execução do notebook raiz é o seguinte:

Captura de tela de referência de uma lista de notebooks com parâmetros.

Referência de parâmetro DAG

A tabela a seguir descreve cada campo que você pode usar na definição de DAG:

Campo Nível Obrigatório Descrição
activities Root Sim Uma lista de objetos de atividade que definem os notebooks a serem executados.
timeoutInSeconds Root No Tempo limite máximo para todo o DAG. O padrão é 43200 (12 horas).
concurrency Root No Número máximo de notebooks que podem ser executados simultaneamente. O padrão é 3 vezes a quantidade disponível de núcleos de CPU. Defina esse valor explicitamente se você precisar de um controle mais rígido ou usar 0 para simultaneidade ilimitada.
name Atividade Sim Um nome exclusivo para a atividade. Usado para identificar resultados e definir dependências.
path Atividade Sim O nome do item do bloco de anotações ou o caminho a ser executado.
timeoutPerCellInSeconds Atividade No Tempo limite máximo para cada célula no caderno filho. O padrão é 90 segundos.
args Atividade No Um dicionário de parâmetros a serem passados para o bloco de anotações filho.
workspace Atividade No O nome do workspace ou o ID onde o notebook reside. Por padrão, o notebook filho é executado no mesmo workspace que o chamador.
retry Atividade No Número de tentativas de repetição se a atividade falhar. O padrão é 0.
retryIntervalInSeconds Atividade No Tempo de espera em segundos entre tentativas de repetição. O padrão é 0.
dependencies Atividade No Uma lista de nomes de atividades que devem ser concluídas antes de esta atividade começar.

Referenciar valores de saída entre atividades

Você pode referenciar o valor de saída de uma atividade de dependência no args campo usando a @activity() expressão. Esse padrão permite que você passe dados entre notebooks em um DAG.

DAG = {
    "activities": [
        {
            "name": "Extract",
            "path": "ExtractData",
            "timeoutPerCellInSeconds": 120,
            "args": {"source": "prod_db"}
        },
        {
            "name": "Transform",
            "path": "TransformData",
            "timeoutPerCellInSeconds": 180,
            "args": {
                "data_path": "@activity('Extract').exitValue()"
            },
            "dependencies": ["Extract"]
        }
    ]
}

results = notebookutils.notebook.runMultiple(DAG)

Dica

Use a @activity('activity_name').exitValue() expressão no args campo para passar resultados de uma atividade para outra dentro de um DAG.

Criar um DAG dinâmico

Você pode gerar estruturas DAG programaticamente para cenários como o processamento de fan-out em várias partições:

def create_fan_out_dag(partitions):
    activities = []

    for partition in partitions:
        activities.append({
            "name": f"Process_{partition}",
            "path": "ProcessPartition",
            "timeoutPerCellInSeconds": 180,
            "args": {"partition": partition}
        })

    activities.append({
        "name": "Aggregate",
        "path": "AggregateResults",
        "timeoutPerCellInSeconds": 120,
        "dependencies": [f"Process_{p}" for p in partitions]
    })

    return {"activities": activities, "concurrency": 25}

partitions = ["2024-01", "2024-02", "2024-03", "2024-04"]
dag = create_fan_out_dag(partitions)

results = notebookutils.notebook.runMultiple(dag)

Validar um DAG

Use validateDAG() para verificar se a estrutura do DAG é válida antes da execução. Ele captura problemas como nomes de atividade duplicados, dependências ausentes e referências circulares.

notebookutils.notebook.validateDAG(DAG)

Valor de retorno

O validateDAG() método retornará True se a estrutura da DAG for válida ou gerar uma exceção se a validação falhar.

Dica

Sempre chame validateDAG() antes runMultiple() nos fluxos de trabalho de produção para detectar erros estruturais antecipadamente.

Lidar com falhas do runMultiple

O runMultiple() método retorna um dicionário em que cada chave é o nome da atividade e cada valor contém um exitVal (cadeia de caracteres) e um exception (objeto de erro ou None). Você pode inspecionar resultados parciais mesmo quando algumas atividades falham:

from notebookutils.common.exceptions import RunMultipleFailedException

try:
    results = notebookutils.notebook.runMultiple(DAG)
except RunMultipleFailedException as ex:
    results = ex.result

for activity_name, result in results.items():
    if result["exception"]:
        print(f"{activity_name} failed: {result['exception']}")
    else:
        print(f"{activity_name} succeeded: {result['exitVal']}")

Considerações

  • O grau de paralelismo da execução de vários notebooks é restrito ao recurso de computação total disponível de uma sessão do Spark.
  • O número padrão de notebooks simultâneos é três vezes a contagem de núcleos de CPU disponíveis. Você pode personalizar esse valor, mas o paralelismo excessivo pode levar a problemas de estabilidade e desempenho devido ao alto uso de recursos de computação. Se surgirem problemas, considere separar notebooks em várias chamadas runMultiple ou reduzir a simultaneidade ajustando o campo de simultaneidade no parâmetro DAG.
  • O tempo limite padrão para todo o DAG é de 12 horas e o tempo limite padrão para cada célula em um notebook filho é de 90 segundos. Você pode alterar o tempo limite definindo os campos timeoutInSeconds e timeoutPerCellInSeconds no parâmetro DAG.
  • Configure retry e retryIntervalInSeconds para atividades que podem falhar devido a problemas transitórios, como tempos limite de rede ou indisponibilidade de serviço temporário.
  • Blocos de anotações paralelos compartilham recursos de computação em uma única sessão do Spark. Monitore a utilização de recursos para evitar a pressão de memória e a contenção da CPU.

Sair de um notebook

O exit() método finaliza um caderno com um valor. Você pode executar o aninhamento de chamadas de função em um notebook interativamente ou em um pipeline.

  • Quando você chama uma exit() função de um caderno interativamente, o caderno do Fabric gera uma exceção, ignora a execução das células seguintes e mantém a sessão do Spark ativa.

  • Quando você orquestra um notebook em um pipeline que chama uma exit() função, a atividade do notebook devolve um valor de saída. Isso conclui a execução do pipeline e interrompe a sessão do Spark.

  • Quando você chama uma função exit() em um notebook que está sendo referenciado, o Fabric Spark interrompe a execução adicional do notebook referenciado e continua a executar as próximas células no notebook principal que chama a função run(). Por exemplo: Notebook1 tem três células e chama uma função exit() na segunda célula. O Notebook2 tem cinco células e chama run(notebook1) na terceira célula. Quando você executa o Notebook2, o Notebook1 para na segunda célula ao atingir a exit() função. Notebook2 continua a executar a quarta célula e a quinta célula.

notebookutils.notebook.exit("value string")

Comportamento de retorno

O exit() método não retorna um valor. Ele encerra o notebook atual e passa a cadeia de caracteres fornecida para o notebook ou pipeline de chamada.

Observação

A exit() função substitui a saída de célula atual. Para evitar perder a saída de outras instruções de código, chame notebookutils.notebook.exit() em uma célula separada.

Importante

Não chame notebookutils.notebook.exit() dentro de um try-catch bloco. A saída não terá efeito quando encapsulada no tratamento de exceção. A chamada exit() deve estar no topo do código para funcionar corretamente.

Por exemplo:

O notebook Sample1 tem as duas seguintes células:

  • A célula 1 define um parâmetro de entrada com o valor padrão definido como 10.

  • A célula 2 sai do notebook com entrada como valor de saída.

Captura de tela mostrando um notebook de exemplo da função de saída.

Você pode executar Sample1 em outro notebook com valores padrão:

exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)

Saída:

10

Você pode executar Sample1 em outro notebook e definir o valor de entrada como 20:

exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)

Saída:

20