共用方式為


開始使用即時模式

這很重要

這項功能目前處於 公開預覽版

即時模式實現超低延遲串流,端對端延遲低至五毫秒,非常適合詐欺偵測與即時個人化等營運工作負載。 本教學將引導你如何用一個簡單的範例設定你的第一個即時串流查詢。

關於即時模式的概念性資訊、何時使用以及支援的功能,請參閱 結構化串流中的即時模式

要求

  • 你有 權限建立經典運算
  • Databricks 運行時環境 17.1 或以上(使用即時模式功能必備 display)。

備註

如果你沒有經典運算建立權限,請聯絡你的工作區管理員,請他們用步驟 1 中的設定為你建立即時模式叢集。

步驟 1:建立經典的即時運算

即時模式需要特定的經典運算配置才能達成超低延遲。 這些設定確保任務能在所有階段同時執行,且資料在抵達時持續處理,而非批次處理。

要建立正確設定的經典運算:

  1. 在你的 Azure Databricks 工作區,點選側邊欄的 「運算 」。

  2. 點擊 建立運算

  3. 輸入名稱。

  4. 選擇 Databricks 執行環境 17.1 或以上版本。

  5. 清除 光子加速 (即時模式不支援光子)。

  6. 清除 啟用自動縮放 (即時模式需要固定叢集大小)。

  7. 進階效能下,清除 使用 spot 實例 (spot 實例可能會造成中斷)。

  8. 點選 進階選項 以展開更多設定。

  9. 存取模式中,選擇專用(過去稱為單一使用者)。

  10. Spark 設定中,新增以下設定:

    spark.databricks.streaming.realTimeMode.enabled true
    
  11. 點擊 建立運算

步驟二:建立筆記本

筆記本提供互動式環境,用於開發與測試串流查詢。 你用這個筆記本來寫即時查詢,並持續看到結果更新。

建立筆記本:

  1. 在側邊欄點選 「新」 ,然後點「 筆記本」。
  2. 在計算下拉選單中,選擇你在步驟 1 建立的計算。
  3. 選擇 PythonScala 作為預設語言。

步驟 3:執行即時模式查詢

把以下程式碼複製貼上到筆記本儲存格並執行。 此範例使用速率來源,該來源以指定速率產生列,並即時顯示結果。

備註

display帶有realTime觸發器的函式可在 Databricks Runtime 17.1 及以上版本中使用。

Python

inputDF = (
  spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
)
display(inputDF, realTime="5 minutes", outputMode="update")

Scala

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.streaming.OutputMode

val inputDF = spark
  .readStream
  .format("rate")
  .option("numPartitions", 2)
  .option("rowsPerSecond", 1)
  .load()
display(inputDF, trigger=Trigger.RealTime(), outputMode=OutputMode.Update())

執行程式碼後,你會看到一個表格,隨著新列的產生而即時更新。 表格顯示一個 timestamp 欄和一個 value 欄,它們隨著每一行遞增。

了解程式碼

上述程式碼展示了即時串流查詢的基本要素。 以下表格說明關鍵參數及其控制範圍:

Python

參數 說明
format("rate") 使用速率來源,一個內建的資料來源,能以可設定的速率產生資料列。 這對於沒有外部依賴的測試非常有用。
numPartitions 設定產生資料的分割區數。
rowsPerSecond 控制每秒產生的列數。
realTime="5 minutes" 啟用即時模式。 區間則指定查詢檢查點的進行頻率。 較長的間隔意味著檢查點的頻率降低,但失敗後的恢復時間可能更長。
outputMode="update" 即時模式需要更新輸出模式。

Scala

參數 說明
format("rate") 使用速率來源,一個內建的來源,能以可設定的速率產生資料列。 這對於沒有外部依賴的測試非常有用。
numPartitions 設定產生資料的分割區數。
rowsPerSecond 控制每秒產生的列數。
Trigger.RealTime() 啟用預設檢查點間隔的即時模式。 你也可以指定一個區間,例如 Trigger.RealTime("5 minutes")
OutputMode.Update() 即時模式需要更新輸出模式。

你看到的

當你執行查詢時,display 函數會建立一個表格,隨著速率來源端產生新資料列,實時更新。 每列包含:

  • 時間戳記:由速率來源產生該列的時間
  • :按單調遞增順序增加的計數器,每新增一列便遞增。

該表格持續更新,延遲極低,展示即時模式如何在資料一可用時立即處理。 這就是即時模式的核心優勢——能夠立即看到並對資料採取行動,而不必等待批次處理。

您已學到的內容

你已經成功設定並執行了你的第一個即時串流查詢。 您現在知道如何:

  • 設定經典運算並設定即時模式所需的設定(專用叢集、停用 Photon、關閉自動縮放、Spark 設定)
  • 啟用使用 realTime 觸發器進行即時處理
  • 使用此功能 display 進行互動式開發與測試
  • 透過觀察持續更新,確認你的查詢是否以即時模式運行

你已經準備好用 Kafka、Kinesis 及其他支援的原始碼來建立生產即時管線了。 想了解更多結構化串流,請參閱結構化串流概念。

下一步

現在你已經執行了第一個即時查詢,請探索以下資源來建立生產串流應用程式:

  • 即時模式範例 - Kafka 原始碼與匯、有狀態查詢、聚合及自訂匯項的工作程式碼範例
  • 即時模式參考 - 了解叢集大小、支援的運算元、監控及功能限制
  • 有狀態串流應用程式 ——在您的串流查詢中加入狀態管理,以進行重複刪除、聚合與視窗化
  • 進階狀態管理 - 用於 transformWithState 具備存活時間(TTL)及複雜邏輯的自訂狀態處理