Aracılığıyla paylaş


Durum bilgisi olan akış nedir?

Bu sayfada durum bilgili Yapılandırılmış Akış sorguları, durum bilgili işlemler, iyileştirme önerileri, durum bilgili birden çok işleci zincirleme ve durum dengesini yeniden ayarlama konuları açıklanmaktadır.

Durum bilgisi olan Yapılandırılmış Akış sorgusu, ara durum bilgilerinde artımlı güncelleştirmeler gerektirirken, durum bilgisi olmayan Bir Yapılandırılmış Akış sorgusu yalnızca kaynaktan havuza hangi satırların işlendiği hakkındaki bilgileri izler. Durum bilgisi olmayan sorgular için kullanılabilen iyileştirme özellikleri için bkz. Durum bilgisi olmayan akış sorgularını iyileştirme.

Durum bilgisi olan işlemler

Durum bilgisi olan işlemler arasında akış toplama, distinct, dropDuplicates, akışlar arası birleşimler ve durum bilgisi olan özel uygulamalar bulunur.

Durum bilgisi olan Yapılandırılmış Akış sorguları için gereken ara durum bilgileri, yanlış yapılandırılmışsa beklenmeyen gecikme süresine ve üretim sorunlarına yol açabilir.

Databricks Runtime 13.3 LTS veya sonraki sürümlerinde, Yapılandırılmış Akış iş yükleri için denetim noktası süresini ve uçtan uca gecikme süresini azaltmak için RocksDB ile değişiklik günlüğü denetim noktası oluşturmayı etkinleştirebilirsiniz. Databricks, tüm Yapılandırılmış Akış durum bilgisi sorguları için değişiklik günlüğü denetim noktası oluşturmanın etkinleştirilmesini önerir. Bkz. Değişiklik günlüğü denetim noktasını etkinleştirme.

Durum bilgisi olan Yapılandırılmış Akış sorgularını iyileştirme

Databricks, durum bilgisi olan Yapılandırılmış Akış sorguları için aşağıdakileri önerir:

  • İşçiler olarak hesaplama için optimize edilmiş örnekleri kullanın.
  • Karışık bölüm sayısını kümedeki çekirdek sayısının 1-2 katı olarak ayarlayın.

Önemli

Kontrol noktası oluşturulurken karıştırma bölümlerinin sayısı sabitlenir. spark.sql.shuffle.partitions değiştirmek, zaten denetim noktası olan bir streaming sorgusunu etkilemez — sorgu, özgün bölüm sayısını kullanmaya devam eder. Yeni bölüm sayısı uygulamak için sorguyu yeni bir denetim noktası konumuyla başlatmanız gerekir.

Databricks Runtime 18.0 ve üzeri sürümlerde durum bilgisi olmayan akış sorguları, yeni bir denetim noktası gerektirmeden dinamik karıştırma bölümü değişikliklerini destekler.

  • SparkSession'da spark.sql.streaming.noDataMicroBatches.enabled yapılandırmasını false olarak ayarlayın. Bu, akış mikro toplu işleme motorunun veri içermeyen mikro toplu işleri işlemesini önler. Bu yapılandırmanın false olarak ayarlanması, filigran kullanan veya işlem zamanı zaman aşımı kullanan durum bilgisi olan işlemlerin, yeni veriler gelene kadar anında değil de veri çıkışı almamasıyla sonuçlanabilir.

Databricks, durum bilgisi olan akışların durumunu yönetmek için değişiklik günlüğü denetim noktası oluşturma ile RocksDB kullanılmasını önerir. Bkz. Azure Databricks'te RocksDB durum depolarını yapılandırma.

Not

Durum yönetimi şeması, sorgu yeniden başlatıldığı zamanlarda değiştirilemez. Varsayılan yönetimle bir sorgu başlatıldıysa, durum deposunu değiştirmek için sorguyu yeni bir denetim noktası konumuyla sıfırdan yeniden başlatmanız gerekir.

Yapılandırılmış Akış'ta durum bilgisi olan birden çok işleçle çalışma

Databricks Runtime 13.3 LTS veya sonraki sürümlerinde Azure Databricks, Yapılandırılmış Akış iş yüklerindeki durum bilgisi olan işleçler için gelişmiş destek sunar. Durum bilgisi olan birden çok operatörü birbirine bağlayabilirsiniz; başka bir deyişle, pencereli toplama gibi bir işlemin çıktısını, birleştirme gibi durum bilgisi olan başka bir işleme besleyebilirsiniz.

Databricks Runtime 16.2 veya sonraki sürümlerinde, durum bilgisi olan birden çok işleç içeren iş yüklerinde kullanabilirsiniz transformWithState . Bkz. özel durum bilgisi olan bir uygulama oluşturma.

Aşağıdaki örneklerde kullanabileceğiniz çeşitli desenler gösterilmektedir.

Önemli

Durum bilgisi olan birden çok işleçle çalışırken aşağıdaki sınırlamalar vardır:

  • Eski özel durum bilgisi olan işleçler (FlatMapGroupWithState ve applyInPandasWithState) desteklenmez.
  • Yalnızca ekleme çıkış modu desteklenir.

Zincirleme zaman penceresi birleştirme

Piton

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Scala programlama dili

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

İki farklı akışta zaman penceresiyle toplama yapıldıktan sonra akışlar arası pencere birleştirme işlemi.

Piton

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Scala programlama dili

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Ardışık akış zaman aralığı birleşimi ve ardından zaman penceresi toplulaştırması

Piton

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Scala programlama dili

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Yapılandırılmış Akış için durum yeniden dengeleme

Durum yeniden dengeleme, Lakeflow Spark Bildirimli İşlem Hatlarındaki tüm akış iş yükleri için varsayılan olarak etkindir. Databricks Runtime 11.3 LTS veya sonraki sürümlerinde, spark kümesi yapılandırmasında durum yeniden dengelemeyi etkinleştirmek için aşağıdaki yapılandırma seçeneğini ayarlayabilirsiniz:

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Durum yeniden dengelemesi, küme yeniden boyutlandırma olaylarından geçen durum bilgisi olan Yapılandırılmış Akış işlem hatlarına faydalar sunar. Durum bilgisi olmayan akış işlemleri, küme boyutlarının değişmesine bakılmaksızın avantaj elde edemez.

Not

İşlem otomatik ölçeklendirmesi, Yapılandırılmış Akış iş yükleri için küme boyutunu azaltmayla ilgili sınırlamalara sahiptir. Databricks, akış iş yükleri için geliştirilmiş otomatik ölçeklendirme ile Lakeflow Spark Bildirimli İşlem Hatlarının kullanılmasını önerir. Bkz. Otomatik Ölçeklendirme ile Lakeflow Spark Bildirimli İşlem Hatlarının küme kullanımını iyileştirme.

Küme yeniden boyutlandırma olayları durum yeniden dengelemeyi tetikler. Durum bulut depolamadan yeni yürütücülere yüklendiğinden, olayları yeniden dengeleme sırasında mikro toplu işlemlerin gecikme süresi daha yüksek olabilir.