本文介绍如何为 Azure Databricks 上的结构化流配置触发器间隔。
Apache Spark 结构化流式处理增量地处理数据。 触发器间隔控制结构化流式处理检查新数据的频率。 可以为准实时处理、计划的数据库刷新或批处理一天或一周的所有新数据配置触发器间隔。
由于 什么是自动加载程序? 使用结构化流式处理来加载数据,因此了解触发器的工作原理使你能够灵活地控制成本,同时以所需的频率引入数据。
触发器模式概述
下表总结了结构化流式处理中可用的触发器模式:
| 触发模式 | 语法示例 (Python) | 最适用于 |
|---|---|---|
| 未指定 (默认值) | 不适用 | 具有 3-5 秒延迟的常规用途流式处理。 等效于使用 0 毫秒间隔的 processingTime 触发器。 只要新数据到达,流处理就持续运行。 |
| 处理时间 | .trigger(processingTime='10 seconds') |
平衡成本和性能。 通过防止系统过于频繁地检查数据来减少开销。 |
| 现已推出 | .trigger(availableNow=True) |
计划的增量批处理。 在触发流式处理作业时处理尽可能多的数据。 |
| 实时模式 | .trigger(realTime='5 minutes') |
超低延迟的操作负载需要毫秒级处理,例如欺诈检测或实时个性化。 公共预览版。 “5 分钟”表示微批处理的长度。 用 5 分钟时间将每个批次的开销(例如查询编译)降到最低。 |
| 连续 | .trigger(continuous='1 second') |
不支持。 这是 Spark OSS 中包含的实验性功能。 请改用实时模式。 |
processingTime:基于时间的触发间隔
结构化流式处理将基于时间的触发器间隔称为“固定间隔微批处理”。 使用 processingTime 关键字,将持续时间指定为字符串,例如 .trigger(processingTime='10 seconds')。
此间隔的配置确定系统执行检查的频率,以查看新数据是否已到达。 配置处理时间以平衡延迟要求和数据到达源的速率。
AvailableNow:增量批处理
重要
在 Databricks Runtime 11.3 LTS 及更高版本中, Trigger.Once 已弃用。 使用 Trigger.AvailableNow 处理所有增量批处理工作负荷。
触发器 AvailableNow 选项会将所有可用记录作为增量批进行处理,并且可以使用选项(例如 maxBytesPerTrigger)来配置批大小。 尺寸选项因数据源而异。
支持的数据源
Azure Databricks 支持使用 Trigger.AvailableNow 来从许多结构化流式处理源进行增量批处理。 下表包含每个数据源所需的最低受支持 Databricks Runtime 版本:
| 源 | Databricks Runtime最低版本 |
|---|---|
| 文件源(JSON、Parquet 等) | 9.1 LTS |
| Delta Lake | 10.4 LTS |
| 自动加载器 | 10.4 LTS |
| Apache Kafka | 10.4 LTS |
| 动动力 | 13.1 |
realTime:超低延迟的工作负载
重要
此功能目前以公共预览版提供。
结构化流式处理实时模式在尾部实现 1 秒以下的端到端延迟,在常见情况下约为 300 毫秒。 有关如何有效配置和使用实时模式的更多详细信息,请参阅 结构化流式处理中的实时模式。
Apache Spark 有一个称为 连续处理的附加触发器间隔。 自 Spark 2.3 以来,此模式已归类为实验模式。 Azure Databricks 不支持或推荐此模式。 对于低延迟用例,请使用实时模式。
注意
此页上的连续处理模式与 Lakeflow Spark 声明性管道中的连续处理无关。
更改运行之间的触发器间隔
你可以在使用同一检查点时更改运行之间的触发间隔。
更改间隔时的行为
如果结构化流作业在处理微批次时停止,则该微批次必须在新的触发间隔应用之前完成。 因此,在更改触发器间隔后,您可能会观察到按照先前指定的设置进行的微批处理。 下面描述了转换时的预期行为:
从基于时间的间隔转换到
AvailableNow: 微批处理可能会在将所有可用记录作为增量批处理处理之前进行处理。从
AvailableNow过渡到基于时间的间隔: 处理可能会继续处理上次AvailableNow作业触发时可用的所有记录。 这是预期的行为。
从查询失败中恢复
注意
如果尝试从与增量批处理关联的查询失败中恢复,则更改触发器间隔无法解决此问题,因为批处理仍必须完成。 增加计算容量以处理批处理,从而尝试解决问题。 在极少数情况下,可能需要使用新的检查点重新启动流。