Menyiapkan mode real-time

Halaman ini menjelaskan prasyarat dan konfigurasi yang diperlukan untuk menjalankan kueri mode real time di Streaming Terstruktur. Untuk tutorial langkah demi langkah, lihat Tutorial: Menjalankan beban kerja streaming real time. Untuk informasi konseptual tentang mode real time, lihat Mode real time di Streaming Terstruktur.

Prasyarat

Untuk menggunakan mode real time, Anda harus mengonfigurasi komputasi untuk memenuhi persyaratan berikut:

  • Gunakan komputasi klasik. Mode akses khusus dan standar didukung. Mode akses standar hanya didukung untuk Python. Alur Deklaratif Lakeflow Spark dan kluster tanpa server tidak didukung.
  • Gunakan Databricks Runtime 16.4 LTS ke atas.
  • Nonaktifkan penskalakan otomatis.
  • Matikan Photon.
  • Atur spark.databricks.streaming.realTimeMode.enabled ke true.
  • Nonaktifkan instans spot untuk menghindari gangguan.

Untuk beban kerja sensitif latensi dengan UDF, Databricks merekomendasikan agar Anda menggunakan mode akses khusus. Lihat Fungsi tabel.

Untuk petunjuk tentang membuat dan mengonfigurasi komputasi klasik, lihat Referensi konfigurasi komputasi.

Konfigurasi kueri

Untuk menjalankan kueri dalam mode real time, Anda harus mengaktifkan pemicu real-time. Pemicu real time hanya didukung dalam mode pembaruan.

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

Pemilihan ukuran

Anda dapat menjalankan satu pekerjaan waktu nyata untuk setiap sumber daya komputasi jika sumber daya tersebut memiliki slot tugas yang cukup.

Untuk berjalan dalam mode latensi rendah, jumlah total slot tugas yang tersedia harus lebih besar dari atau sama dengan jumlah tugas di semua tahap kueri.

Contoh perhitungan slot

Jenis alur Konfigurasi Slot yang diperlukan
Stateless tahap tunggal (sumber Kafka + penampungan) maxPartitions = 8 8 slot
Stateful dua tahap (sumber Kafka + acak) maxPartitions = 8, partisi shuffle = 20 28 slot (8 + 20)
Tiga tahap (sumber Kafka + acak + partisi ulang) maxPartitions = 8, dua tahap pengacakan masing-masing 20 48 slot (8 + 20 + 20)

Jika Anda tidak mengatur maxPartitions, gunakan jumlah partisi dalam topik Kafka.

Sumber daya tambahan