瞭解如何協調筆記本,並將筆記本中的程式代碼模組化。 請參閱範例並瞭解何時使用筆記本自動化流程的其他方法。
協調流程和程式代碼模組化方法
下表比較可用來協調筆記本的方法,並將筆記本中的程式代碼模組化。
| 方法 | 用例 | 筆記 |
|---|---|---|
| Lakeflow 作業 | 筆記本編排(建議) | 協調筆記本的建議方法。 支援具有工作相依性、排程和觸發程序的複雜工作流程。 為生產工作負載提供健全且可擴展的方案,但需要設置和配置。 |
| dbutils.notebook.run() | 筆記本協調流程 | 如果工作無法支援您的使用案例,例如需要在一組動態參數上重複運行筆記本,請使用 dbutils.notebook.run()。針對每個呼叫啟動新的暫時作業,這可能會增加額外負荷,而且缺少進階排程功能。 |
| 工作區檔案 | 程式代碼模組化(建議) | 模組化程式代碼的建議方法。 將程式代碼模組化為儲存在工作區中的可重複使用程式代碼檔案。 支援使用存放庫進行版本控制,並與 IDE 整合,以便進行更好的偵錯和單元測試。 需要額外的設定來管理檔案路徑和相依性。 |
| %run | 程式代碼模組化 | 如果您無法存取工作區檔案,請使用 %run 。透過內嵌執行,從其他筆記本匯入函式或變數。 適用於原型設計,但可能會導致緊密結合的程式代碼難以維護。 不支持參數傳遞或版本控制。 |
%run 與 dbutils.notebook.run()
%run命令可讓您在筆記本中嵌入另一個筆記本。 您可以使用 %run,將支援函式放在個別的筆記本中,將程式代碼模組化。 您也可以使用它來串接分析中執行步驟的筆記本。 當您使用 %run時,會立即執行呼叫的 Notebook,並在其中定義的函數與變數會在呼叫 Notebook 時可供使用。
dbutils.notebook API 可補充 %run,因為它可讓您將參數傳遞至筆記本並傳回值。 這可讓您組建具有相依性的複雜工作流程和管線。 例如,您可以取得目錄中的檔案清單,並將名稱傳遞到另一個筆記本,這在 %run中則是不可能的。 您也可以根據傳回值建立 if-then-else 工作流程。
不同於 %run,方法 dbutils.notebook.run() 會啟動新的作業來執行 Notebook。
與所有 dbutils API 一樣,這些方法僅在 Python 與 Scala 中提供。 不過,您可以使用 dbutils.notebook.run() 來叫用 R Notebook。
使用 %run 匯入筆記本
在此範例中,第一個 Notebook 會定義函數 reverse,在您使用 %run magic 執行 shared-code-notebook 之後,其可用於第二個 Notebook。
由於這兩個筆記本都位於工作區中的相同目錄中,因此,請在 ./ 中使用前置詞 ./shared-code-notebook,以表示路徑應該相對於目前正在執行的筆記本進行解析。 您可以將 Notebook 組織成目錄,例如 %run ./dir/notebook,或使用像 %run /Users/username@organization.com/directory/notebook 之類的絕對路徑。
注意
-
%run必須單獨在儲存格中,因為它會執行整個 Notebook 內嵌。 - 你無法用執行一個Python檔案,然後將該檔案中定義的實體
%run匯入到筆記本中。 要從 Python 檔案匯入,請參考 用 files模組化你的程式碼。 或者,把檔案打包成一個Python函式庫,從該Python函式庫建立一個Azure Databricks函式庫,然後將該函式庫安裝到你用來執行筆記本的叢集中。 - 當您使用
%run來執行包含小工具的筆記本時,根據預設,指定的筆記本會以小工具的預設值執行。 您也可以將值傳遞至小工具,詳情請參閱 使用 Databricks 小工具搭配 %run。
使用 dbutils.notebook.run 啟動新作業
執行 Notebook 並傳回其結束值。 該方法會啟動立即執行的暫時作業。
dbutils.notebook API 所提供的方法是 run與 exit。 參數和傳回值都必須是字串。
run(path: String, timeout_seconds: int, arguments: Map): String
timeout_seconds 參數會控制執行的逾時 (0 表示沒有逾時)。 呼叫 run 如果在指定的時間內未完成,會擲回例外狀況。 如果Azure Databricks宕機超過 10 分鐘,筆記本執行會失敗,不管 timeout_seconds。
arguments 參數會設定目標筆記本的小工具值。 具體來說,如果您正在執行的 Notebook 有名稱為 A 的控制項,而且您會將索引鍵/值組 ("A": "B") 做為引數中的一部分傳遞給 run() 呼叫,則取得控制項 A 的值會傳回 "B"。 你可以在 Databricks 的小工具 頁面找到建立和使用元件的說明。
注意
-
arguments參數只接受拉丁字元(ASCII 字元集)。 使用非 ASCII 字元會傳回錯誤。 - 使用
dbutils.notebookAPI 建立的工作必須在 30 天內完成。
run 使用情況
Python
dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})
程式語言 Scala
dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))
在筆記本之間傳遞結構化數據
本章節說明如何在 Notebook 之間傳遞結構化資料。
Python
# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.
## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")
## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.
## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")
## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.format("parquet").load(returned_table))
# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.
## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
"status": "OK",
"table": "my_data"
}))
## In caller notebook
import json
result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))
程式語言 Scala
// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.
/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")
/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.
/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")
/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.format("parquet").load(returned_table))
// Example 3 - returning JSON data.
// To return multiple values, use standard JSON libraries to serialize and deserialize results.
/** In callee notebook */
// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)
// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))
/** In caller notebook */
// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)
val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))
處理錯誤
本章節說明如何處理錯誤。
Python
# Errors throw a WorkflowException.
def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
num_retries = 0
while True:
try:
return dbutils.notebook.run(notebook, timeout, args)
except Exception as e:
if num_retries > max_retries:
raise e
else:
print("Retrying error", e)
num_retries += 1
run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)
程式語言 Scala
// Errors throw a WorkflowException.
import com.databricks.WorkflowException
// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here, we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
var numTries = 0
while (true) {
try {
return dbutils.notebook.run(notebook, timeout, args)
} catch {
case e: WorkflowException if numTries < maxTries =>
println("Error, retrying: " + e)
}
numTries += 1
}
"" // not reached
}
runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)
同時執行多個 Notebook
你可以同時運行多個筆記本,使用標準 Scala 和 Python 結構,例如 Threads(
- 下載下列四個筆記本。 這些筆記本是以 Scala 撰寫的。
- 將 Notebook 匯入工作區中的單一資料夾。
- 執行並行運行 Notebook。