通过


结构化流式处理的生产注意事项

关于在 Azure Databricks 上使用作业来计划结构化流式处理工作负载,本文提供了一些建议。

Databricks 建议始终执行以下操作:

  • 从返回结果的笔记本中删除不必要的代码,例如 displaycount
  • 不要使用通用计算运行结构化流式处理工作负载。 始终使用作业计算将流作为作业来计划。
  • 使用 Continuous 模式计划作业。
  • 不要为结构化流式处理作业的计算启用自动缩放。

某些工作负载会受益于以下功能:

Azure Databricks 引入了 Lakeflow Spark 声明性管道,以减少管理结构化流式处理工作负荷的生产基础结构的复杂性。 Databricks 建议对新的结构化流管道使用 Lakeflow Spark 声明式管道。 请参阅 Lakeflow Spark 声明性管道

注意

计算自动缩放在缩减结构化流式处理工作负载的群集大小方面存在限制。 Databricks 建议使用具有增强自动缩放功能的 Lakeflow Spark 声明性管道来处理流式工作负载。 请参阅 使用自动缩放优化 Lakeflow Spark 声明性管道的群集利用率

设计流式处理工作负载来应对失败

Databricks 建议始终将流式处理作业配置为在失败时自动重启。 某些功能(包括架构演变)假定结构化流式处理工作负载已配置为自动重试。 请参阅如何配置结构化流式处理作业以在失败时重启流式查询

有些操作(例如 foreachBatch)提供至少一次(而不是恰好一次)保证。 对于这些操作,应确保你的处理管道是幂等的。 请参阅使用 foreachBatch 将内容写入到任意数据接收器

注意

当查询重启时,将会处理在之前运行中计划的微批处理。 如果您的作业由于内存不足错误导致失败,或者您因微批次过大而手动取消作业,则可能需要升级计算资源,以便成功处理微批次。

如果在运行之间更改了配置,这些配置将应用于计划的第一个新批处理。 请参阅在结构化流式处理查询发生更改后恢复

作业何时重试?

您可以将多个任务安排在一个 Azure Databricks 作业中。 使用连续触发器配置作业时,无法设置任务之间的依赖项。

可选择使用以下方法之一在单个作业中计划多个流:

  • 多任务:定义一个具有多个任务的作业,这些任务会使用连续触发器运行流式处理工作负载。
  • 多查询:在单个任务的源代码中定义多个流式处理查询。

还可以组合使用这些策略。 下表比较了这些方法。

策略: 多个任务 多个查询
如何共享计算? Databricks 建议为每个流式处理任务部署适当大小的作业计算。 可以选择跨任务共享计算。 所有查询共享相同的计算。 可以选择性地将查询分配给计划程序池
如何处理重试? 在作业重试之前,所有任务都必须失败。 如果任何查询失败,任务将会重试。

将结构化流式处理作业配置为在失败时重启流式处理查询

Databricks 建议使用连续触发器配置所有流式处理工作负载。 请参阅连续运行作业

连续触发器默认提供以下行为:

  • 防止作业同时多次运行。
  • 在上一次运行失败时启动新的运行。
  • 使用指数退避进行重试。

Databricks 建议在计划工作流时始终使用作业计算而不是通用计算。 在作业失败并重试时,将会部署新的计算资源。

注意

不需要使用 streamingQuery.awaitTermination()spark.streams.awaitAnyTermination()。 当流式处理查询处于活动状态时,作业会自动防止运行完成。

将计划程序池用于多个流式处理查询

可以通过配置调度池,在从同一源代码运行多个流式查询时,将计算能力分配给查询。

默认情况下,笔记本中启动的所有查询都在同一个公平调度池中运行。 由触发器根据笔记本中的所有流式处理查询生成的 Apache Spark 作业将按照先入先出 (FIFO) 的顺序逐一运行。 这可能会导致查询中产生不必要的延迟,因为它们不能有效地共享群集资源。

计划程序池允许您声明哪些结构化流式处理查询共享计算资源。

以下示例将 query1 分配给专用池,而 query2query3 共享一个计划程序池。

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

注意

本地属性配置必须位于你启动流式处理查询时所在的笔记本单元中。

有关更多详细信息,请参阅 Apache 公平计划程序文档