针对有状态流式处理查询的按需状态重新分区

按需状态重新分区允许您在不丢失检查点状态的情况下,更改有状态的 Structured Streaming 查询的分区数量。

如果不进行按需状态重新分区,可以在创建检查点期间设置随机分区数。 如果更改 spark.sql.shuffle.partitions,具有现有检查点的查询将忽略新值。 应用新的分区计数要求使用新的检查点重启查询。

按需状态重新分区具有以下优点:

  • 无需重建检查点即可通过调整分区数量来优化查询。
  • 纵向扩展或缩减查询以匹配工作负荷更改。

Requirements

更改分区数

使用 Spark 配置 spark.sql.streaming.stateStore.partitions 并重启查询,以更改混洗分区和流式状态分区的数量:

Python

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()

Scala

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()

对于有状态查询, spark.sql.streaming.stateStore.partitions 优先于 spark.sql.shuffle.partitions. 查询重启并完成最后一个计划的微分块后,查询将运行重新分区操作,将状态数据重新分发到新的分区数。 重新分区操作完成后,查询将恢复处理。

监控重新分区状态

下一个微批处理完成后, StreamingQueryProgress 事件包括重新分区操作的持续时间。 在事件的 durationMs 指标中, controlBatch.REPARTITION 以毫秒为单位显示持续时间值。 状态规模更大可能会增加重新分区所需的时间。 请参阅 在 Azure Databricks 上监视结构化流查询

Example

以下示例将查询从 200(默认值)缩减为 100 个随机分区。 停止查询,设置新的分区计数,然后重启:

Python

# Start the query with the default partition count (200)
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

# Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

# Restart the query with the same options
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

Scala

// Start the query with the default partition count (200)
val query = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()

// Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

// Restart the query with the same options
val query2 = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()