Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
İsteğe bağlı durum yeniden bölümleme, durum bilgisi olan bir Yapılandırılmış Akış sorgusunun bölüm sayısını denetim noktası durumunu kaybetmeden yeniden boyutlandırmanıza olanak tanır.
İsteğe bağlı durum yeniden bölümleme olmadan, denetim noktası oluşturma sırasında karıştırma bölümlerinin sayısını ayarlarsınız.
spark.sql.shuffle.partitions değerini değiştirirseniz, mevcut denetim noktaları olan sorgular bu yeni değeri yok sayar. Yeni bölüm sayısı uygulamak için sorguyu yeni bir denetim noktasıyla yeniden başlatmanız gerekir.
İsteğe bağlı durum yeniden bölümleme aşağıdaki avantajlara sahiptir:
- Denetim noktasını yeniden oluşturmadan bölüm sayısını yeniden boyutlandırarak sorguları ayarlayın.
- İş yükü değişikliklerini eşleştirmek için sorguların ölçeğini artırma veya azaltma.
Gereksinimler
- Databricks Runtime 18.3 veya üzeri.
- Sorgunun RocksDB durum deposu sağlayıcısını kullanması gerekir. DBR 17.3 veya üzerinde RocksDB varsayılan durum deposu sağlayıcısıdır. Bkz. Azure Databricks'te RocksDB durum depolarını yapılandırma.
Bölüm sayısını değiştirme
Spark yapılandırmasını spark.sql.streaming.stateStore.partitions kullanın ve karıştırma ve akış durumu bölümlerinin sayısını değiştirmek için sorguyu yeniden başlatın:
Python
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()
Scala
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()
Durum bilgisi olan sorgular için spark.sql.streaming.stateStore.partitions önceliklidir spark.sql.shuffle.partitions. Sorgu yeniden başlatıldıktan ve planlanan son mikrobatch tamamlandıktan sonra sorgu, durum verilerini yeni bölüm sayısına yeniden dağıtmak için bir yeniden bölümleme işlemi çalıştırır. Yeniden bölümleme işlemi tamamlandıktan sonra sorgu işlenmeye devam eder.
Yeniden bölümleme durumu izleme
Bir sonraki mikro grup tamamlandıktan sonra, StreamingQueryProgress olayları yeniden bölümleme işleminin süresini içerir. Bir olayın durationMs ölçümlerinde, controlBatch.REPARTITION süre değerini milisaniye cinsinden gösterir. Daha büyük durum boyutları yeniden bölümlendirim süresini artırabilir. Bkz. Azure Databricks'te Yapılandırılmış Akış sorgularını izleme.
Example
Aşağıdaki örnek, sorgunun ölçeğini varsayılan olarak 200 olan 100 karışık bölüme düşürmektedir. Sorguyu durdurun, yeni bölüm sayısını ayarlayın ve yeniden başlatın:
Python
# Start the query with the default partition count (200)
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
# Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
# Restart the query with the same options
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
Scala
// Start the query with the default partition count (200)
val query = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
// Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
// Restart the query with the same options
val query2 = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()