使用笔记本实用工具运行笔记本、并行运行多个笔记本或退出具有值的笔记本。 运行以下命令以概要了解可用的方法:
notebookutils.notebook.help()
下表列出了可用的笔记本运行和业务流程方法:
| 方法 | Signature | 说明 |
|---|---|---|
run |
run(path: str, timeout_seconds: int = 90, arguments: dict = None, workspace: str = ""): str |
运行笔记本并返回其退出值。 |
runMultiple |
runMultiple(dag: Any, config: dict = None): dict[str, dict[str, Any]] |
同时运行多个笔记本,同时支持依赖项关系。 |
validateDAG |
validateDAG(dag: Any): bool |
验证 DAG 定义是否正确结构化。 |
exit |
exit(value: str): None |
使用一个值退出并返回当前笔记本。 |
有关笔记本 CRUD 操作(创建、获取、更新、删除、列出),请参阅 “管理笔记本项目”。
注释
config参数runMultiple()仅在 Python 中可用。 Scala 和 R 不支持此参数。
注释
笔记本实用工具不适用于 Apache Spark 作业定义 (SJD)。
引用笔记本
该方法 run() 引用笔记本并返回其退出值。 可以在笔记本中以交互方式或在管道中运行嵌套函数调用。 被引用的笔记本将在调用此函数的笔记本的 Spark 池上运行。
notebookutils.notebook.run("notebook name", <timeout_seconds>, <arguments>, <workspace>)
例如:
notebookutils.notebook.run("Sample1", 90, {"input": 20 })
返回值
该方法 run() 返回在 notebookutils.notebook.exit(value) 子笔记本中传递的确切字符串。 如果在 exit() 子笔记本中未调用,则返回一个空字符串("")。
面料笔记本还支持通过指定 工作区 ID 来跨工作区引用笔记本。
notebookutils.notebook.run("Sample1", 90, {"input": 20 }, "fe0a6e2a-a909-4aa3-a698-0a651de790aa")
打开单元格输出中的快照链接以查看参考运行情况。 快照捕获运行结果,并帮助调试引用的笔记本。
设置子笔记本以接收参数
创建通过 run() 或 runMultiple()调用的子笔记本时,请设置参数单元格,以便笔记本可以从父级接收参数:
- 创建具有默认参数值的代码单元。
- 通过在笔记本 UI 中选择 “将单元格标记为参数 ”,将单元格标记为参数单元格。
- 在执行期间,参数单元格值将替换为从父级传递的参数。
# This cell should be marked as "parameters" cell
# Default values are overridden when the notebook is called
date = "2024-01-01"
region = "US"
小窍门
退出值始终是字符串。 如果需要父笔记本中的数值,在检索后转换结果(例如, int(result))。
注意事项
- 跨工作区引用笔记本由运行时版本 1.2 及更高版本支持。
- 如果使用笔记本资源下的文件,请在引用的笔记本中使用
notebookutils.nbResPath,以确保它指向与交互式运行相同的文件夹。 - 引用运行仅当子笔记本使用与父级相同的 Lakehouse、继承父级的 Lakehouse 或两者都未定义时,才能运行。 如果子级指定的湖仓与父笔记本的不同,则会阻止执行。 若要绕过此检查,请在参数中设置
useRootDefaultLakehouse: True。 - 不要在
try-catch块内调用notebookutils.notebook.exit(value)。 退出调用在异常处理中进行包装时不会生效。
并行运行多个笔记本
使用 notebookutils.notebook.runMultiple() 在并行或预定义的拓扑结构中运行多个 Jupyter 笔记本。 API 在 Spark 会话中使用多线程实现,这意味着引用的笔记本运行共享计算资源。
通过 notebookutils.notebook.runMultiple(),您可以:
同时执行多个笔记本,而无需等待每个笔记本完成。
使用简单的 JSON 格式为笔记本指定依赖项和执行顺序。
优化 Spark 计算资源的使用,并降低 Fabric 项目的成本。
在输出中查看每个笔记本运行记录的快照,并方便地调试/监视笔记本任务。
获取每个执行活动的退出值,并在下游任务中使用它们。
运行 notebookutils.notebook.help("runMultiple") 以查看更多示例和使用情况详细信息。
运行简单的笔记本列表
以下示例并行运行笔记本列表:
根笔记本中的执行结果如下所示:
返回值
该方法 runMultiple() 返回一个字典,其中每个键都是活动名称,每个值都是具有以下键的字典:
-
exitVal:子笔记本调用返回的exit()字符串;如果未exit()调用,则为空字符串。 -
exception:活动失败或None成功时出错对象。
使用 DAG 结构运行笔记本
下面的示例使用 notebookutils.notebook.runMultiple() 在 DAG 结构中运行笔记本。
# run multiple notebooks with parameters
DAG = {
"activities": [
{
"name": "Process_1", # activity name, must be unique
"path": "NotebookSimple", # notebook item name
"timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
"args": {"p1": "changed value", "p2": 100}, # notebook parameters
"workspace":"WorkspaceName" # both name and id are supported
},
{
"name": "Process_2",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 2", "p2": 200},
"workspace":"id" # both name and id are supported
},
{
"name": "Process_1.1",
"path": "NotebookSimple2",
"timeoutPerCellInSeconds": 120,
"args": {"p1": "changed value 3", "p2": 300},
"retry": 1,
"retryIntervalInSeconds": 10,
"dependencies": ["Process_1"] # list of activity names that this activity depends on
}
],
"timeoutInSeconds": 43200, # max timeout for the entire DAG, default to 12 hours
"concurrency": 12 # max number of notebooks to run concurrently, default to 3x CPU cores, 0 means unlimited
}
notebookutils.notebook.runMultiple(DAG, {"displayDAGViaGraphviz": False})
根笔记本中的执行结果如下所示:
DAG 参数参考
下表描述了可在 DAG 定义中使用的每个字段:
| 领域 | 级别 | 必需 | 说明 |
|---|---|---|---|
activities |
根 | 是的 | 定义要运行的笔记本的活动对象的列表。 |
timeoutInSeconds |
根 | 否 | 整个 DAG 的最大超时时间。 默认值为 43200 (12 小时)。 |
concurrency |
根 | 否 | 要并发运行的笔记本的最大数目。 默认值是可用 CPU 核心计数的 3 倍。 如果需要更严格的控制,请显式设置此值;或者使用 0 实现无限并发。 |
name |
活动 | 是的 | 活动的唯一名称。 用于标识结果并定义依赖项。 |
path |
活动 | 是的 | 要执行的笔记本项名称或路径。 |
timeoutPerCellInSeconds |
活动 | 否 | 子笔记本中每个单元格的最大超时时间。 默认值为 90 秒。 |
args |
活动 | 否 | 传递给子笔记本的参数配置字典。 |
workspace |
活动 | 否 | 笔记本所在的工作区名称或 ID。 默认情况下,子笔记本在与调用方相同的工作区中运行。 |
retry |
活动 | 否 | 如果活动失败,则重试尝试次数。 默认为 0。 |
retryIntervalInSeconds |
活动 | 否 | 重试尝试之间的等待时间(以秒为单位)。 默认为 0。 |
dependencies |
活动 | 否 | 必须在此活动开始之前完成的活动名称列表。 |
活动间出口值的参考
可以通过使用 @activity() 表达式来引用 args 字段中的依赖项活动的返回值。 此模式允许在 DAG 中的笔记本之间传递数据。
DAG = {
"activities": [
{
"name": "Extract",
"path": "ExtractData",
"timeoutPerCellInSeconds": 120,
"args": {"source": "prod_db"}
},
{
"name": "Transform",
"path": "TransformData",
"timeoutPerCellInSeconds": 180,
"args": {
"data_path": "@activity('Extract').exitValue()"
},
"dependencies": ["Extract"]
}
]
}
results = notebookutils.notebook.runMultiple(DAG)
小窍门
使用字段中的@activity('activity_name').exitValue()args表达式将一个活动的结果传递到 DAG 中的另一个活动。
生成动态 DAG
可以以编程的方式为跨多个分区的扇出处理等场景生成 DAG 结构。
def create_fan_out_dag(partitions):
activities = []
for partition in partitions:
activities.append({
"name": f"Process_{partition}",
"path": "ProcessPartition",
"timeoutPerCellInSeconds": 180,
"args": {"partition": partition}
})
activities.append({
"name": "Aggregate",
"path": "AggregateResults",
"timeoutPerCellInSeconds": 120,
"dependencies": [f"Process_{p}" for p in partitions]
})
return {"activities": activities, "concurrency": 25}
partitions = ["2024-01", "2024-02", "2024-03", "2024-04"]
dag = create_fan_out_dag(partitions)
results = notebookutils.notebook.runMultiple(dag)
验证 DAG
用于 validateDAG() 在执行之前验证 DAG 结构是否有效。 它捕获重复活动名称、缺少依赖项和循环引用等问题。
返回值
此方法 validateDAG() 返回 True DAG 结构是否有效,或者在验证失败时引发异常。
小窍门
始终先在生产工作流中调用validateDAG()runMultiple(),以尽早捕获结构错误。
处理“runMultiple”失败
该方法 runMultiple() 返回一个字典,其中每个键都是活动名称,每个值都包含一个 exitVal (字符串)和一个 exception (错误对象或 None)。 即使某些活动失败,也可以检查部分结果:
from notebookutils.common.exceptions import RunMultipleFailedException
try:
results = notebookutils.notebook.runMultiple(DAG)
except RunMultipleFailedException as ex:
results = ex.result
for activity_name, result in results.items():
if result["exception"]:
print(f"{activity_name} failed: {result['exception']}")
else:
print(f"{activity_name} succeeded: {result['exitVal']}")
注意事项
- 多个笔记本运行的并行度受限于 Spark 会话的总可用计算资源。
- 并发笔记本的默认数目是 可用 CPU 核心计数的 3 倍。 可以自定义此值,但由于计算资源使用率过高,过度并行可能会导致稳定性和性能问题。 如果出现相关问题,请考虑将笔记本拆分为多个
runMultiple调用,或者通过调整 DAG 参数中的“并发”字段来减少并发。 - 整个 DAG 的默认超时为 12 小时,子笔记本中每个单元格的默认超时时间为 90 秒。 可通过在 DAG 参数中设置“timeoutInSeconds”和“timeoutPerCellInSeconds”字段更改超时。
- 配置
retry和retryIntervalInSeconds以应对因暂时性问题(例如网络超时或临时服务不可用)可能导致失败的活动。 - 并行笔记本在单个 Spark 会话中共享计算资源。 监视资源利用率,以避免内存压力和 CPU 争用。
退出笔记本应用程序
该exit()方法返回一个值,退出笔记本。 可以在笔记本中以交互方式或在管道中运行嵌套函数调用。
以交互方式从笔记本调用
exit()函数时,Fabric 笔记本会引发异常,跳过运行后续单元格,并使 Spark 会话保持活动状态。在调用
exit()函数的管道中协调笔记本时,笔记本活动会返回退出值。 这会完成管道运行并停止 Spark 会话。在被引用的笔记本中调用函数
exit()时,Fabric Spark 会停止对引用的笔记本的进一步执行,并继续在调用该run()函数的主笔记本中运行下一个单元格。 例如:Notebook1 有三个单元格,调用第二个单元格中的exit()函数。 Notebook2 有五个单元格,调用第三个单元格中的run(notebook1)函数。 运行 Notebook2 时,Notebook1 在调用exit()函数时会在第二个单元格处停止。 Notebook2 会继续运行其第四个和第五个代码单元格。
返回行为
该方法 exit() 不返回值。 它会终止当前笔记本,并将提供的字符串传递给调用笔记本或管道。
注释
该 exit() 函数覆盖当前单元格输出。 为了避免丢失其他代码语句的输出,请调用单独的单元格中的 notebookutils.notebook.exit()。
重要
不要在try-catch块内调用notebookutils.notebook.exit()。 退出在被异常处理包装时不会生效。 调用 exit() 必须位于代码的顶层才能正常工作。
例如:
Sample1 笔记本包含以下两个单元格:
单元格 1 定义 input 参数,默认值设为 10。
单元 2 以input作为退出值退出笔记本。
可以在另一个具有默认值的笔记本中运行 Sample1 :
exitVal = notebookutils.notebook.run("Sample1")
print (exitVal)
输出:
10
可以在另一个笔记本中运行 Sample1 ,并将 输入 值设置为 20:
exitVal = notebookutils.notebook.run("Sample1", 90, {"input": 20 })
print (exitVal)
输出:
20