共用方式為


結構化串流中的即時模式

即時模式是一種結構化串流的觸發類型,能實現超低延遲的資料處理,端到端延遲低至五毫秒。 對於需要即時回應串流資料的營運工作負載,例如詐欺偵測、即時個人化及即時決策系統,請使用即時模式。

Databricks Runtime 16.4 LTS 和更新版本提供即時模式。 有關逐步設定說明,請參見「 即時模式開始」。 關於程式碼範例,請參見 即時模式範例

什麼是即時模式?

營運工作負載與分析工作負載

串流工作負載可廣泛分為分析工作負載和作業工作負載:

  • 分析工作負載會使用數據引入和轉換,通常遵循 Medallion 架構(例如,將數據引入至銅、銀和金表)。
  • 作業工作負載會耗用實時數據、套用商業規則,以及觸發下游動作或決策。

作業工作負載的一些範例包括:

  • 如果詐騙分數超過閾值,則實時封鎖或標記信用卡交易,以異常位置、大型交易大小或快速消費模式等因素為基礎。
  • 當點擊流數據顯示用戶已經流覽牛仔褲五分鐘時,提供促銷訊息,如果他們在未來 15 分鐘內購買,則提供 25% 折扣。

一般而言,營運工作負載的特點是需要低於一秒的端對端延遲。 這可以使用Apache Spark結構化串流中的即時模式來達成。

即時模式如何達成低延遲

即時模式可藉由下列方式改善執行架構:

  • 執行一些耗時的批次作業(預設為五分鐘),系統會在資料於來源中可用時進行處理。
  • 同時排程執行查詢的所有階段。 這需要可用的工作位置數目等於或大於批次中所有階段的工作數目。
  • 在產生資料後立即透過串流洗牌在各階段間傳遞資料。

在處理一批次結束且下一批批次開始前,結構化串流會執行檢查點以保存進度並發布指標。 批次持續時間會影響檢查點的頻率:

  • 更長的批次:檢查點的頻率減少,這意味著失敗時的重播時間延長,以及數據指標的可用性延遲。
  • 較短批次:檢查點更頻繁,可能影響延遲。

Databricks 建議將即時模式與目標工作負載做基準測試,找出合適的觸發間隔。

何時使用即時模式

選擇即時模式,當您的使用案例需要時:

  • 亞秒延遲:需要在毫秒內回應資料的應用程式,例如必須即時阻擋交易的詐欺偵測系統。
  • 營運決策:根據收到的資料,如即時優惠、警示或通知,觸發即時行動的系統。
  • 連續處理:資料必須在抵達後立即處理,而非定期批次處理的工作負載。

在以下情況下使用微批次模式(預設的結構化串流觸發器):

  • 分析處理:ETL 管線、資料轉換,以及以秒或分鐘計量延遲需求的 medallion 架構實作。
  • 成本優化:不需要亞秒延遲的工作負載,因為即時模式需要專用的運算資源。
  • 檢查點頻率很重要:能從更頻繁的檢查點中獲益、以加快恢復的應用程式。

需求與配置

即時模式對計算設定與查詢配置有特定需求。 本節說明使用即時模式所需的前置條件與設定步驟。

先決條件

要使用即時模式,您必須符合以下條件:

  • Databricks 執行環境 16.4 LTS 或更高版本:實時模式僅在 DBR 16.4 LTS 及更新版本中提供。
  • 專用運算:你必須使用專用(過去是單一使用者)運算。 標準(過去共享)、Lakeflow Spark 宣告式管線及無伺服器叢集均不支援。
  • 禁止自動縮放:必須關閉自動縮放。
  • 無光子:即時模式不支援光子加速。
  • 火花配置:你必須設定 spark.databricks.streaming.realTimeMode.enabledtrue

計算配置

請用以下設定配置你的運算器:

  • 在 Spark 配置中設定spark.databricks.streaming.realTimeMode.enabledtrue
  • 關閉自動縮放。
  • 關閉光子加速。
  • 確保運算被設定為專用叢集(非標準、Lakeflow Spark 宣告式管線或無伺服器叢集)。

關於建立與設定即時模式運算的逐步說明,請參見 「即時模式開始」。

查詢組態

要在即時模式下執行查詢,必須啟用即時觸發器。 即時觸發器僅支援更新模式。

Python

query = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("subscribe", input_topic)
        .load()
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker_address)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_location)
        .outputMode("update")
        # In PySpark, the realTime trigger requires specifying the interval.
        .trigger(realTime="5 minutes")
        .start()
)

Scala

import org.apache.spark.sql.execution.streaming.RealTimeTrigger

val readStream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("subscribe", inputTopic).load()
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)
      .option("topic", outputTopic)
      .option("checkpointLocation", checkpointLocation)
      .outputMode("update")
      .trigger(RealTimeTrigger.apply())
      // RealTimeTrigger can also accept an argument specifying the checkpoint interval.
      // For example, this code indicates a checkpoint interval of 5 minutes:
      // .trigger(RealTimeTrigger.apply("5 minutes"))
      .start()

運算規模

當運算資源有足夠的任務槽時,你可以在每個運算資源上執行一個即時作業。

若要以低延遲模式執行,可用的工作位置總數必須大於或等於所有查詢階段的工作數目。

槽位置計算範例

管線類型 Configuration 必填欄位
單階段無狀態(Kafka source + sink) maxPartitions = 8 8個插槽
兩階段狀態式(卡夫卡來源 + 洗牌) maxPartitions = 8,洗牌分割 = 20 28個欄位(8 + 20)
三階段(卡夫卡來源 + 洗牌 + 重新分配) maxPartitions = 8,兩個各20分的洗牌階段 48 個欄位(8 + 20 + 20)

如果你沒有設定 maxPartitions,請使用 Kafka 主題中的分割數量。

重要考慮

在設定運算時,請考慮以下幾點:

  • 與微批次模式不同,即時工作可以在等待數據時保持閑置,因此適當重設大小對於避免浪費的資源至關重要。
  • 透過調整設定以達成目標利用率(例如達到50%)
    • maxPartitions (適用於卡夫卡)
    • spark.sql.shuffle.partitions (針對洗牌階段)
  • Databricks 建議設定 maxPartitions,以便每個任務可以處理多個 Kafka 分區,從而減少開銷。
  • 調整每個工作者的工作槽,以符合簡單單階段工作的工作負載。
  • 對於涉及大量洗牌操作的作業,進行實驗以找到能避免積壓的最小洗牌分區數,然後從那裡進行調整。 如果計算資源不足,它就不會排程作業。

Note

從 Databricks Runtime 16.4 LTS 和更新版本,所有即時管線都會使用檢查點 v2,允許即時和微批次模式之間順暢地切換。

優化技術

請使用以下技術來降低即時模式下的延遲:

  • 非同步進度追蹤:Moves 將寫入偏移量並將日誌提交到非同步執行緒,減少無狀態查詢的批次間時間。
  • 非同步狀態檢查點:計算完成後立即開始處理下一個微批次,無需等待狀態檢查點,降低有狀態查詢的延遲。

Note

這兩種技術預設都未啟用。 你必須分別啟用它們。

監視和可觀察性

查詢效能的衡量對於即時工作負載至關重要。 在即時模式下,傳統批次時長指標無法反映實際延遲,因此需要其他方法。

端對端延遲是工作負載特定的,有時只能使用商業規則精確測量。 例如,如果來源時間戳是輸出在 Kafka 中,你可以計算延遲,作為 Kafka 輸出時間戳與來源時間戳的差值。

你也可以利用下方內建的指標和 API 來估算端到端延遲。

內建指標 StreamingQueryProgress

事件中包含 StreamingQueryProgress 下列計量,該事件會自動記錄在驅動程序記錄中。 您也可以透過 StreamingQueryListeneronQueryProgress() 回呼函式存取它們。 QueryProgressEvent.json()toString() 包含額外的即時模式計量。

  1. 處理延遲 (processingLatencyMs)。 即即時模式查詢讀取紀錄到將紀錄寫入下一階段或下游之間的時間。 對於單一階段查詢,這會測量與 E2E 延遲相同的持續時間。 系統會根據任務報告這個指標。
  2. 來源佇列延遲 (sourceQueuingLatencyMs)。 系統將記錄寫入訊息匯流排(例如 Kafka 的日誌附加時間)到即時模式查詢首次讀取記錄之間所經過的時間。 系統會根據任務報告這個指標。
  3. E2E 延遲 (e2eLatencyMs)。 系統將記錄寫入訊息匯流排與即時模式查詢將記錄寫入下游之間的時間。 系統會對每批次彙總此指標,並涵蓋所有任務處理的所有記錄。

