用於有狀態串流查詢的按需狀態重劃分

按需狀態重劃分允許你在不失去檢查點狀態的情況下,重新調整有狀態結構化串流查詢的分割區數量。

如果未使用依需求進行的狀態重新分割,就必須在建立檢查點時設定 shuffle 分割區的數量。 如果你更改 spark.sql.shuffle.partitions,帶有現有檢查點的查詢會忽略新值。 套用新的分割區計數需要你用新的檢查點重新開始查詢。

按需狀態重分配具有以下優點:

  • 透過調整分割區數量來調校查詢,而無須重建檢查點。
  • 根據工作負載的變化,調整查詢的規模。

Requirements

  • Databricks Runtime 18.3 或更新版本。
  • 查詢必須使用 RocksDB 狀態存放區提供者。 在 DBR 17.3 或以上版本中,RocksDB 是預設的狀態儲存服務提供者。 請參閱 《在 Azure Databricks 設定 RocksDB 狀態存放區》。

變更分割區數量

使用 Spark 設定 spark.sql.streaming.stateStore.partitions 並重新啟動查詢,以變更 shuffle 分割區和串流狀態分割區的數量:

Python

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()

Scala

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()

對於有狀態查詢,spark.sql.streaming.stateStore.partitions 優先於 spark.sql.shuffle.partitions。 查詢重新啟動且最後一個已規劃的微批次完成後,查詢會執行重新分割作業,將狀態資料重新分配到新的分割區數量中。 重新分割操作完成後,查詢會恢復處理。

監控重新分區狀態

下一個微批次完成後, StreamingQueryProgress 事件會包含重新分配操作的持續時間。 在事件指標 durationMs 中,顯示 controlBatch.REPARTITION 以毫秒為單位的持續時間值。 較大的州規模可能會增加重新劃分的時間。 請參見在 Azure Databricks 上監控結構化串流查詢

Example

以下範例將查詢的 shuffle 分割區數量從預設的 200 調降為 100。 停止查詢,設定新的分割區數量,然後重新啟動:

Python

# Start the query with the default partition count (200)
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

# Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

# Restart the query with the same options
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

Scala

// Start the query with the default partition count (200)
val query = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()

// Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

// Restart the query with the same options
val query2 = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()