Orquestre blocos de anotações e modularize o código em blocos de anotações

Saiba como orquestrar blocos de notas e modularizar código em blocos de notas. Veja exemplos e entenda quando usar métodos alternativos para orquestração de notebooks.

Métodos de orquestração e modularização de código

A tabela a seguir compara os métodos disponíveis para orquestrar blocos de anotações e modularizar código em blocos de anotações.

Método Caso de uso Observações
Empregos em Lakeflow Orquestração de notebooks (recomendado) Método recomendado para orquestrar blocos de anotações.
Suporta fluxos de trabalho complexos com dependências de tarefas, agendamento e gatilhos. Fornece uma abordagem robusta e escalável para cargas de trabalho de produção, mas requer instalação e configuração.
dbutils.notebook.run() Orquestração de notebooks Use dbutils.notebook.run() se Trabalhos não puder suportar seu caso de uso, como blocos de anotações em loop em um conjunto dinâmico de parâmetros.
Inicia um novo trabalho efêmero para cada chamada, o que pode aumentar a sobrecarga e carece de recursos avançados de agendamento.
arquivos do espaço de trabalho Modularização de código (recomendado) Método recomendado para modularizar o código.
Modularize o código em arquivos de código reutilizáveis armazenados no espaço de trabalho. Suporta controle de versão com repositórios e integração com IDEs para melhor depuração e testes de unidade. Requer configuração adicional para gerenciar caminhos de arquivo e dependências.
%run Modularização de código Use %run se você não puder acessar arquivos de espaço de trabalho.
Importa funções ou variáveis de outros cadernos executando-as em linha. Útil para prototipagem, mas pode levar a um código fortemente acoplado que é mais difícil de manter. Não suporta passagem de parâmetros ou controle de versão.

%run vs. dbutils.notebook.run()

O %run comando permite incluir outro bloco de notas num bloco de notas. Você pode usar %run para modularizar seu código colocando funções de suporte em um bloco de anotações separado. Você também pode usá-lo para concatenar blocos de anotações que implementam as etapas em uma análise. Ao utilizar o %run, o bloco de anotações chamado é imediatamente executado, e as funções e variáveis nele definidas tornam-se disponíveis no bloco de anotações que o chamou.

A API dbutils.notebook complementa %run porque permite passar parâmetros e retornar valores de um bloco de anotações. Isso permite que você crie fluxos de trabalho e pipelines complexos com dependências. Por exemplo, você pode obter uma lista de arquivos em um diretório e passar os nomes para outro bloco de anotações, o que é impossível com %run. Você também pode criar fluxos de trabalho if-then-else com base em valores de retorno.

Ao contrário do %run, o método dbutils.notebook.run() inicia um novo trabalho para executar o notebook.

Tal como todas as APIs dbutils, estes métodos estão disponíveis apenas em Python e Scala. No entanto, você pode usar dbutils.notebook.run() para invocar um bloco de anotações R.

Utilizar %run para importar um bloco de notas

Neste exemplo, o primeiro bloco de notas define uma função, reverse, que está disponível no segundo bloco de notas depois de utilizar a %run magia para executar shared-code-notebook.

Bloco de notas de código partilhado

Exemplo de importação de bloco de notas

Como ambos os notebooks estão no mesmo diretório no espaço de trabalho, use o prefixo ./ no ./shared-code-notebook para indicar que o caminho deve ser determinado em relação ao notebook em execução no momento. Você pode organizar blocos de anotações em diretórios, como %run ./dir/notebook, ou usar um caminho absoluto como %run /Users/username@organization.com/directory/notebook.

Nota

  • %run deve estar numa célula por si só, porque ele executa todo o notebook em modo inline.
  • Não pode usar para executar um ficheiro Python e %run importar as entidades definidas nesse ficheiro para um caderno. Para importar de um ficheiro Python, veja Modularize o seu código usando ficheiros. Ou, empacote o ficheiro numa biblioteca Python, cria uma biblioteca Azure Databricks a partir dessa biblioteca Python, e instala a biblioteca no cluster que usas para correr o teu notebook.
  • Quando você usa %run para executar um bloco de anotações que contém widgets, por padrão, o bloco de anotações especificado é executado com os valores padrão do widget. Você também pode atribuir valores a widgets; veja Como usar widgets Databricks com %run.

Usar dbutils.notebook.run para iniciar um novo trabalho

Execute um notebook e retorne o seu valor de saída. O método inicia um trabalho efêmero que é executado imediatamente.

Os métodos disponíveis na dbutils.notebook API são run e exit. Ambos os parâmetros e valores de retorno devem ser strings.

run(path: String, timeout_seconds: int, arguments: Map): String

O parâmetro timeout_seconds controla o tempo limite da execução (0 significa que não há tempo limite). A chamada para run lança uma exceção se não terminar dentro do tempo especificado. Se o Azure Databricks estiver inativo por mais de 10 minutos, a execução do notebook falhará independentemente de timeout_seconds.

O parâmetro arguments define os valores do widget do bloco de anotações de destino. Especificamente, se o notebook que está a ser executado tiver um widget chamado A e for passado um par chave-valor ("A": "B") como parte do parâmetro de argumentos na chamada run(), então recuperar o valor do widget A irá devolver "B". Pode encontrar as instruções para criar e trabalhar com widgets na página de widgets do Databricks .

Nota

  • O parâmetro arguments aceita apenas caracteres latinos (conjunto de caracteres ASCII). O uso de caracteres não-ASCII retorna um erro.
  • Os trabalhos criados usando a API devem ser concluídos dbutils.notebook em 30 dias ou menos.

run Utilização

Python

dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})

linguagem de programação Scala

dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))

Passar dados estruturados entre blocos de notas

Esta seção ilustra como passar dados estruturados entre blocos de anotações.

Python

# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.

## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.format("parquet").load(returned_table))

# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
  "status": "OK",
  "table": "my_data"
}))

## In caller notebook
import json

result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))

linguagem de programação Scala

// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.

/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.format("parquet").load(returned_table))

// Example 3 - returning JSON data.
// To return multiple values, use standard JSON libraries to serialize and deserialize results.

/** In callee notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))

/** In caller notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))

Processar erros

Esta seção ilustra como lidar com erros.

Python

# Errors throw a WorkflowException.

def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
  num_retries = 0
  while True:
    try:
      return dbutils.notebook.run(notebook, timeout, args)
    except Exception as e:
      if num_retries > max_retries:
        raise e
      else:
        print("Retrying error", e)
        num_retries += 1

run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)

linguagem de programação Scala

// Errors throw a WorkflowException.

import com.databricks.WorkflowException

// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here, we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
  var numTries = 0
  while (true) {
    try {
      return dbutils.notebook.run(notebook, timeout, args)
    } catch {
      case e: WorkflowException if numTries < maxTries =>
        println("Error, retrying: " + e)
    }
    numTries += 1
  }
  "" // not reached
}

runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)

Executar vários cadernos simultaneamente

Pode executar vários notebooks ao mesmo tempo usando construções padrão de Scala e Python como Threads (Scala, Python) e Futures (Scala, Python). Os blocos de anotações de exemplo demonstram como usar essas construções.

  1. Transfira os seguintes quatro blocos de notas. Os cadernos estão escritos em Scala.
  2. Importe os blocos de anotações para uma única pasta no espaço de trabalho.
  3. Execute o bloco de anotações Executar simultaneamente .

Executar simultaneamente o bloco de anotações

Obter caderno

Executar em paralelo no notebook

Obter caderno

Notebook de teste

Obter caderno

Caderno Testing-2

Obter caderno