Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfada, Yapılandırılmış Akış'ta gerçek zamanlı mod sorguları çalıştırmak için gereken önkoşullar ve yapılandırma açıklanmaktadır. Adım adım bir kılavuz için bkz: Gerçek zamanlı akış iş yükü çalıştırma Kılavuzu. Gerçek zamanlı mod hakkında kavramsal bilgi için bkz. Yapılandırılmış Akış'ta gerçek zamanlı mod.
Önkoşullar
Gerçek zamanlı modu kullanmak için, işlemlerinizi aşağıdaki gereksinimleri karşılayacak şekilde yapılandırmanız gerekir:
- Klasik işlem kullanın. Ayrılmış ve standart erişim modları desteklenir. Standart erişim modu yalnızca Python için desteklenir. Lakeflow Spark Bildirimli İşlem Hatları ve sunucusuz kümeler desteklenmez.
- Databricks Runtime 16.4 LTS ve üzerini kullanın.
- Otomatik ölçeklendirmeyi kapatın.
- Foton'u kapatın.
-
spark.databricks.streaming.realTimeMode.enabledseçeneğinitrueolarak ayarlayın. - Kesintileri önlemek için spot örnekleri kapatın.
UDF'leri olan gecikmeye duyarlı iş yükleri için Databricks ayrılmış erişim modunu kullanmanızı önerir. Bkz. Tablo işlevleri.
Klasik işlem oluşturma ve yapılandırma yönergeleri için bkz. İşlem yapılandırma başvurusu.
Sorgu yapılandırması
Bir sorguyu gerçek zamanlı modda çalıştırmak için gerçek zamanlı tetikleyiciyi etkinleştirmeniz gerekir. Gerçek zamanlı tetikleyiciler yalnızca güncelleştirme modunda desteklenir.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Hesaplama boyutlandırma
İşlemde yeterli görev yuvası varsa işlem kaynağı başına bir gerçek zamanlı iş çalıştırabilirsiniz.
Düşük gecikme modunda çalışmak için toplam kullanılabilir görev yuvası sayısı tüm sorgu aşamalarındaki görev sayısından büyük veya buna eşit olmalıdır.
Slot hesaplama örnekleri
| İşlem hattı türü | Konfigürasyon | Gerekli yuvalar |
|---|---|---|
| Tek aşamalı durum bilgisi olmayan (Kafka kaynağı + havuz) |
maxPartitions = 8 |
8 slota |
| durum bilgisi olan iki aşamalı (Kafka kaynağı + karıştırma) |
maxPartitions = 8, karışık bölümler = 20 |
28 yuva (8 + 20) |
| Üç aşamalı (Kafka kaynağı + karıştırma + yeniden bölümleme) |
maxPartitions = 8, her biri 20 aşamadan oluşan iki karıştırma aşaması |
48 yuva (8 + 20 + 20) |
ayarlamazsanız maxPartitionsKafka konu başlığındaki bölüm sayısını kullanın.