Neupartitionierung des On-Demand-Zustands für zustandsbehaftete Streamingabfragen

Die Neupartitionierung des On-Demand-Zustands ermöglicht es Ihnen, die Größe der Partitionen für eine zustandsbehaftete Strukturierte Streaming-Abfrage zu ändern, ohne den Prüfpunktstatus zu verlieren.

Ohne bedarfsorientierte Neupartitionierung des Zustands legen Sie beim Erstellen des Prüfpunkts die Anzahl der Shuffle-Partitionen fest. Wenn Sie spark.sql.shuffle.partitions ändern, ignorieren Abfragen mit vorhandenen Prüfpunkten den neuen Wert. Wenn Sie eine neue Partitionsanzahl anwenden, müssen Sie die Abfrage mit einem neuen Prüfpunkt neu starten.

Die bedarfsgesteuerte Neuaufteilung des Zustands hat folgende Vorteile:

  • Optimieren Sie Abfragen, indem Sie die Anzahl der Partitionen anpassen, ohne den Checkpoint neu zu erstellen.
  • Skalieren Sie Abfragen nach oben oder unten, um Arbeitsauslastungsänderungen abzugleichen.

Requirements

Ändern der Anzahl von Partitionen

Verwenden Sie die Spark-Konfiguration spark.sql.streaming.stateStore.partitions , und starten Sie die Abfrage neu, um die Anzahl der Partitionen des Shuffle- und Streamingstatus zu ändern:

Python

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()

Scala

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()

Für zustandsbehaftete Abfragen spark.sql.streaming.stateStore.partitions hat Vorrang vor spark.sql.shuffle.partitions. Nachdem die Abfrage neu gestartet und die letzte geplante Microbatch abgeschlossen wurde, führt die Abfrage einen Neupartitionsvorgang aus, um Zustandsdaten in die neue Anzahl von Partitionen neu zu verteilen. Nach Abschluss des Repartitionsvorgangs setzt die Abfrage die Verarbeitung fort.

Status der Neupartitionierung überwachen

Nach Abschluss des nächsten Mikrobatchs umfassen Ereignisse die Dauer des Repartitionsvorgangs StreamingQueryProgress . In den Metriken durationMs eines Ereignisses controlBatch.REPARTITION wird der Dauerwert in Millisekunden angezeigt. Eine größere Größe des Zustands kann die für die Repartitionierung benötigte Zeit verlängern. Siehe Überwachung strukturierter Streaming-Abfragen auf Azure Databricks.

Example

Im folgenden Beispiel wird eine Abfrage von 200, der Standard, auf 100 Shuffle-Partitionen skaliert. Beenden Sie die Abfrage, legen Sie die neue Partitionsanzahl fest, und starten Sie folgendes neu:

Python

# Start the query with the default partition count (200)
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

# Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

# Restart the query with the same options
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

Scala

// Start the query with the default partition count (200)
val query = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()

// Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

// Restart the query with the same options
val query2 = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()