Durum bilgisi olan akış sorguları için isteğe bağlı durum yeniden bölümleme

İsteğe bağlı durum yeniden bölümleme, durum bilgisi olan bir Yapılandırılmış Akış sorgusunun bölüm sayısını denetim noktası durumunu kaybetmeden yeniden boyutlandırmanıza olanak tanır.

İsteğe bağlı durum yeniden bölümleme olmadan, denetim noktası oluşturma sırasında karıştırma bölümlerinin sayısını ayarlarsınız. spark.sql.shuffle.partitions değerini değiştirirseniz, mevcut denetim noktaları olan sorgular bu yeni değeri yok sayar. Yeni bölüm sayısı uygulamak için sorguyu yeni bir denetim noktasıyla yeniden başlatmanız gerekir.

İsteğe bağlı durum yeniden bölümleme aşağıdaki avantajlara sahiptir:

  • Denetim noktasını yeniden oluşturmadan bölüm sayısını yeniden boyutlandırarak sorguları ayarlayın.
  • İş yükü değişikliklerini eşleştirmek için sorguların ölçeğini artırma veya azaltma.

Gereksinimler

Bölüm sayısını değiştirme

Spark yapılandırmasını spark.sql.streaming.stateStore.partitions kullanın ve karıştırma ve akış durumu bölümlerinin sayısını değiştirmek için sorguyu yeniden başlatın:

Python

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()

Scala

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()

Durum bilgisi olan sorgular için spark.sql.streaming.stateStore.partitions önceliklidir spark.sql.shuffle.partitions. Sorgu yeniden başlatıldıktan ve planlanan son mikrobatch tamamlandıktan sonra sorgu, durum verilerini yeni bölüm sayısına yeniden dağıtmak için bir yeniden bölümleme işlemi çalıştırır. Yeniden bölümleme işlemi tamamlandıktan sonra sorgu işlenmeye devam eder.

Yeniden bölümleme durumu izleme

Bir sonraki mikro grup tamamlandıktan sonra, StreamingQueryProgress olayları yeniden bölümleme işleminin süresini içerir. Bir olayın durationMs ölçümlerinde, controlBatch.REPARTITION süre değerini milisaniye cinsinden gösterir. Daha büyük durum boyutları yeniden bölümlendirim süresini artırabilir. Bkz. Azure Databricks'te Yapılandırılmış Akış sorgularını izleme.

Example

Aşağıdaki örnek, sorgunun ölçeğini varsayılan olarak 200 olan 100 karışık bölüme düşürmektedir. Sorguyu durdurun, yeni bölüm sayısını ayarlayın ve yeniden başlatın:

Python

# Start the query with the default partition count (200)
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

# Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

# Restart the query with the same options
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

Scala

// Start the query with the default partition count (200)
val query = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()

// Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

// Restart the query with the same options
val query2 = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()