结构化流式处理检查点
检查点和预写日志协同工作,为结构化流式处理工作负载提供处理保证。 检查点用于跟踪可标识查询的信息,包括状态信息和已处理的记录。 删除检查点目录中的文件或更改到新的检查点位置时,查询的下一次运行将开始刷新。
每个查询必须具有不同的检查点位置。 多个查询不应使用相同的位置。
为结构化流式处理查询启用检查点
在运行流式处理查询之前必须指定 checkpointLocation
选项,如以下示例所示:
Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
Scala
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
注意
如果省略此选项,某些接收器(例如笔记本中 display()
的输出和 memory
接收器)会自动生成临时检查点位置。 临时检查点位置不能确保任何容错,也不能保证数据一致性,并且可能无法进行正常的清理。 Databricks 建议始终为这些接收器指定检查点位置。
在结构化流式处理查询发生更改后恢复
在从同一检查点位置进行的各次重启之间,对于流式处理查询中允许哪些更改存在限制。 下面的几种更改是不允许的,或者是更改效果未明确的。 对于它们:
- “允许”一词意味着你可以执行指定的更改,但其效果的语义是否明确取决于查询和更改。
- “不允许”一词意味着不应执行指定的更改,因为重启的查询可能会失败并出现不可预知的错误。
sdf
表示通过sparkSession.readStream
生成的流式处理数据帧/数据集。
结构化流式处理查询中的更改类型
- 输入源的数量或类型(即不同源)的更改:这是不允许的。
- 输入源的参数中的更改:是否允许这样做,以及更改的语义是否明确取决于源和查询。 以下是一些示例。
允许添加、删除和修改速率限制:
spark.readStream.format("kafka").option("subscribe", "article")
to
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
通常不允许更改已订阅的文章和文件,因为结果不可预知:不允许将
spark.readStream.format("kafka").option("subscribe", "article")
更改为spark.readStream.format("kafka").option("subscribe", "newarticle")
- 触发器间隔更改:可以在增量批处理和时间间隔之间更改触发器。 请参阅更改运行之间的触发器间隔。
- 输出接收器类型的更改:允许在几个特定的接收器组合之间进行更改。 这需要根据具体情况进行验证。 以下是一些示例。
- 允许从文件接收器更改为 Kafka 接收器。 Kafka 只会看到新数据。
- 不允许从 Kafka 接收器更改为文件接收器。
- 允许从 Kafka 接收器更改为 foreach,反之亦然。
- 输出接收器的参数中的更改:是否允许这样做,以及更改的语义是否明确取决于接收器和查询。 以下是一些示例。
- 不允许更改文件接收器的输出目录:不允许从
sdf.writeStream.format("parquet").option("path", "/somePath")
更改为sdf.writeStream.format("parquet").option("path", "/anotherPath")
- 对输出主题的更改是允许的:允许从
sdf.writeStream.format("kafka").option("topic", "topic1")
更改为sdf.writeStream.format("kafka").option("topic", "topic2")
- 允许更改用户定义的 foreach 接收器(即
ForeachWriter
代码),但更改的语义取决于代码。
- 不允许更改文件接收器的输出目录:不允许从
- 投影/筛选器/映射类操作中的更改:允许某些案例。 例如:
- 允许添加/删除筛选器:允许将
sdf.selectExpr("a")
修改为sdf.where(...).selectExpr("a").filter(...)
。 - 允许对具有相同输出架构的投影进行更改:允许将
sdf.selectExpr("stringColumn AS json").writeStream
更改为sdf.select(to_json(...).as("json")).writeStream
。 - 具有不同输出架构的投影中的更改是有条件允许的:只有当输出接收器允许架构从
"a"
更改为"b"
时,才允许从sdf.selectExpr("a").writeStream
到sdf.selectExpr("b").writeStream
的更改。
- 允许添加/删除筛选器:允许将
- 有状态操作的更改:流式处理查询中的某些操作需要保留状态数据才能持续更新结果。 结构化流式处理自动为容错存储(例如,DBFS、Azure Blob 存储)的状态数据设置检查点并在重启后对其进行还原。 但是,这假设状态数据的架构在重启后保持不变。 这意味着,在两次重启之间不允许对流式处理查询的有状态操作进行任何更改(即,添加、删除或架构修改)。 下面列出了不应在两次重启之间更改其架构以确保状态恢复的有状态操作:
- 流式处理聚合:例如,
sdf.groupBy("a").agg(...)
。 不允许对分组键或聚合的数量或类型进行任何更改。 - 流式处理去重:例如,
sdf.dropDuplicates("a")
。 不允许对分组键或聚合的数量或类型进行任何更改。 - 流间联接:例如
sdf1.join(sdf2, ...)
(即,两个输入都是通过sparkSession.readStream
生成的)。 不允许对架构或同等联接列进行更改。 不允许对联接类型(外部或内部)进行更改。 联接条件中的其他更改被视为错误定义。 - 任意监控状态的操作:例如,
sdf.groupByKey(...).mapGroupsWithState(...)
或sdf.groupByKey(...).flatMapGroupsWithState(...)
。 不允许对用户定义状态的架构和超时类型进行任何更改。 允许在用户定义的状态映射函数中进行任何更改,但更改的语义效果取决于用户定义的逻辑。 如果你确实想要支持状态架构更改,则可以使用支持架构迁移的编码/解码方案,将复杂的状态数据结构显式编码/解码为字节。 例如,如果你将状态另存为 Avro 编码的字节,则可以在两次查询重启之间更改 Avro 状态架构,因为这会还原二进制状态。
- 流式处理聚合:例如,