Ponowne partycjonowanie stanu na żądanie dla stanowych zapytań przesyłania strumieniowego

Ponowne partycjonowanie stanu na żądanie umożliwia zmianę liczby partycji w stanowym zapytaniu Structured Streaming bez utraty stanu punktu kontrolnego.

Bez ponownego partycjonowania stanu na żądanie należy ustawić liczbę partycji mieszania podczas tworzenia punktu kontrolnego. Jeśli zmienisz metodę spark.sql.shuffle.partitions, zapytania z istniejącymi punktami kontrolnymi ignorują nową wartość. Zastosowanie nowej liczby partycji wymaga ponownego uruchomienia zapytania przy użyciu nowego punktu kontrolnego.

Ponowne partycjonowanie stanu na żądanie ma następujące korzyści:

  • Dostrajanie zapytań przez zmianę rozmiaru partycji bez ponownego kompilowania punktu kontrolnego.
  • Skalowanie zapytań w górę lub w dół w celu dopasowania do zmian obciążenia.

Wymagania

Zmienianie liczby partycji

Użyj konfiguracji Spark spark.sql.streaming.stateStore.partitions i uruchom ponownie zapytanie, aby zmienić liczbę partycji shuffle i stanu przesyłania strumieniowego:

Python

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()

Scala

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()

W przypadku zapytań stanowych spark.sql.streaming.stateStore.partitions ma pierwszeństwo przed spark.sql.shuffle.partitions. Po ponownym uruchomieniu zapytania i zakończeniu ostatniej zaplanowanej mikropartii zapytanie wykonuje operację repartycjonowania w celu redystrybucji danych stanu do nowej liczby partycji. Po zakończeniu operacji repartycjonowania przetwarzanie zapytania zostaje wznowione.

Monitoruj stan repartycjonowania

Po zakończeniu następnego mikrobajtu zdarzenia StreamingQueryProgress obejmują czas trwania operacji ponownego partycjonowania. W metrykach durationMs zdarzenia controlBatch.REPARTITION pokazuje wartość czasu trwania w milisekundach. Większe rozmiary stanów mogą wydłużyć czas ponownego partycjonowania. Zobacz Monitorowanie zapytań przesyłania strumieniowego ze strukturą w Azure Databricks.

Example

Poniższy przykład skaluje zapytanie w dół z 200 do 100 partycji mieszania. Zatrzymaj zapytanie, ustaw nową liczbę partycji i uruchom ponownie:

Python

# Start the query with the default partition count (200)
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

# Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

# Restart the query with the same options
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

Scala

// Start the query with the default partition count (200)
val query = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()

// Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

// Restart the query with the same options
val query2 = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()