Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Partisi ulang status sesuai permintaan memungkinkan Anda mengubah ukuran jumlah partisi untuk kueri Streaming Terstruktur yang stateful tanpa kehilangan status titik pemeriksaan.
Tanpa partisi ulang status sesuai permintaan, Anda mengatur jumlah partisi acak selama pembuatan titik pemeriksaan. Jika Anda mengubah spark.sql.shuffle.partitions, kueri dengan titik pemeriksaan yang ada mengabaikan nilai baru. Menerapkan jumlah partisi baru mengharuskan Anda untuk memulai ulang kueri dengan titik pemeriksaan baru.
Repartisi state sesuai permintaan menawarkan manfaat berikut:
- Sesuaikan kueri dengan mengubah ukuran jumlah partisi tanpa membangun kembali titik pemeriksaan.
- Skalakan kueri ke atas atau ke bawah untuk mencocokkan perubahan beban kerja.
Requirements
- Databricks Runtime 18.3 atau lebih tinggi.
- Kueri harus menggunakan penyedia penyimpanan status RocksDB. Pada DBR 17.3 atau lebih tinggi, RocksDB adalah penyedia penyimpanan status default. Lihat Mengonfigurasi penyimpanan status RocksDB di Azure Databricks.
Mengubah jumlah partisi
Gunakan konfigurasi Spark spark.sql.streaming.stateStore.partitions dan mulai ulang kueri untuk mengubah jumlah partisi shuffle dan status streaming:
Python
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()
Scala
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()
Untuk kueri stateful, spark.sql.streaming.stateStore.partitions lebih diutamakan daripada spark.sql.shuffle.partitions. Setelah kueri dimulai kembali dan microbatch terakhir yang direncanakan selesai, kueri menjalankan operasi repartisi untuk mendistribusikan ulang data status ke jumlah partisi yang baru. Setelah operasi partisi ulang selesai, kueri melanjutkan pemrosesan.
Memantau status partisi ulang
Setelah mikro-batch berikutnya selesai, event StreamingQueryProgress akan mencakup durasi operasi pemartisian ulang. Dalam metrik peristiwa durationMs , controlBatch.REPARTITION menunjukkan nilai durasi dalam milidetik. Ukuran state yang lebih besar mungkin menambah waktu yang diperlukan untuk melakukan partisi ulang. Lihat Memantau kueri Streaming Terstruktur di Azure Databricks.
Example
Contoh berikut menurunkan jumlah partisi shuffle untuk kueri dari 200, yang merupakan nilai default, menjadi 100. Hentikan kueri, atur jumlah partisi baru, dan mulai ulang:
Python
# Start the query with the default partition count (200)
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
# Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
# Restart the query with the same options
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
Scala
// Start the query with the default partition count (200)
val query = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
// Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
// Restart the query with the same options
val query2 = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()