在 Lakeflow Spark 宣告式管線中使用即時模式

Important

Lakeflow Spark 宣告式管線中的即時模式目前在預覽通道的 Databricks Runtime 18.1.2 上處於 Public Preview

即時模式可實現超低延遲資料處理,端到端延遲低至五毫秒。 對於需要即時回應串流資料的營運工作負載,例如詐欺偵測與即時個人化,則可使用即時模式。

即時模式也可在結構化串流中直接在管線之外使用。 請參閱 結構化串流中的即時模式

即時模式如何達到低延遲

即時模式與標準連續處理有三個主要差異:

  • 長時間執行的批次:系統會在長時間執行的批次中,於來源中有資料可用時處理資料(預設為五分鐘)。
  • 同時階段排程:所有查詢階段同時排程。 運算資源必須有足夠的任務欄位,以同時涵蓋所有階段。 請參閱運算資源規模
  • 串流洗牌:資料一產生就在各階段間傳遞,而非等待上游階段完成後才開始下游階段。

檢查點間隔(透過 pipelines.trigger.interval 進行設定)會控制將狀態和來源偏移量持久化到耐久性儲存體中的頻率。 較長的間隔可減少檢查點的開銷,但會增加故障後的復原時間,並延遲指標報告。 較短的間隔能提升耐用度,但增加額外負擔。

即時模式與連續管線

即時模式是一種專門的連續觸發方式。 連續模式仍是必需的——即時模式則額外增加了流量層級延遲的優化。 要使用即時模式,管線必須先以連續模式運行。 即時模式接著在流程層級進行額外優化,以達到超越標準連續處理的亞秒延遲。

啟用即時模式需要三個設定步驟:

  1. 將管線設定為連續模式。
  2. 在管線層級啟用即時模式。
  3. 定義即時更新流程。

Requirements

Requirement Value
Databricks Runtime 18.1.2 於 SDP 預覽頻道
計算類型 經典運算或無伺服器

設定即時模式

步驟 1:將管線設定為連續模式

在你的管線設定中,將 管線模式 設為 連續,或在管線 JSON 中設定:

{
  "continuous": true
}

步驟 2:在管線層級啟用即時模式

在管線設定中,將以下金鑰新增至 進階 > Spark 設定 下方的 Spark 設定中:

spark.databricks.streaming.realTimeMode.enabled = true

你也可以在管線 JSON 中設定:

{
  "continuous": true,
  "spark_conf": {
    "spark.databricks.streaming.realTimeMode.enabled": "true"
  }
}

步驟 3:定義即時更新流程

即時模式必須有更新流程。 使用 dp.create_sink() 定義輸出目標,然後使用 @dp.update_flow 裝飾器,將 pipelines.trigger 設為 "RealTime",並讓 target 指向接收端。

from pyspark import pipelines as dp

# Define the output sink
dp.create_sink(
    "my_kafka_sink",
    "kafka",
    {
        "kafka.bootstrap.servers": "<bootstrap-servers>",
        "topic": "<output-topic>",
    }
)

# Define the real-time update flow targeting the sink
@dp.update_flow(
    name="my_rtm_flow",
    target="my_kafka_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",  # optional; defaults to 5 minutes
    }
)
def my_real_time_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "<bootstrap-servers>")
            .option("subscribe", "<input-topic>")
            .load()
    )

流量層級配置參數:

參數 Required 預設值 Description
pipelines.trigger Yes 將其設為 "RealTime" 以啟用此流程的即時模式。
pipelines.trigger.interval No "5 minutes" 檢查點間隔。 控制狀態和偏移量提交的頻率。 較短的數值能提升恢復性;較長的數值可降低開銷。

程式碼範例

卡夫卡對卡夫卡

從 Kafka 主題讀取資料,並寫入 Kafka 輸出目標:

from pyspark import pipelines as dp

dp.create_sink("kafka_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="kafka_rtm_flow",
    target="kafka_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def kafka_rtm_flow():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .option("startingOffsets", "latest")
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    )

以廣播連接來豐富

將 Kafka 串流與靜態查找表聯結。 僅支援廣播(串流到靜態)連接。 即時模式下不支援串流間連線。

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr

dp.create_sink("enriched_output_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": enriched_output_topic,
})

