Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Die Neupartitionierung des On-Demand-Zustands ermöglicht es Ihnen, die Größe der Partitionen für eine zustandsbehaftete Strukturierte Streaming-Abfrage zu ändern, ohne den Prüfpunktstatus zu verlieren.
Ohne bedarfsorientierte Neupartitionierung des Zustands legen Sie beim Erstellen des Prüfpunkts die Anzahl der Shuffle-Partitionen fest. Wenn Sie spark.sql.shuffle.partitions ändern, ignorieren Abfragen mit vorhandenen Prüfpunkten den neuen Wert. Wenn Sie eine neue Partitionsanzahl anwenden, müssen Sie die Abfrage mit einem neuen Prüfpunkt neu starten.
Die bedarfsgesteuerte Neuaufteilung des Zustands hat folgende Vorteile:
- Optimieren Sie Abfragen, indem Sie die Anzahl der Partitionen anpassen, ohne den Checkpoint neu zu erstellen.
- Skalieren Sie Abfragen nach oben oder unten, um Arbeitsauslastungsänderungen abzugleichen.
Requirements
- Databricks Runtime 18.3 oder höher.
- Die Abfrage muss den RocksDB-Statusspeicheranbieter verwenden. Bei DBR 17.3 oder höher ist RocksDB der Standardanbieter für den Zustandsspeicher. Weitere Informationen finden Sie unter Konfigurieren des RocksDB-Statusspeichers auf Azure Databricks.
Ändern der Anzahl von Partitionen
Verwenden Sie die Spark-Konfiguration spark.sql.streaming.stateStore.partitions , und starten Sie die Abfrage neu, um die Anzahl der Partitionen des Shuffle- und Streamingstatus zu ändern:
Python
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()
Scala
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()
Für zustandsbehaftete Abfragen spark.sql.streaming.stateStore.partitions hat Vorrang vor spark.sql.shuffle.partitions. Nachdem die Abfrage neu gestartet und die letzte geplante Microbatch abgeschlossen wurde, führt die Abfrage einen Neupartitionsvorgang aus, um Zustandsdaten in die neue Anzahl von Partitionen neu zu verteilen. Nach Abschluss des Repartitionsvorgangs setzt die Abfrage die Verarbeitung fort.
Status der Neupartitionierung überwachen
Nach Abschluss des nächsten Mikrobatchs umfassen Ereignisse die Dauer des Repartitionsvorgangs StreamingQueryProgress . In den Metriken durationMs eines Ereignisses controlBatch.REPARTITION wird der Dauerwert in Millisekunden angezeigt. Eine größere Größe des Zustands kann die für die Repartitionierung benötigte Zeit verlängern. Siehe Überwachung strukturierter Streaming-Abfragen auf Azure Databricks.
Example
Im folgenden Beispiel wird eine Abfrage von 200, der Standard, auf 100 Shuffle-Partitionen skaliert. Beenden Sie die Abfrage, legen Sie die neue Partitionsanzahl fest, und starten Sie folgendes neu:
Python
# Start the query with the default partition count (200)
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
# Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
# Restart the query with the same options
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
Scala
// Start the query with the default partition count (200)
val query = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
// Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
// Restart the query with the same options
val query2 = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()