Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Halaman ini menjelaskan prasyarat dan konfigurasi yang diperlukan untuk menjalankan kueri mode real time di Streaming Terstruktur. Untuk tutorial langkah demi langkah, lihat Tutorial: Menjalankan beban kerja streaming real time. Untuk informasi konseptual tentang mode real time, lihat Mode real time di Streaming Terstruktur.
Prasyarat
Untuk menggunakan mode real time, Anda harus mengonfigurasi komputasi untuk memenuhi persyaratan berikut:
- Gunakan komputasi klasik. Mode akses khusus dan standar didukung. Mode akses standar hanya didukung untuk Python. Alur Deklaratif Lakeflow Spark dan kluster tanpa server tidak didukung.
- Gunakan Databricks Runtime 16.4 LTS ke atas.
- Nonaktifkan penskalakan otomatis.
- Matikan Photon.
- Atur
spark.databricks.streaming.realTimeMode.enabledketrue. - Nonaktifkan instans spot untuk menghindari gangguan.
Untuk beban kerja sensitif latensi dengan UDF, Databricks merekomendasikan agar Anda menggunakan mode akses khusus. Lihat Fungsi tabel.
Untuk petunjuk tentang membuat dan mengonfigurasi komputasi klasik, lihat Referensi konfigurasi komputasi.
Konfigurasi kueri
Untuk menjalankan kueri dalam mode real time, Anda harus mengaktifkan pemicu real-time. Pemicu real time hanya didukung dalam mode pembaruan.
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
Pemilihan ukuran
Anda dapat menjalankan satu pekerjaan waktu nyata untuk setiap sumber daya komputasi jika sumber daya tersebut memiliki slot tugas yang cukup.
Untuk berjalan dalam mode latensi rendah, jumlah total slot tugas yang tersedia harus lebih besar dari atau sama dengan jumlah tugas di semua tahap kueri.
Contoh perhitungan slot
| Jenis alur | Konfigurasi | Slot yang diperlukan |
|---|---|---|
| Stateless tahap tunggal (sumber Kafka + penampungan) |
maxPartitions = 8 |
8 slot |
| Stateful dua tahap (sumber Kafka + acak) |
maxPartitions = 8, partisi shuffle = 20 |
28 slot (8 + 20) |
| Tiga tahap (sumber Kafka + acak + partisi ulang) |
maxPartitions = 8, dua tahap pengacakan masing-masing 20 |
48 slot (8 + 20 + 20) |
Jika Anda tidak mengatur maxPartitions, gunakan jumlah partisi dalam topik Kafka.