Statusherpartitionering op aanvraag voor stateful streamingquery's

Met statuspartitionering op aanvraag kunt u het formaat van het aantal partities voor een stateful Structured Streaming-query wijzigen zonder dat de status van het controlepunt verloren gaat.

Zonder herpartitionering op aanvraag stelt u het aantal willekeurige partities in tijdens het maken van het controlepunt. Als u wijzigt spark.sql.shuffle.partitions, negeren query's met bestaande controlepunten de nieuwe waarde. Als u een nieuw aantal partities toepast, moet u de query opnieuw starten met een nieuw controlepunt.

Herpartitionering van statussen op aanvraag heeft de volgende voordelen:

  • Optimaliseer query’s door het aantal partities aan te passen zonder het checkpoint opnieuw op te bouwen.
  • Schaal query's omhoog of omlaag zodat deze overeenkomen met wijzigingen in de workload.

Requirements

Het aantal partities wijzigen

Gebruik de Spark-configuratie spark.sql.streaming.stateStore.partitions en herstart de query om het aantal shuffle- en streamingstatuspartities te wijzigen:

Python

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()

Scala

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()

Voor stateful query's heeft spark.sql.streaming.stateStore.partitions voorrang op spark.sql.shuffle.partitions. Nadat de query opnieuw is opgestart en de laatste geplande microbatch is voltooid, voert de query een herpartitionbewerking uit om statusgegevens opnieuw te distribueren naar het nieuwe aantal partities. Nadat de herpartitioneringsbewerking is voltooid, wordt de queryverwerking hervat.

Status van repartition controleren

Nadat de volgende microbatch is afgerond, omvatten StreamingQueryProgress-gebeurtenissen de duur van de herpartitioneringsbewerking. In de metrische gegevens durationMs van controlBatch.REPARTITION een gebeurtenis wordt de duurwaarde in milliseconden weergegeven. Grotere statusgrootten kunnen de tijd voor opnieuw partitioneren vergroten. Zie Het Bewaken van Structured Streaming-queries op Azure Databricks.

Example

In het volgende voorbeeld wordt een query teruggebracht van 200, de standaardinstelling, naar 100 shuffle-partities. Stop de query, stel het nieuwe aantal partities in en start het opnieuw op:

Python

# Start the query with the default partition count (200)
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

# Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

# Restart the query with the same options
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

Scala

// Start the query with the default partition count (200)
val query = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()

// Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

// Restart the query with the same options
val query2 = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()