從另一個筆記本執行 Databricks 筆記本

重要

針對筆記本協調流程,請使用 Databricks 作業。 針對程式代碼模組化案例,請使用工作區檔案。 只有在無法使用 Databricks 作業實作使用案例時,才應該使用本文所述的技術,例如,在動態參數集上循環筆記本,或如果您沒有工作區檔案存取權。 如需詳細資訊,請參閱 Databricks 作業共用程序代碼

%rundbutils.notebook.run() 的比較

%run命令可讓您在筆記本中包含另一個筆記本。 您可以使用 %run 將支援函式放在個別的筆記本中,將程式代碼模組化。 您也可以使用它來串連可實作分析中步驟的筆記本。 當您使用 %run時,會立即執行呼叫的筆記本,並在呼叫筆記本中定義函式和變數可供使用。

dbutils.notebook API 是 的補碼,%run因為它可讓您將參數傳遞至筆記本並傳回值。 這可讓您建置具有相依性的複雜工作流程和管線。 例如,您可以取得目錄中的檔案清單,並將名稱傳遞至另一個筆記本,而無法使用 %run。 您也可以根據傳回值建立 if-then-else 工作流程,或使用相對路徑呼叫其他筆記本。

不同於 %run,方法 dbutils.notebook.run() 會啟動新的作業來執行筆記本。

這些方法,就像所有 dbutils API 一樣,只能在 Python 和 Scala 中使用。 不過,您可以使用 來叫用 dbutils.notebook.run() R 筆記本。

使用 %run 匯入筆記本

在此範例中,第一個筆記本會定義函式 , reverse此函式會在您使用 %run magic執行 shared-code-notebook之後,在第二個筆記本中提供。

共用程式代碼筆記本

筆記本匯入範例

由於這兩個筆記本都位於工作區中的相同目錄中,因此請使用 中的./shared-code-notebook前置詞./來指出路徑應該相對於目前執行中的筆記本解析。 您可以將筆記本組織成目錄,例如 %run ./dir/notebook,或使用之類的 %run /Users/username@organization.com/directory/notebook絕對路徑。

注意

  • %run必須單獨在儲存格中,因為它會內嵌執行整個筆記本。
  • 您無法使用 %run 來執行 Python 檔案,以及import該檔案中定義的實體到筆記本。 若要從 Python 檔案匯入,請參閱 使用檔案將程式代碼模組化。 或者,將檔案封裝到 Python 連結庫、從該 Python 連結庫建立 Azure Databricks 連結庫 ,並將 連結庫安裝到您用來執行筆記本的叢集中
  • 當您使用 %run 來執行包含小工具的筆記本時,根據預設,指定的筆記本會以小工具的預設值執行。 您也可以將值傳入 Widget;請參閱 搭配 %run 使用 Databricks 小工具。

dbutils.notebook API

API rundbutils.notebook可用的方法是 與 exit。 參數和傳回值都必須是字串。

run(path: String, timeout_seconds: int, arguments: Map): String

執行筆記本並傳回其結束值。 方法會啟動立即執行的暫時作業。

參數 timeout_seconds 會控制執行的逾時 (0 表示沒有逾時):如果在指定時間內未完成,則 run 呼叫 會擲回例外狀況。 如果 Azure Databricks 已關閉超過 10 分鐘,則不論 為何 timeout_seconds,筆記本執行都會失敗。

參數會 arguments 設定目標筆記本的 Widget 值。 具體來說,如果您正在執行的筆記本有名為 A的 Widget,而且您會將機碼/值組 ("A": "B") 當做自變數參數 run() 的一部分傳遞給呼叫,則擷取小工具 A 的值會傳回 "B"。 您可以在 Databricks 小工具一文中找到 建立和使用小工具 的指示。

注意

  • 參數 arguments 只接受拉丁字元 (ASCII 字元集)。 使用非 ASCII 字元會傳回錯誤。
  • 使用 API 建立的 dbutils.notebook 作業必須在 30 天內完成。

run 使用

Python

dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})

Scala

dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))

run 範例

假設您有名為 workflows 的筆記本,其中包含名為 foo 的 Widget,其會列印小工具的值:

dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print(dbutils.widgets.get("foo"))

執行 dbutils.notebook.run("workflows", 60, {"foo": "bar"}) 會產生下列結果:

具有小工具的筆記本

小工具具有您使用、 "bar"傳入dbutils.notebook.run()的值,而不是預設值。

exit(value: String): void 以值結束筆記本。 如果您使用 方法呼叫筆記本 run ,這是傳回的值。

dbutils.notebook.exit("returnValue")

在作業中呼叫 dbutils.notebook.exit 會導致筆記本順利完成。 如果您想要讓作業失敗,請擲回例外狀況。

例子

在下列範例中,您會根據 的結果DataImportNotebook,將DataImportNotebook自變數傳遞給並執行不同的筆記本(DataCleaningNotebookErrorHandlingNotebook)。

if-else 範例

當程式代碼執行時,您會看到執行中筆記本的連結:

執行筆記本的連結

若要檢視執行的詳細數據,請按下筆記本連結 筆記本作業 #xxxx

暫時筆記本執行的結果

傳遞結構化數據

本節說明如何在筆記本之間傳遞結構化數據。

Python

# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.

## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.format("parquet").load(returned_table))

# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
  "status": "OK",
  "table": "my_data"
}))

## In caller notebook
import json

result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))

Scala

// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.

/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))

// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.

/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")

/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.format("parquet").load(returned_table))

// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.

/** In callee notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))

/** In caller notebook */

// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper

// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)

val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))

處理錯誤

本節說明如何處理錯誤。

Python

# Errors throw a WorkflowException.

def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
  num_retries = 0
  while True:
    try:
      return dbutils.notebook.run(notebook, timeout, args)
    except Exception as e:
      if num_retries > max_retries:
        raise e
      else:
        print("Retrying error", e)
        num_retries += 1

run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)

Scala

// Errors throw a WorkflowException.

import com.databricks.WorkflowException

// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
  var numTries = 0
  while (true) {
    try {
      return dbutils.notebook.run(notebook, timeout, args)
    } catch {
      case e: WorkflowException if numTries < maxTries =>
        println("Error, retrying: " + e)
    }
    numTries += 1
  }
  "" // not reached
}

runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)

同時執行多個筆記本

您可以使用標準 Scala 和 Python 建構來同時執行多個筆記本,例如線程(Scala、Python)和 Futures (ScalaPython)。 範例筆記本會示範如何使用這些建構。

  1. 下載下列 4 個筆記本。 筆記本是以 Scala 撰寫。
  2. 將筆記本匯入工作區中的單一資料夾。
  3. 執行並行執行筆記本。

同時執行筆記本

取得筆記本

以平行筆記本執行

取得筆記本

測試筆記本

取得筆記本

Testing-2 筆記本

取得筆記本