@dp.update_flow(
    name="enriched_events_flow",
    target="enriched_output_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
    }
)
def enriched_events():
    lookup = spark.read.table("catalog.schema.lookup_table")
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .withColumn("event_key", expr("CAST(value AS STRING)"))
            .join(broadcast(lookup), expr("event_key = lookup_key"))
            .select("event_key", "lookup_value", "timestamp")
    )

Aggregation

使用有狀態的 groupBy 依鍵統計事件。 將 spark.sql.shuffle.partitions 設為與有狀態作業的輸入分割區數量一致:

from pyspark import pipelines as dp
from pyspark.sql.functions import col

dp.create_sink("event_counts_sink", "kafka", {
    "kafka.bootstrap.servers": broker_address,
    "topic": output_topic,
})

@dp.update_flow(
    name="event_counts_flow",
    target="event_counts_sink",
    spark_conf={
        "pipelines.trigger": "RealTime",
        "pipelines.trigger.interval": "5 minutes",
        "spark.sql.shuffle.partitions": "8",
    }
)
def event_counts():
    return (
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker_address)
            .option("subscribe", input_topic)
            .load()
            .selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
            .groupBy(col("event_type"))
            .count()
    )

支援的來源與匯

連接器 作為資料來源 作為沉沒 註釋
Apache Kafka
AWS MSK 使用卡夫卡相容介面。
Azure 事件中樞 (Kafka connector) 使用卡夫卡相容介面。
Amazon Kinesis 不支援 僅限用於 EFO(Enhanced Fan-Out)模式。
Delta 不支援 不支援

計算大小調整

如果運算資源有足夠的任務槽,你可以為每個運算資源執行一條即時管線。 可用的任務欄位必須涵蓋所有查詢階段的所有任務。

管線類型 Configuration 必填任務欄位
單階段無狀態(Kafka source + sink) maxPartitions = 8 8
兩階段狀態式(卡夫卡來源 + 洗牌) maxPartitions = 8,洗牌分割 = 20 28 (8 + 20)
三階段(卡夫卡來源 + 兩次洗牌) maxPartitions = 8,兩個各20分的洗牌階段 48 (8 + 20 + 20)

如果你沒有設定 maxPartitions,請使用 Kafka 主題中的分割數量。

營運商支援

類別 Operator 支援
無狀態 選擇與投影
UDFs Scala UDF ✓(有限制)
UDFs Python 使用者定義函數 (UDF) ✓(有限制)
Aggregation 總和、計數、最大值、最小值、平均值
Windowing 翻滾、滑行
Windowing Session 不支援
Deduplication dropDuplicates ✓(無界狀態)
Deduplication dropDuplicatesWithinWatermark 不支援
Joins 廣播資料表聯結
Joins 串流間連接 不支援
習慣 transformWithState ✓(行為差異)
習慣 union ✓(有限制)
習慣 forEach 不支援
習慣 flatMapGroupsWithState 不支援
習慣 mapPartitions 不支援
習慣 forEachBatch 不支援

transformWithState 在即時模式下

transformWithState 支援即時模式,與微批次處理有以下差異:

  • handleInputRows 會對每一列叫用一次,而不是在每個批次中對每個鍵各叫用一次。 inputRows迭代器每次呼叫只會產生一個值。
  • 不支援事件時間計時器。 如果沒有資料到達,處理時間計時器會在長時間執行的批次結束時觸發。
  • 不支援 transformWithStateInPandas

即時模式中的 Pandas UDF

為了降低 PANDAS UDF 的延遲,請設 spark.sql.execution.arrow.maxRecordsPerBatch1。 這以犧牲吞吐量為代價,優化延遲。 如果吞吐量也很重要,請將此值設為 100 或更高。

監控即時模式效能

即時模式會在 StreamingQueryProgresslatencies 欄位下顯示延遲指標。 可透過 StreamingQueryListener 存取這些指標,或檢查串流查詢上的 lastProgress 屬性。

計量 Description
processingLatencyMs 紀錄被流程讀取到被流程完全處理之間的時間
sourceQueuingLatencyMs 記錄成功寫入訊息匯流排(例如 Kafka 的日誌附加時間)後,到該記錄首次被流程讀取之間的時間
e2eLatencyMs 從記錄在來源產生到被流程完全處理的全端延遲

每個指標分別以p50、p90、p95和p99百分位數報告。

Limitations

建議每條管線使用一個即時流量。 允許多個流程,但各流程間的任務槽爭用會增加延遲。

有關操作員與來源限制的完整清單,請參見 即時模式限制

其他資源