Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Met statuspartitionering op aanvraag kunt u het formaat van het aantal partities voor een stateful Structured Streaming-query wijzigen zonder dat de status van het controlepunt verloren gaat.
Zonder herpartitionering op aanvraag stelt u het aantal willekeurige partities in tijdens het maken van het controlepunt. Als u wijzigt spark.sql.shuffle.partitions, negeren query's met bestaande controlepunten de nieuwe waarde. Als u een nieuw aantal partities toepast, moet u de query opnieuw starten met een nieuw controlepunt.
Herpartitionering van statussen op aanvraag heeft de volgende voordelen:
- Optimaliseer query’s door het aantal partities aan te passen zonder het checkpoint opnieuw op te bouwen.
- Schaal query's omhoog of omlaag zodat deze overeenkomen met wijzigingen in de workload.
Requirements
- Databricks Runtime 18.3 of hoger.
- De query moet de RocksDB-provider voor statusopslag gebruiken. Op DBR 17.3 of hoger is RocksDB de standaardprovider voor de statusopslag. Zie Opslag van RocksDB-status in Azure Databricks configureren.
Het aantal partities wijzigen
Gebruik de Spark-configuratie spark.sql.streaming.stateStore.partitions en herstart de query om het aantal shuffle- en streamingstatuspartities te wijzigen:
Python
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()
Scala
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()
Voor stateful query's heeft spark.sql.streaming.stateStore.partitions voorrang op spark.sql.shuffle.partitions. Nadat de query opnieuw is opgestart en de laatste geplande microbatch is voltooid, voert de query een herpartitionbewerking uit om statusgegevens opnieuw te distribueren naar het nieuwe aantal partities. Nadat de herpartitioneringsbewerking is voltooid, wordt de queryverwerking hervat.
Status van repartition controleren
Nadat de volgende microbatch is afgerond, omvatten StreamingQueryProgress-gebeurtenissen de duur van de herpartitioneringsbewerking. In de metrische gegevens durationMs van controlBatch.REPARTITION een gebeurtenis wordt de duurwaarde in milliseconden weergegeven. Grotere statusgrootten kunnen de tijd voor opnieuw partitioneren vergroten. Zie Het Bewaken van Structured Streaming-queries op Azure Databricks.
Example
In het volgende voorbeeld wordt een query teruggebracht van 200, de standaardinstelling, naar 100 shuffle-partities. Stop de query, stel het nieuwe aantal partities in en start het opnieuw op:
Python
# Start the query with the default partition count (200)
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
# Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
# Restart the query with the same options
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
Scala
// Start the query with the default partition count (200)
val query = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
// Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
// Restart the query with the same options
val query2 = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()