例如:

"rtmMetrics" : {
    "processingLatencyMs" : {
      "P0" : 0,
      "P50" : 0,
      "P90" : 0,
      "P95" : 0,
      "P99" : 0
    },
    "sourceQueuingLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 3
    },
    "e2eLatencyMs" : {
      "P0" : 0,
      "P50" : 1,
      "P90" : 1,
      "P95" : 2,
      "P99" : 4
    },

使用 Observe API 進行自訂延遲測量

觀察 API 可協助測量延遲,而不啟動另一個作業。 如果你有一個近似來源資料到達時間的來源時間戳記,你可以使用 Observe API 估算每個批次的延遲。 在到達水槽前傳送時間戳:

Python

from datetime import datetime

from pyspark.sql.functions import avg, col, lit, max, percentile_approx, udf, unix_millis
from pyspark.sql.types import TimestampType

@udf(returnType=TimestampType())
def current_timestamp():
  return datetime.now()

# Query before outputting
.withColumn("temp-timestamp", current_timestamp())
.withColumn(
  "latency",
  unix_millis(col("temp-timestamp")).cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  "observedLatency",
  avg(col("latency")).alias("avg"),
  max(col("latency")).alias("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).alias("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).alias("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
# Output part of the query. For example, .WriteStream, etc.

Scala

import org.apache.spark.sql.functions.{avg, col, lit, max, percentile_approx, udf, unix_millis}

val currentTimestampUDF = udf(() => System.currentTimeMillis())

// Query before outputting
.withColumn("temp-timestamp", currentTimestampUDF())
.withColumn(
  "latency",
  col("temp-timestamp").cast("long") - unix_millis(col("timestamp")).cast("long"))
.observe(
  name = "observedLatency",
  avg(col("latency")).as("avg"),
  max(col("latency")).as("max"),
  percentile_approx(col("latency"), lit(0.99), lit(150)).as("p99"),
  percentile_approx(col("latency"), lit(0.5), lit(150)).as("p50"))
.drop(col("latency"))
.drop(col("temp-timestamp"))
// Output part of the query. For example, .WriteStream, etc.

在此範例中,在輸出專案之前會記錄目前的時間戳,而延遲的估計方式是計算此時間戳與記錄的來源時間戳之間的差異。 結果會包含在進度報告中,並提供給聽眾。 以下是範例輸出:

"observedMetrics" : {
  "observedLatency" : {
    "avg" : 63.8369765176552,
    "max" : 219,
    "p99" : 154,
    "p50" : 49
  }
}

功能支援與限制

本節說明即時模式所支援的功能及目前限制,包括相容環境、語言、原始碼、匯入、運算元,以及特定功能的特殊考量。

支援的環境、語言與模式

支援語言: 即時模式支援 Scala、Java 和 Python。

支援的運算類型:

計算類型 Supported
專用 (先前為:單一使用者)
標準 (先前: 共用) ✓(僅限 Python)
Lakeflow Spark 宣告式資料流水線經典版 不支援
Lakeflow Spark 宣告式管線無伺服器 不支援
Serverless 不支援

支援的執行模式:

執行模式 Supported
更新模式
附加模式 不支援
完成模式 不支援

來源與匯集支援

源頭或接收端 作為資料來源 作為沉沒
Apache Kafka
Event Hubs(使用 Kafka 連接器)
Kinesis ✓(僅 EFO 模式) 不支援
AWS MSK 不支援
Delta 不支援 不支援
Google 發布/訂閱 不支援 不支援
阿帕奇脈衝星 不支援 不支援
任意接收端(使用 forEachWriter 不適用

支援的運算子

Operators Supported
無國籍行動
Selection
Projection
UDF
斯卡拉 UDF ✓(有一些限制
Python 使用者定義函數 (UDF) ✓(有一些限制
匯總
sum
count
max
min
avg
彙總函數
視窗化
Tumbling
Sliding
Session 不支援
重複資料刪除
刪除重複項 ✓(狀態無界)
dropDuplicatesWithinWatermark 不支援
資料流 - 表格合併
廣播表(應該較小)
串流 - 串流聯結 不支援
(平坦)MapGroupsWithState 不支援
transformWithState ✓(有些差異
union ✓(有一些限制
forEach
逐批處理 不支援
map分區處理 不支援(見限制

特殊考慮

部分操作員與功能在即時模式下會有特定的考量或差異。

transformWithState 函數在即時模式下

為了建置自定義具狀態應用程式,Databricks 支援 transformWithState,這是 Apache Spark 結構化串流中的 API。 如需 API 和代碼段的詳細資訊,請參閱 建置自定義具狀態應用程式

不過,API 在即時模式中的行為和利用微批次架構的傳統串流查詢之間有一些差異。

  • 即時模式會對每一列呼叫 handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) 方法。
    • inputRows反覆運算器會傳回單一值。 微批次模式會為每個鍵呼叫一次, inputRows 迭代器會回傳該微批次中該鍵的所有值。
    • 你在撰寫程式碼時必須注意這個差異。
  • 即時模式不支援事件時間定時器。
  • 在即時模式下,計時器的觸發會因資料到達而延遲。
    • 如果計時器排定在 10:00:00,但沒有資料到達,計時器不會立即觸發。
    • 若資料在10:00:10抵達,計時器會在10秒後觸發。
    • 若無資料抵達且長期執行的批次正在終止,計時器會在批次結束前觸發。

Python UDF 在即時模式運行

Databricks 支援大部分的 Python 使用者定義函式 (UDF) 在即時模式中:

類別 UDF 類型 Supported
無狀態 Python 標量 UDF(使用者自訂標量函式 - Python
無狀態 Apache Arrow 純量 UDF
無狀態 Pandas 標量 UDF(pandas 使用者自定義函數
無狀態 箭頭函數 (mapInArrow
無狀態 Pandas 函數(地圖
有狀態分組(UDAF) transformWithState (僅限Row介面)
有狀態群組(UDAF) applyInPandasWithState 不支援
無狀態分群(UDAF) apply 不支援
非有狀態分組(UDAF) applyInArrow 不支援
無狀態聚合(UDAF) applyInPandas 不支援
表格函數 UDTF(Python 使用者定義表格函式(UDTFs) 不支援
表格函數 加州大學 UDF 不支援

在即時模式下使用 Python UDF 時需要考慮以下幾點:

  • 為了降低延遲,請將 Arrow 批次大小spark.sql.execution.arrow.maxRecordsPerBatch()設定為 1。
    • 取捨:此組態會以犧牲輸送量為代價來最佳化延遲。 對於大部分的工作負載,建議使用此設定。
    • 只有在需要更高的輸送量來容納輸入量時,才增加批次大小,並接受延遲的潛在增加。
  • Pandas UDF 和函數在 Arrow 批次大小為 1 時表現不佳。
    • 如果您使用 pandas UDF 或函數,請將箭頭批次大小設定為較高的值 (例如,100 或更高)。
    • 請注意,這意味著更高的延遲。 Databricks 建議盡可能使用 Arrow UDF 或函式。
  • 由於 pandas 的效能問題,transformWithState 只支援 Row 介面。

Limitations

來源限制

在 Kinesis 中,即時模式不支援輪詢模式。 此外,頻繁的重新分割可能會對延遲造成負面影響。

工會限制

聯合運算子有一些限制:

  • 即時模式不支援自合:
    • Kafka:你不能使用同一個來源資料框架物件,並合併從中衍生的資料框架。 變通方法:使用從同一來源讀取的不同的 DataFrame 對象。
    • Kinesis:你無法將來自相同 Kinesis 來源、相同配置的資料框架合併。 變通方法:除了使用不同的 DataFrame,你還可以為每個 DataFrame分別指派不同的「consumerName」選項。
  • 即時模式不支援在 Union 之前定義的有狀態運算子(例如 aggregatededuplicatetransformWithState)。
  • 即時模式不支援與批次來源的聯集。

MapPartitions 限制

mapPartitions 在 Scala 及類似的 Python API(mapInPandas, mapInArrow)中,會遍歷整個輸入分區,並產生一個包含輸入與輸出之間任意映射的整體輸出迭代器。 這些 API 在串流 Real-Time 模式下會阻擋整個輸出,導致延遲增加,導致效能問題。 這些 API 的語意無法充分支援浮水印傳播。

可用純量 UDF 搭配 複雜資料型態轉換filter,以達成類似功能。

下一步

現在你已經了解什麼是即時模式以及如何設定,請參考以下資源,開始實作即時串流應用程式: