共用方式為


設定即時模式

本頁說明在結構化串流中執行即時模式查詢所需的前置條件與設定。 有關逐步教學,請參見 「教學:執行即時串流工作負載」。 關於即時模式的概念性資訊,請參閱 結構化串流中的即時模式

先決條件

要使用即時模式,您必須設定運算以符合以下需求:

  • 在經典運算中使用專用存取模式。 不支援標準存取模式、Lakeflow Spark 宣告式管線及無伺服器叢集。
  • 使用 Databricks Runtime 16.4 LTS 及以上版本。
  • 關閉自動縮放。
  • 關閉光子。
  • spark.databricks.streaming.realTimeMode.enabled 設定為 true
  • 關閉 spot instance 以避免被打斷。

關於建立與設定經典運算的說明,請參見計算組態參考。

查詢組態

要在即時模式下執行查詢,必須啟用即時觸發器。 即時觸發器僅支援更新模式。

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

運算規模

當運算資源有足夠的任務槽時,你可以在每個運算資源上執行一個即時作業。

若要以低延遲模式執行,可用的工作位置總數必須大於或等於所有查詢階段的工作數目。

槽位置計算範例

管線類型 Configuration 必填欄位
單階段無狀態(Kafka source + sink) maxPartitions = 8 8個插槽
兩階段狀態式(卡夫卡來源 + 洗牌) maxPartitions = 8,洗牌分割 = 20 28個欄位(8 + 20)
三階段(卡夫卡來源 + 洗牌 + 重新分配) maxPartitions = 8,兩個各20分的洗牌階段 48 個欄位(8 + 20 + 20)

如果你沒有設定 maxPartitions,請使用 Kafka 主題中的分割數量。

其他資源