按需状态重新分区允许您在不丢失检查点状态的情况下,更改有状态的 Structured Streaming 查询的分区数量。
如果不进行按需状态重新分区,可以在创建检查点期间设置随机分区数。 如果更改 spark.sql.shuffle.partitions,具有现有检查点的查询将忽略新值。 应用新的分区计数要求使用新的检查点重启查询。
按需状态重新分区具有以下优点:
- 无需重建检查点即可通过调整分区数量来优化查询。
- 纵向扩展或缩减查询以匹配工作负荷更改。
Requirements
- Databricks Runtime 18.3 或更高版本。
- 查询必须使用 RocksDB 状态存储提供程序。 在 DBR 17.3 或更高版本上,RocksDB 是默认的状态存储提供程序。 请参阅 在 Azure Databricks 上配置 RocksDB 状态存储。
更改分区数
使用 Spark 配置 spark.sql.streaming.stateStore.partitions 并重启查询,以更改混洗分区和流式状态分区的数量:
Python
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()
Scala
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()
对于有状态查询, spark.sql.streaming.stateStore.partitions 优先于 spark.sql.shuffle.partitions. 查询重启并完成最后一个计划的微分块后,查询将运行重新分区操作,将状态数据重新分发到新的分区数。 重新分区操作完成后,查询将恢复处理。
监控重新分区状态
下一个微批处理完成后, StreamingQueryProgress 事件包括重新分区操作的持续时间。 在事件的 durationMs 指标中, controlBatch.REPARTITION 以毫秒为单位显示持续时间值。 状态规模更大可能会增加重新分区所需的时间。 请参阅 在 Azure Databricks 上监视结构化流查询。
Example
以下示例将查询从 200(默认值)缩减为 100 个随机分区。 停止查询,设置新的分区计数,然后重启:
Python
# Start the query with the default partition count (200)
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
# Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
# Restart the query with the same options
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
Scala
// Start the query with the default partition count (200)
val query = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
// Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
// Restart the query with the same options
val query2 = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()