Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ponowne partycjonowanie stanu na żądanie umożliwia zmianę liczby partycji w stanowym zapytaniu Structured Streaming bez utraty stanu punktu kontrolnego.
Bez ponownego partycjonowania stanu na żądanie należy ustawić liczbę partycji mieszania podczas tworzenia punktu kontrolnego. Jeśli zmienisz metodę spark.sql.shuffle.partitions, zapytania z istniejącymi punktami kontrolnymi ignorują nową wartość. Zastosowanie nowej liczby partycji wymaga ponownego uruchomienia zapytania przy użyciu nowego punktu kontrolnego.
Ponowne partycjonowanie stanu na żądanie ma następujące korzyści:
- Dostrajanie zapytań przez zmianę rozmiaru partycji bez ponownego kompilowania punktu kontrolnego.
- Skalowanie zapytań w górę lub w dół w celu dopasowania do zmian obciążenia.
Wymagania
- Databricks Runtime 18.3 lub nowszy.
- Zapytanie musi używać dostawcy magazynu stanów bazy danych RocksDB. W systemie DBR 17.3 lub nowszym baza danych RocksDB jest domyślnym dostawcą magazynu stanów. Zobacz Konfigurowanie magazynu stanów bazy danych RocksDB w usłudze Azure Databricks.
Zmienianie liczby partycji
Użyj konfiguracji Spark spark.sql.streaming.stateStore.partitions i uruchom ponownie zapytanie, aby zmienić liczbę partycji shuffle i stanu przesyłania strumieniowego:
Python
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()
Scala
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()
W przypadku zapytań stanowych spark.sql.streaming.stateStore.partitions ma pierwszeństwo przed spark.sql.shuffle.partitions. Po ponownym uruchomieniu zapytania i zakończeniu ostatniej zaplanowanej mikropartii zapytanie wykonuje operację repartycjonowania w celu redystrybucji danych stanu do nowej liczby partycji. Po zakończeniu operacji repartycjonowania przetwarzanie zapytania zostaje wznowione.
Monitoruj stan repartycjonowania
Po zakończeniu następnego mikrobajtu zdarzenia StreamingQueryProgress obejmują czas trwania operacji ponownego partycjonowania. W metrykach durationMs zdarzenia controlBatch.REPARTITION pokazuje wartość czasu trwania w milisekundach. Większe rozmiary stanów mogą wydłużyć czas ponownego partycjonowania. Zobacz Monitorowanie zapytań przesyłania strumieniowego ze strukturą w Azure Databricks.
Example
Poniższy przykład skaluje zapytanie w dół z 200 do 100 partycji mieszania. Zatrzymaj zapytanie, ustaw nową liczbę partycji i uruchom ponownie:
Python
# Start the query with the default partition count (200)
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
# Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
# Restart the query with the same options
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
Scala
// Start the query with the default partition count (200)
val query = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
// Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
// Restart the query with the same options
val query2 = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()