オンデマンド状態の再パーティション分割を使用すると、チェックポイントの状態を失うことなく、ステートフル構造化ストリーミング クエリのパーティション数のサイズを変更できます。
オンデマンドの状態再パーティション化を使用しない場合は、チェックポイントの作成時にシャッフルパーティションの数を設定します。
spark.sql.shuffle.partitionsを変更すると、既存のチェックポイントを持つクエリは新しい値を無視します。 新しいパーティション数を適用するには、新しいチェックポイントを使用してクエリを再起動する必要があります。
状態をオンデマンドで再パーティション化することには、次の利点があります。
- チェックポイントを再構築せずにパーティションの数をサイズ変更して、クエリを調整します。
- ワークロードの変更に合わせてクエリをスケールアップまたはスケールダウンします。
Requirements
- Databricks Runtime 18.3 以降。
- クエリでは、RocksDB ステート ストア プロバイダーを使用する必要があります。 DBR 17.3 以降では、RocksDB が既定の状態ストア プロバイダーです。 「Azure Databricks で RocksDB 状態ストアを構成する」をご覧ください。
パーティションの数を変更する
spark 構成 spark.sql.streaming.stateStore.partitions 使用し、クエリを再起動して、シャッフル状態パーティションとストリーミング状態パーティションの数を変更します。
Python
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()
Scala
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()
ステートフル クエリの場合、 spark.sql.streaming.stateStore.partitions は spark.sql.shuffle.partitionsよりも優先されます。 クエリが再起動され、最後に計画されたマイクロバッチが完了すると、クエリは再パーティション操作を実行して、状態データを新しい数のパーティションに再配布します。 再パーティション化操作が完了すると、クエリの処理が再開されます。
再パーティションの状態を監視する
次のマイクロバッチの完了後、StreamingQueryProgress イベントには再パーティション操作の所要時間が含まれます。 イベントの durationMs メトリックでは、 controlBatch.REPARTITION は時間の値をミリ秒単位で表示します。 状態サイズが大きい場合、再パーティション化にかかる時間が長くなる可能性があります。
Azure Databricks での構造化ストリーミング クエリの監視を参照してください。
Example
次の例では、クエリを既定の 200 から 100 シャッフル パーティションにスケールダウンします。 クエリを停止し、新しいパーティション数を設定して再起動します。
Python
# Start the query with the default partition count (200)
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
# Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
# Restart the query with the same options
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
Scala
// Start the query with the default partition count (200)
val query = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
// Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
// Restart the query with the same options
val query2 = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()