Запуск записной книжки Databricks из другой записной книжки
Внимание
Для оркестрации записных книжек используйте задания Databricks. Для сценариев модульизации кода используйте файлы рабочей области. Методы, описанные в этой статье, следует использовать только в том случае, если вариант использования не может быть реализован с помощью задания Databricks, например для циклического цикла записных книжек по динамическому набору параметров или если у вас нет доступа к файлам рабочей области. Дополнительные сведения см. в разделе "Планирование и оркестрация рабочих процессов" и совместного использования кода.
%run
Сравнение иdbutils.notebook.run()
Эта %run
команда позволяет включить в записную книжку другую записную книжку. Можно использовать %run
для разделения кода, например путем размещения вспомогательных функций в отдельной записной книжке. Его также можно использовать для сцепления записных книжек, которые реализуют определенные операции анализа. При использовании %run
вызванная записная книжка выполняется немедленно, а функции и переменные, определенные в ней, становятся доступными в вызывающей записной книжке.
dbutils.notebook
API является дополнением %run
к тому, что он позволяет передавать параметры и возвращать значения из записной книжки. Это позволяет создавать сложные рабочие процессы и конвейеры с зависимостями. Например, можно получить список файлов в каталоге и передать их в другую записную книжку, что невозможно с помощью %run
. Кроме того, можно создавать рабочие процессы если-то-иначе на основе возвращаемых значений или вызывать другие записные книжки с помощью относительных путей.
В отличие от %run
метод dbutils.notebook.run()
запускает новое задание для запуска записной книжки.
Эти методы, как и все интерфейсы API dbutils
, доступны только в Python и Scala. Тем не менее можно использовать dbutils.notebook.run()
для вызова записной книжки R.
Использование %run
для импорта записной книжки
В этом примере первая записная книжка определяет функцию, reverse
которая доступна во второй записной книжке после использования %run
магии для выполнения shared-code-notebook
.
Так как обе эти записные книжки находятся в одном каталоге в рабочей области, используйте префикс ./
в ./shared-code-notebook
, чтобы указать, что путь должен разрешаться относительно текущей записной книжки. Записные книжки можно упорядочить в каталоги, например %run ./dir/notebook
, или использовать абсолютный путь, например %run /Users/username@organization.com/directory/notebook
.
Примечание.
%run
должна находиться в ячейке отдельно, так как она полностью выполняет всю записную книжку.- Использовать
%run
для запуска файла Python и сущностейimport
, определенных в этом файле, в записной книжке нельзя. Сведения об импорте из файла Python см. в разделе "Модульная настройка кода с помощью файлов". Кроме того, вы можете упаковать файл в библиотеку Python, создать библиотеку Azure Databricks на основе этой библиотеки Python, а затем установить библиотеку в кластер, используемый для выполнения записной книжки. - Когда вы используете
%run
для выполнения записной книжки, содержащей мини-приложения, по умолчанию указанная записная книжка запускается со значениями по умолчанию для мини-приложения. Вы также можете передавать входные значения в мини-приложения (см. раздел Использование мини-приложений Databricks с %run).
API dbutils.notebook
Методы, доступные dbutils.notebook
в API, и run
exit
. Параметры и возвращаемые значения должны быть строками.
run(path: String, timeout_seconds: int, arguments: Map): String
Запускает записную книжку и возвращает значение выхода. Метод запускает кратковременное задание, которое выполняется немедленно.
timeout_seconds
Параметр управляет временем ожидания выполнения (0 означает отсутствие времени ожидания): вызов run
вызывает исключение, если задание не завершается в течение заданного времени. Если Azure Databricks не работает более 10 минут, запуск записной книжки завершается сбоем независимо от timeout_seconds
.
Параметр arguments
задает значения мини-приложения целевой записной книжки. В частности, если в работающей записной книжке содержится мини-приложение A
, и вы передаете ("A": "B")
пары "ключ — значение" в составе параметра аргументов в вызове run()
, то при извлечении значения мини-приложение A
вернет значение "B"
. Инструкции по созданию мини-приложений и работе с ними см. в статье о мини-приложениях.
Примечание.
- Параметр
arguments
принимает только латинские символы (кодировка ASCII). Использование символов, отличных от ASCII, возвращает ошибку. - Задания, созданные
dbutils.notebook
с помощью API, должны выполняться в 30 дней или меньше.
run
Использование
Python
dbutils.notebook.run("notebook-name", 60, {"argument": "data", "argument2": "data2", ...})
Scala
dbutils.notebook.run("notebook-name", 60, Map("argument" -> "data", "argument2" -> "data2", ...))
run
Пример
Предположим, что у вас есть записная книжка workflows
с мини-приложением foo
, которая выводит значение мини-приложения:
dbutils.widgets.text("foo", "fooDefault", "fooEmptyLabel")
print(dbutils.widgets.get("foo"))
При работе dbutils.notebook.run("workflows", 60, {"foo": "bar"})
создается следующий результат:
Мини-приложение имело значение, которое вы передали с помощью dbutils.notebook.run()
, "bar"
а не значение по умолчанию.
exit(value: String): void
Выход из записной книжки со значением. При вызове записной книжки с помощью метода run
это значение возвращается.
dbutils.notebook.exit("returnValue")
Вызов dbutils.notebook.exit
в задании приводит к успешному выполнению записной книжки. Если вы хотите вызвать сбой задания, вызовите исключение.
Пример
В следующем примере вы передаете аргументы в DataImportNotebook
и запускаете различные записные книжки (DataCleaningNotebook
или ErrorHandlingNotebook
) в зависимости от результата из DataImportNotebook
.
При запуске кода появится таблица, содержащая ссылку на запущенную записную книжку:
Чтобы просмотреть сведения о выполнении, щелкните ссылку "Время начала" в таблице. Если выполнение завершено, вы также можете просмотреть сведения о выполнении, щелкнув ссылку " Время окончания ".
Передача структурированных данных
В этом разделе показано, как передавать структурированные данные между записными книжками.
Python
# Example 1 - returning data through temporary views.
# You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
# return a name referencing data stored in a temporary view.
## In callee notebook
spark.range(5).toDF("value").createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")
## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
# Example 2 - returning data through DBFS.
# For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.
## In callee notebook
dbutils.fs.rm("/tmp/results/my_data", recurse=True)
spark.range(5).toDF("value").write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")
## In caller notebook
returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(spark.read.format("parquet").load(returned_table))
# Example 3 - returning JSON data.
# To return multiple values, you can use standard JSON libraries to serialize and deserialize results.
## In callee notebook
import json
dbutils.notebook.exit(json.dumps({
"status": "OK",
"table": "my_data"
}))
## In caller notebook
import json
result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
print(json.loads(result))
Scala
// Example 1 - returning data through temporary views.
// You can only return one string using dbutils.notebook.exit(), but since called notebooks reside in the same JVM, you can
// return a name referencing data stored in a temporary view.
/** In callee notebook */
sc.parallelize(1 to 5).toDF().createOrReplaceGlobalTempView("my_data")
dbutils.notebook.exit("my_data")
/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
val global_temp_db = spark.conf.get("spark.sql.globalTempDatabase")
display(table(global_temp_db + "." + returned_table))
// Example 2 - returning data through DBFS.
// For larger datasets, you can write the results to DBFS and then return the DBFS path of the stored data.
/** In callee notebook */
dbutils.fs.rm("/tmp/results/my_data", recurse=true)
sc.parallelize(1 to 5).toDF().write.format("parquet").save("dbfs:/tmp/results/my_data")
dbutils.notebook.exit("dbfs:/tmp/results/my_data")
/** In caller notebook */
val returned_table = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
display(sqlContext.read.format("parquet").load(returned_table))
// Example 3 - returning JSON data.
// To return multiple values, you can use standard JSON libraries to serialize and deserialize results.
/** In callee notebook */
// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)
// Exit with json
dbutils.notebook.exit(jsonMapper.writeValueAsString(Map("status" -> "OK", "table" -> "my_data")))
/** In caller notebook */
// Import jackson json libraries
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.fasterxml.jackson.databind.ObjectMapper
// Create a json serializer
val jsonMapper = new ObjectMapper with ScalaObjectMapper
jsonMapper.registerModule(DefaultScalaModule)
val result = dbutils.notebook.run("LOCATION_OF_CALLEE_NOTEBOOK", 60)
println(jsonMapper.readValue[Map[String, String]](result))
Обработка ошибок
В этом разделе показано, как обрабатывать ошибки.
Python
# Errors throw a WorkflowException.
def run_with_retry(notebook, timeout, args = {}, max_retries = 3):
num_retries = 0
while True:
try:
return dbutils.notebook.run(notebook, timeout, args)
except Exception as e:
if num_retries > max_retries:
raise e
else:
print("Retrying error", e)
num_retries += 1
run_with_retry("LOCATION_OF_CALLEE_NOTEBOOK", 60, max_retries = 5)
Scala
// Errors throw a WorkflowException.
import com.databricks.WorkflowException
// Since dbutils.notebook.run() is just a function call, you can retry failures using standard Scala try-catch
// control flow. Here we show an example of retrying a notebook a number of times.
def runRetry(notebook: String, timeout: Int, args: Map[String, String] = Map.empty, maxTries: Int = 3): String = {
var numTries = 0
while (true) {
try {
return dbutils.notebook.run(notebook, timeout, args)
} catch {
case e: WorkflowException if numTries < maxTries =>
println("Error, retrying: " + e)
}
numTries += 1
}
"" // not reached
}
runRetry("LOCATION_OF_CALLEE_NOTEBOOK", timeout = 60, maxTries = 5)
Одновременное выполнение нескольких записных книжек
Вы можете одновременно запустить несколько записных книжек, используя стандартные конструкции Scala и Python, такие как потоки (Scala, Python) и структуры Futures (Scala, Python). В примерах записных книжек показано, как использовать эти конструкции.
- Скачайте следующие 4 записных книжки. Записные книжки написаны в Scala.
- Импортируйте записные книжки в одну папку в рабочей области.
- Запустите записную книжку запуска одновременно .