使用任务值在任务之间传递信息
任务值是指 Databricks 实用程序 taskValues
的子实用工具,你可以用它在 Databricks 作业中的任务之间传递任意值。 请参阅 taskValues 子实用工具 (dbutils.jobs.taskValues)。
可以在一个任务中使用 dbutils.jobs.taskValues.set()
指定键值对,然后使用任务名称和键引用后续任务中的值。
注意
子实用工具 dbutils.jobs.taskValues.set()
和 dbutils.jobs.taskValues.get()
仅在 Python 笔记本中可用。 可以使用支持参数的所有任务的动态值引用来引用任务值。 请参阅引用任务值。
设置任务值
使用子实用工具 dbutils.jobs.taskValues.set()
在 Python 笔记本中设置任务值。
任务值键必须是字符串。 如果你在笔记本中定义了多个任务值,则每个键必须是唯一的。
可以手动或以编程方式将任务值分配给键。 仅允许可以表示为有效 JSON 的值。 值的 JSON 表示形式的大小不能超过 48 KiB。
例如,以下示例为键 fave_food
设置静态字符串:
dbutils.jobs.taskValues.set(key = "fave_food", value = "beans")
以下示例使用笔记本任务参数查询特定订单号的所有记录,并返回当前订单状态和记录总数:
from pyspark.sql.functions import col
order_num = dbutils.widgets.get("order_num")
query = (spark.read.table("orders")
.orderBy(col("updated"), ascending=False)
.select(col("order_status"))
.where(col("order_num") == order_num)
)
dbutils.jobs.taskValues.set(key = "record_count", value = query.count())
dbutils.jobs.taskValues.set(key = "order_status", value = query.take(1)[0][0])
可以使用此模式传递值列表,然后使用它们协调下游逻辑,例如为每个任务。 请参阅在循环中运行参数化 Azure Databricks 作业任务。
以下示例将产品 ID 的非重复值提取到 Python 列表,并将其设置为任务值:
prod_list = list(spark.read.table("products").select("prod_id").distinct().toPandas()["prod_id"])
dbutils.jobs.taskValues.set(key = "prod_list", value = prod_list)
引用任务值
Databricks 建议将任务值引用为使用动态值引用模式 {{tasks.<task_name>.values.<value_name>}}
配置的任务参数。
例如,要从名为 prod_list
的任务中引用具有键 product_inventory
的任务值,请使用语法 {{tasks.product_inventory.values.prod_list}}
。
使用 dbutils.jobs.taskValues.get
语法 dbutils.jobs.taskValues.get()
需要指定上游任务名称。 不建议使用此语法,因为可以在多个下游任务中使用任务值,这意味着,如果任务名称更改,需要进行大量更新。
使用此语法,可以选择指定 default
值和 debugValue
。 如果找不到键,则使用默认值。 使用 debugValue
,你可以在将笔记本计划为任务之前在笔记本中手动进行代码开发和测试期间设置要使用的静态值。
以下示例将获取任务名称 order_status
中设置的键 order_lookup
的值。 仅当以交互方式运行笔记本时,才会返回值 Delivered
。
order_status = dbutils.jobs.taskValues.get(taskKey = "order_lookup", key = "order_status", debugValue = "Delivered")
注意
Databricks 不建议设置默认值,因为由于缺少键或错误命名的任务,它们可能难以排查并阻止预期的错误消息。
查看任务值
每个运行的任务值的返回值均显示在“任务运行详细信息”的“输出”面板中。 请参阅查看任务运行历史记录。