Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Durum bilgisi olan Yapılandırılmış Akış sorgusu, ara durum bilgilerinde artımlı güncelleştirmeler gerektirirken, durum bilgisi olmayan Bir Yapılandırılmış Akış sorgusu yalnızca kaynaktan havuza hangi satırların işlendiği hakkındaki bilgileri izler. Durum bilgisi olmayan sorgular için kullanılabilen iyileştirme özellikleri için bkz. Durum bilgisi olmayan akış sorgularını iyileştirme.
Durum bilgisi olan işlemler
Durum bilgisi olan işlemler arasında akış toplama, akışdropDuplicates, akışlar arası birleştirmeler ve özel durum bilgisi olan uygulamalar bulunur.
Durum bilgisi olan Yapılandırılmış Akış sorguları için gereken ara durum bilgileri, yanlış yapılandırılmışsa beklenmeyen gecikme süresine ve üretim sorunlarına yol açabilir.
Databricks Runtime 13.3 LTS ve üzeri sürümlerde, Yapılandırılmış Akış iş yükleri için denetim noktası süresini ve uçtan uca gecikme süresini azaltmak için RocksDB ile değişiklik günlüğü denetim noktası oluşturmayı etkinleştirebilirsiniz. Databricks, tüm Yapılandırılmış Akış durum bilgisi sorguları için değişiklik günlüğü denetim noktası oluşturmanın etkinleştirilmesini önerir. Bkz. Değişiklik günlüğü denetim noktasını etkinleştirme.
Durum bilgisi olan Yapılandırılmış Akış sorgularını optimize etme
Durum bilgisi olan Yapılandırılmış Akış sorgularının ara durum bilgilerini yönetmek beklenmeyen gecikme süresi ve üretim sorunlarını önlemeye yardımcı olabilir.
Databricks tarafından önerilenler:
- İşçiler olarak hesaplama için optimize edilmiş örnekleri kullanın.
- Karışık bölüm sayısını kümedeki çekirdek sayısının 1-2 katı olarak ayarlayın.
- SparkSession'da
spark.sql.streaming.noDataMicroBatches.enabledyapılandırmasınıfalseolarak ayarlayın. Bu, akış mikro-batch motorunun veri içermeyen mikro toplu işlemleri işlemesini önler. Ayrıca, bu yapılandırmanınfalseolarak ayarlanmasının, filigran kullanan durum bilgisine sahip işlemler ve işleme zaman aşımlarının hemen veri çıkışı almaması, bunun yerine yeni veriler gelene kadar beklemesiyle sonuçlanabileceğini unutmayın.
Databricks, durum bilgisi olan akışların durumunu yönetmek için değişiklik günlüğü denetim noktası oluşturma ile RocksDB kullanılmasını önerir. Bkz. Azure Databricks'te RocksDB durum depolarını yapılandırma.
Not
Sorgu yeniden başlatıldığında durum yönetimi düzeni değiştirilemez. Varsayılan yönetimle bir sorgu başlatıldıysa, durum deposunu değiştirmek için sorguyu yeni bir denetim noktası konumuyla sıfırdan yeniden başlatmanız gerekir.
Yapılandırılmış Akış'ta duruma duyarlı birden çok operatörle çalışma
Databricks Runtime 13.3 LTS ve üzerinde Azure Databricks, Yapılandırılmış Akış iş yüklerinde durum bilgisi olan işleçler için gelişmiş destek sunar. Artık durum bilgisi olan birden çok işleci zincirleyebilir, yani bir işlemin çıktısını, örneğin pencereli bir toplamanın sonucunu, birleştirme gibi başka bir durum bilgisi olan işleme besleyebilirsiniz.
Databricks Runtime 16.2 ve üzerinde, durum bilgisi olan birden çok işleç içeren iş yüklerinde transformWithState kullanabilirsiniz. Bkz. özel durum bilgisi olan bir uygulama oluşturma.
Aşağıdaki örneklerde kullanabileceğiniz çeşitli desenler gösterilmektedir.
Önemli
Durum bilgisi olan birden çok işleçle çalışırken aşağıdaki sınırlamalar vardır:
- Eski özel durum bilgisi olan işleçler (
FlatMapGroupWithStateveapplyInPandasWithStatedesteklenmez. - Yalnızca ekleme çıkış modu desteklenir.
Zincirleme zaman penceresi birleştirme
Piton
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Scala programlama dili
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
İki farklı akışta zaman penceresiyle toplama yapıldıktan sonra akışlar arası pencere birleştirme işlemi.
Piton
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Scala programlama dili
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Ardışık akış zaman aralığı birleşimi ve ardından zaman penceresi toplulaştırması
Piton
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Scala programlama dili
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Yapılandırılmış Akış için durum yeniden dengeleme
Durum yeniden dengeleme, Lakeflow Spark Bildirimli İşlem Hatlarındaki tüm akış iş yükleri için varsayılan olarak etkindir. Databricks Runtime 11.3 LTS ve üzeri sürümlerin yeniden dengelemesini etkinleştirmek için Spark kümesi yapılandırmasında aşağıdaki yapılandırma seçeneğini ayarlayabilirsiniz:
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
Durum yeniden dengelemesi, küme yeniden boyutlandırma olaylarından geçen durum bilgisi olan Yapılandırılmış Akış işlem hatlarına faydalar sunar. Durum bilgisi olmayan akış işlemleri, değişen küme boyutlarına bakılmaksızın avantaj sağlamaz.
Not
İşlem otomatik ölçeklendirmesi, Yapılandırılmış Akış iş yükleri için küme boyutunu azaltmayla ilgili sınırlamalara sahiptir. Databricks, akış iş yükleri için geliştirilmiş otomatik ölçeklendirme ile Lakeflow Spark Bildirimli İşlem Hatlarının kullanılmasını önerir. Bkz. Otomatik Ölçeklendirme ile Lakeflow Spark Bildirimli İşlem Hatlarının küme kullanımını iyileştirme.
Küme yeniden boyutlandırma olayları durum yeniden dengelemeyi tetikler. Durum bulut depolamadan yeni yürütücülere yüklendiğinden, olayları yeniden dengeleme sırasında mikro toplu işlemlerin gecikme süresi daha yüksek olabilir.