Gerçek zamanlı modu ayarlama

Bu sayfada, Yapılandırılmış Akış'ta gerçek zamanlı mod sorguları çalıştırmak için gereken önkoşullar ve yapılandırma açıklanmaktadır. Adım adım bir kılavuz için bkz: Gerçek zamanlı akış iş yükü çalıştırma Kılavuzu. Gerçek zamanlı mod hakkında kavramsal bilgi için bkz. Yapılandırılmış Akış'ta gerçek zamanlı mod.

Önkoşullar

Gerçek zamanlı modu kullanmak için, işlemlerinizi aşağıdaki gereksinimleri karşılayacak şekilde yapılandırmanız gerekir:

  • Klasik işlem kullanın. Ayrılmış ve standart erişim modları desteklenir. Standart erişim modu yalnızca Python için desteklenir. Lakeflow Spark Bildirimli İşlem Hatları ve sunucusuz kümeler desteklenmez.
  • Databricks Runtime 16.4 LTS ve üzerini kullanın.
  • Otomatik ölçeklendirmeyi kapatın.
  • Foton'u kapatın.
  • spark.databricks.streaming.realTimeMode.enabled seçeneğini true olarak ayarlayın.
  • Kesintileri önlemek için spot örnekleri kapatın.

UDF'leri olan gecikmeye duyarlı iş yükleri için Databricks ayrılmış erişim modunu kullanmanızı önerir. Bkz. Tablo işlevleri.

Klasik işlem oluşturma ve yapılandırma yönergeleri için bkz. İşlem yapılandırma başvurusu.

Sorgu yapılandırması

Bir sorguyu gerçek zamanlı modda çalıştırmak için gerçek zamanlı tetikleyiciyi etkinleştirmeniz gerekir. Gerçek zamanlı tetikleyiciler yalnızca güncelleştirme modunda desteklenir.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Hesaplama boyutlandırma

İşlemde yeterli görev yuvası varsa işlem kaynağı başına bir gerçek zamanlı iş çalıştırabilirsiniz.

Düşük gecikme modunda çalışmak için toplam kullanılabilir görev yuvası sayısı tüm sorgu aşamalarındaki görev sayısından büyük veya buna eşit olmalıdır.

Slot hesaplama örnekleri

İşlem hattı türü Konfigürasyon Gerekli yuvalar
Tek aşamalı durum bilgisi olmayan (Kafka kaynağı + havuz) maxPartitions = 8 8 slota
durum bilgisi olan iki aşamalı (Kafka kaynağı + karıştırma) maxPartitions = 8, karışık bölümler = 20 28 yuva (8 + 20)
Üç aşamalı (Kafka kaynağı + karıştırma + yeniden bölümleme) maxPartitions = 8, her biri 20 aşamadan oluşan iki karıştırma aşaması 48 yuva (8 + 20 + 20)

ayarlamazsanız maxPartitionsKafka konu başlığındaki bölüm sayısını kullanın.

Ek kaynaklar