按需狀態重劃分允許你在不失去檢查點狀態的情況下,重新調整有狀態結構化串流查詢的分割區數量。
如果未使用依需求進行的狀態重新分割,就必須在建立檢查點時設定 shuffle 分割區的數量。 如果你更改 spark.sql.shuffle.partitions,帶有現有檢查點的查詢會忽略新值。 套用新的分割區計數需要你用新的檢查點重新開始查詢。
按需狀態重分配具有以下優點:
- 透過調整分割區數量來調校查詢,而無須重建檢查點。
- 根據工作負載的變化,調整查詢的規模。
Requirements
- Databricks Runtime 18.3 或更新版本。
- 查詢必須使用 RocksDB 狀態存放區提供者。 在 DBR 17.3 或以上版本中,RocksDB 是預設的狀態儲存服務提供者。 請參閱 《在 Azure Databricks 設定 RocksDB 狀態存放區》。
變更分割區數量
使用 Spark 設定 spark.sql.streaming.stateStore.partitions 並重新啟動查詢,以變更 shuffle 分割區和串流狀態分割區的數量:
Python
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()
Scala
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()
對於有狀態查詢,spark.sql.streaming.stateStore.partitions 優先於 spark.sql.shuffle.partitions。 查詢重新啟動且最後一個已規劃的微批次完成後,查詢會執行重新分割作業,將狀態資料重新分配到新的分割區數量中。 重新分割操作完成後,查詢會恢復處理。
監控重新分區狀態
下一個微批次完成後, StreamingQueryProgress 事件會包含重新分配操作的持續時間。 在事件指標 durationMs 中,顯示 controlBatch.REPARTITION 以毫秒為單位的持續時間值。 較大的州規模可能會增加重新劃分的時間。 請參見在 Azure Databricks 上監控結構化串流查詢。
Example
以下範例將查詢的 shuffle 分割區數量從預設的 200 調降為 100。 停止查詢,設定新的分割區數量,然後重新啟動:
Python
# Start the query with the default partition count (200)
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
# Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
# Restart the query with the same options
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
Scala
// Start the query with the default partition count (200)
val query = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
// Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
// Restart the query with the same options
val query2 = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()