Partisi ulang status sesuai permintaan untuk kueri streaming stateful

Partisi ulang status sesuai permintaan memungkinkan Anda mengubah ukuran jumlah partisi untuk kueri Streaming Terstruktur yang stateful tanpa kehilangan status titik pemeriksaan.

Tanpa partisi ulang status sesuai permintaan, Anda mengatur jumlah partisi acak selama pembuatan titik pemeriksaan. Jika Anda mengubah spark.sql.shuffle.partitions, kueri dengan titik pemeriksaan yang ada mengabaikan nilai baru. Menerapkan jumlah partisi baru mengharuskan Anda untuk memulai ulang kueri dengan titik pemeriksaan baru.

Repartisi state sesuai permintaan menawarkan manfaat berikut:

  • Sesuaikan kueri dengan mengubah ukuran jumlah partisi tanpa membangun kembali titik pemeriksaan.
  • Skalakan kueri ke atas atau ke bawah untuk mencocokkan perubahan beban kerja.

Requirements

Mengubah jumlah partisi

Gunakan konfigurasi Spark spark.sql.streaming.stateStore.partitions dan mulai ulang kueri untuk mengubah jumlah partisi shuffle dan status streaming:

Python

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()

Scala

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()

Untuk kueri stateful, spark.sql.streaming.stateStore.partitions lebih diutamakan daripada spark.sql.shuffle.partitions. Setelah kueri dimulai kembali dan microbatch terakhir yang direncanakan selesai, kueri menjalankan operasi repartisi untuk mendistribusikan ulang data status ke jumlah partisi yang baru. Setelah operasi partisi ulang selesai, kueri melanjutkan pemrosesan.

Memantau status partisi ulang

Setelah mikro-batch berikutnya selesai, event StreamingQueryProgress akan mencakup durasi operasi pemartisian ulang. Dalam metrik peristiwa durationMs , controlBatch.REPARTITION menunjukkan nilai durasi dalam milidetik. Ukuran state yang lebih besar mungkin menambah waktu yang diperlukan untuk melakukan partisi ulang. Lihat Memantau kueri Streaming Terstruktur di Azure Databricks.

Example

Contoh berikut menurunkan jumlah partisi shuffle untuk kueri dari 200, yang merupakan nilai default, menjadi 100. Hentikan kueri, atur jumlah partisi baru, dan mulai ulang:

Python

# Start the query with the default partition count (200)
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

# Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

# Restart the query with the same options
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

Scala

// Start the query with the default partition count (200)
val query = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()

// Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

// Restart the query with the same options
val query2 = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()