本頁說明在結構化串流中執行即時模式查詢所需的前置條件與設定。 有關逐步教學,請參見 「教學:執行即時串流工作負載」。 關於即時模式的概念性資訊,請參閱 結構化串流中的即時模式。
先決條件
要使用即時模式,您必須設定運算以符合以下需求:
- 在經典運算中使用專用存取模式。 不支援標準存取模式、Lakeflow Spark 宣告式管線及無伺服器叢集。
- 使用 Databricks Runtime 16.4 LTS 及以上版本。
- 關閉自動縮放。
- 關閉光子。
- 將
spark.databricks.streaming.realTimeMode.enabled設定為true。 - 關閉 spot instance 以避免被打斷。
關於建立與設定經典運算的說明,請參見計算組態參考。
查詢組態
要在即時模式下執行查詢,必須啟用即時觸發器。 即時觸發器僅支援更新模式。
Python
query = (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("topic", output_topic)
.option("checkpointLocation", checkpoint_location)
.outputMode("update")
# In PySpark, the realTime trigger requires specifying the interval.
.trigger(realTime="5 minutes")
.start()
)
Scala
import org.apache.spark.sql.execution.streaming.RealTimeTrigger
val readStream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("subscribe", inputTopic).load()
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", brokerAddress)
.option("topic", outputTopic)
.option("checkpointLocation", checkpointLocation)
.outputMode("update")
.trigger(RealTimeTrigger.apply())
// RealTimeTrigger can also accept an argument specifying the checkpoint interval.
// For example, this code indicates a checkpoint interval of 5 minutes:
// .trigger(RealTimeTrigger.apply("5 minutes"))
.start()
運算規模
當運算資源有足夠的任務槽時,你可以在每個運算資源上執行一個即時作業。
若要以低延遲模式執行,可用的工作位置總數必須大於或等於所有查詢階段的工作數目。
槽位置計算範例
| 管線類型 | Configuration | 必填欄位 |
|---|---|---|
| 單階段無狀態(Kafka source + sink) |
maxPartitions = 8 |
8個插槽 |
| 兩階段狀態式(卡夫卡來源 + 洗牌) |
maxPartitions = 8,洗牌分割 = 20 |
28個欄位(8 + 20) |
| 三階段(卡夫卡來源 + 洗牌 + 重新分配) |
maxPartitions = 8,兩個各20分的洗牌階段 |
48 個欄位(8 + 20 + 20) |
如果你沒有設定 maxPartitions,請使用 Kafka 主題中的分割數量。