本页提供在 Azure Databricks 上使用作业计划结构化流式处理作业工作负载的建议。
Databricks 建议始终配置以下内容:
- 从返回结果的笔记本中删除不必要的代码,例如
display和count。 - 不要使用通用计算运行结构化流式处理工作负载。 始终使用作业计算将流作为作业来计划。
- 使用
Continuous模式计划作业。 这指的是Azure Databricks作业计划功能,而不是结构化流式处理trigger 间隔。 - 不要为结构化流式处理作业启用计算自动缩放。
某些工作负载会受益于以下功能:
在 Azure Databricks - 有状态查询的异步状态检查点
- 什么是异步进度跟踪?
Azure Databricks引入了 Lakeflow Spark 声明性管道,以减少管理结构化流式处理工作负荷的生产基础结构的复杂性。 Databricks 建议对新的结构化流管道使用 Lakeflow Spark 声明式管道。 请参阅 Lakeflow Spark 声明式管道。
注意
计算自动缩放在缩减结构化流式处理工作负载的群集大小方面存在限制。 Databricks 建议使用具有增强自动缩放功能的 Lakeflow Spark 声明性管道来处理流式工作负载。 请参阅 使用自动缩放优化 Lakeflow Spark 声明性管道的群集利用率。
:::note 无服务器计算
在无服务器计算中,仅 Trigger.AvailableNow() 受支持且 Trigger.Once() 受支持。 Databricks 建议使用Trigger.AvailableNow()。
对于无服务器计算上的连续流式处理,请在连续模式下使用触发与连续管道模式。
请参阅 流式处理限制。
:::
设计流式处理工作负载来应对失败
Databricks 建议始终将流式处理作业配置为在失败时自动重启。 某些功能(包括架构演变)要求将结构化流式处理工作负载配置为自动重试。 请参阅如何配置结构化流式处理作业以在失败时重启流式查询。
有些操作(例如 foreachBatch)提供至少一次(而不是恰好一次)保证。 对于这些操作,请确保处理管道是幂等的。 请参阅使用 foreachBatch 将内容写入到任意数据接收器。
注意
当查询重启时,将会处理在之前运行中计划的微批处理。 如果您的作业由于内存不足错误导致失败,或者您因微批次过大而手动取消作业,则可能需要升级计算资源,以便成功处理微批次。
如果在运行之间更改了配置,这些配置将应用于计划的第一个新批处理。 请参阅在结构化流式处理查询发生更改后恢复。
作业何时重试?
可以将多个任务安排为Azure Databricks作业的一部分。 使用连续触发器配置作业时,无法设置任务之间的依赖项。
可选择使用以下方法之一在单个作业中计划多个流:
- 多任务:定义一个具有多个任务的作业,这些任务会使用连续触发器运行流式处理工作负载。
- 多查询:在单个任务的源代码中定义多个流式处理查询。
还可以组合使用这些策略。 下表比较了这些方法。
| 策略 | 多个任务 | 多个查询 |
|---|---|---|
| 如何共享计算? | Databricks 建议为每个流式处理任务部署适当大小的作业计算。 可以选择跨任务共享计算。 | 所有查询共享相同的计算。 可以选择将查询分配给 调度池。 |
| 如何处理重试? | 在作业重试之前,所有任务都必须失败。 | 如果任何查询失败,任务将会重试。 |
将结构化流式处理作业配置为在失败时重启流式处理查询
Databricks 建议使用连续触发器配置所有流式处理工作负载。 请参阅连续运行作业。
默认情况下,连续触发器具有以下行为:
- 防止作业同时多次运行。
- 在上一次运行失败时启动新的运行。
- 使用指数退避进行重试。
Databricks 建议在计划工作流时始终使用作业计算而不是通用计算。 在作业失败并重试时,将会部署新的计算资源。
注意
Databricks 建议不要使用 streamingQuery.awaitTermination() 或 spark.streams.awaitAnyTermination()。 请参阅 何时使用 awaitTermination()。
何时使用 awaitTermination()
streamingQuery.awaitTermination() 和 spark.streams.awaitAnyTermination() 阻止当前线程,直到流式查询终止。 是否使用这些函数取决于执行环境。
对于 Databricks Jobs,请勿使用 streamingQuery.awaitTermination() 或 spark.streams.awaitAnyTermination()。 这些函数并不是必需的,因为作业服务会在流式查询处于活动状态时自动阻止运行完成。 这两个函数都阻止笔记本单元格完成执行,并阻止 Jobs 服务跟踪流式处理查询,这会影响积压指标和作业通知。
在以下情况下使用 awaitTermination() :
| 用例 | 行为 |
|---|---|
| 用于全用途计算的交互式笔记本 |
awaitTermination() 使单元格保持运行状态,使你能够观察查询状态,并确保笔记本输出中的故障浮出水面。 |
| 本地和开发环境 | 在本地运行 Spark 程序时,当主线程完成时,进程将退出。 调用 awaitTermination() 以使程序保持活动状态,直到流式处理查询完成或失败。 |
| 故障蔓延至驱动程序 | 如果没有 awaitTermination(),那么在非作业上下文中的流式查询失败可能不会传播到调用线程。 查询可能会以无提示方式失败,从而使故障更难检测和诊断。 调用 awaitTermination() 会重新引发驱动程序上的查询异常。 |
将计划程序池用于多个流式处理查询
可以配置调度池,以便在从同一源代码运行多个流式查询时,将计算能力分配给查询。
默认情况下,笔记本中启动的所有查询都在同一个公平调度池中运行。 由触发器根据笔记本中的所有流式处理查询生成的 Apache Spark 作业将按照先入先出 (FIFO) 的顺序逐一运行。 这可能会导致查询中产生不必要的延迟,因为它们不能有效地共享群集资源。
计划程序池允许您声明哪些结构化流式处理查询共享计算资源。
以下示例将query1分配到专用池,而query2和query3共享一个计划程序池。
# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")
# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")
# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")
注意
本地属性配置必须位于你启动流式处理查询时所在的笔记本单元中。
有关 Apache 公平调度器池的详细信息,请参阅 Apache 公平调度器文档。