Important
Lakeflow Spark 宣告式管線中的即時模式目前在預覽通道的 Databricks Runtime 18.1.2 上處於 Public Preview。
即時模式可實現超低延遲資料處理,端到端延遲低至五毫秒。 對於需要即時回應串流資料的營運工作負載,例如詐欺偵測與即時個人化,則可使用即時模式。
即時模式也可在結構化串流中直接在管線之外使用。 請參閱 結構化串流中的即時模式。
即時模式如何達到低延遲
即時模式與標準連續處理有三個主要差異:
- 長時間執行的批次:系統會在長時間執行的批次中,於來源中有資料可用時處理資料(預設為五分鐘)。
- 同時階段排程:所有查詢階段同時排程。 運算資源必須有足夠的任務欄位,以同時涵蓋所有階段。 請參閱運算資源規模。
- 串流洗牌:資料一產生就在各階段間傳遞,而非等待上游階段完成後才開始下游階段。
檢查點間隔(透過 pipelines.trigger.interval 進行設定)會控制將狀態和來源偏移量持久化到耐久性儲存體中的頻率。 較長的間隔可減少檢查點的開銷,但會增加故障後的復原時間,並延遲指標報告。 較短的間隔能提升耐用度,但增加額外負擔。
即時模式與連續管線
即時模式是一種專門的連續觸發方式。 連續模式仍是必需的——即時模式則額外增加了流量層級延遲的優化。 要使用即時模式,管線必須先以連續模式運行。 即時模式接著在流程層級進行額外優化,以達到超越標準連續處理的亞秒延遲。
啟用即時模式需要三個設定步驟:
- 將管線設定為連續模式。
- 在管線層級啟用即時模式。
- 定義即時更新流程。
Requirements
| Requirement | Value |
|---|---|
| Databricks Runtime | 18.1.2 於 SDP 預覽頻道 |
| 計算類型 | 經典運算或無伺服器 |
設定即時模式
步驟 1:將管線設定為連續模式
在你的管線設定中,將 管線模式 設為 連續,或在管線 JSON 中設定:
{
"continuous": true
}
步驟 2:在管線層級啟用即時模式
在管線設定中,將以下金鑰新增至 進階 > Spark 設定 下方的 Spark 設定中:
spark.databricks.streaming.realTimeMode.enabled = true
你也可以在管線 JSON 中設定:
{
"continuous": true,
"spark_conf": {
"spark.databricks.streaming.realTimeMode.enabled": "true"
}
}
步驟 3:定義即時更新流程
即時模式必須有更新流程。 使用 dp.create_sink() 定義輸出目標,然後使用 @dp.update_flow 裝飾器,將 pipelines.trigger 設為 "RealTime",並讓 target 指向接收端。
from pyspark import pipelines as dp
# Define the output sink
dp.create_sink(
"my_kafka_sink",
"kafka",
{
"kafka.bootstrap.servers": "<bootstrap-servers>",
"topic": "<output-topic>",
}
)
# Define the real-time update flow targeting the sink
@dp.update_flow(
name="my_rtm_flow",
target="my_kafka_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes", # optional; defaults to 5 minutes
}
)
def my_real_time_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<bootstrap-servers>")
.option("subscribe", "<input-topic>")
.load()
)
流量層級配置參數:
| 參數 | Required | 預設值 | Description |
|---|---|---|---|
pipelines.trigger |
Yes | — | 將其設為 "RealTime" 以啟用此流程的即時模式。 |
pipelines.trigger.interval |
No | "5 minutes" |
檢查點間隔。 控制狀態和偏移量提交的頻率。 較短的數值能提升恢復性;較長的數值可降低開銷。 |
程式碼範例
卡夫卡對卡夫卡
從 Kafka 主題讀取資料,並寫入 Kafka 輸出目標:
from pyspark import pipelines as dp
dp.create_sink("kafka_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="kafka_rtm_flow",
target="kafka_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def kafka_rtm_flow():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
)
以廣播連接來豐富
將 Kafka 串流與靜態查找表聯結。 僅支援廣播(串流到靜態)連接。 即時模式下不支援串流間連線。
from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast, expr
dp.create_sink("enriched_output_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": enriched_output_topic,
})
@dp.update_flow(
name="enriched_events_flow",
target="enriched_output_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
}
)
def enriched_events():
lookup = spark.read.table("catalog.schema.lookup_table")
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.withColumn("event_key", expr("CAST(value AS STRING)"))
.join(broadcast(lookup), expr("event_key = lookup_key"))
.select("event_key", "lookup_value", "timestamp")
)
Aggregation
使用有狀態的 groupBy 依鍵統計事件。 將 spark.sql.shuffle.partitions 設為與有狀態作業的輸入分割區數量一致:
from pyspark import pipelines as dp
from pyspark.sql.functions import col
dp.create_sink("event_counts_sink", "kafka", {
"kafka.bootstrap.servers": broker_address,
"topic": output_topic,
})
@dp.update_flow(
name="event_counts_flow",
target="event_counts_sink",
spark_conf={
"pipelines.trigger": "RealTime",
"pipelines.trigger.interval": "5 minutes",
"spark.sql.shuffle.partitions": "8",
}
)
def event_counts():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker_address)
.option("subscribe", input_topic)
.load()
.selectExpr("CAST(key AS STRING) AS event_type", "timestamp")
.groupBy(col("event_type"))
.count()
)
支援的來源與匯
| 連接器 | 作為資料來源 | 作為沉沒 | 註釋 |
|---|---|---|---|
| Apache Kafka | ✓ | ✓ | — |
| AWS MSK | ✓ | ✓ | 使用卡夫卡相容介面。 |
| Azure 事件中樞 (Kafka connector) | ✓ | ✓ | 使用卡夫卡相容介面。 |
| Amazon Kinesis | ✓ | 不支援 | 僅限用於 EFO(Enhanced Fan-Out)模式。 |
| Delta | 不支援 | 不支援 | — |
計算大小調整
如果運算資源有足夠的任務槽,你可以為每個運算資源執行一條即時管線。 可用的任務欄位必須涵蓋所有查詢階段的所有任務。
| 管線類型 | Configuration | 必填任務欄位 |
|---|---|---|
| 單階段無狀態(Kafka source + sink) |
maxPartitions = 8 |
8 |
| 兩階段狀態式(卡夫卡來源 + 洗牌) |
maxPartitions = 8,洗牌分割 = 20 |
28 (8 + 20) |
| 三階段(卡夫卡來源 + 兩次洗牌) |
maxPartitions = 8,兩個各20分的洗牌階段 |
48 (8 + 20 + 20) |
如果你沒有設定 maxPartitions,請使用 Kafka 主題中的分割數量。
營運商支援
| 類別 | Operator | 支援 |
|---|---|---|
| 無狀態 | 選擇與投影 | ✓ |
| UDFs | Scala UDF | ✓(有限制) |
| UDFs | Python 使用者定義函數 (UDF) | ✓(有限制) |
| Aggregation | 總和、計數、最大值、最小值、平均值 | ✓ |
| Windowing | 翻滾、滑行 | ✓ |
| Windowing | Session | 不支援 |
| Deduplication | dropDuplicates |
✓(無界狀態) |
| Deduplication | dropDuplicatesWithinWatermark |
不支援 |
| Joins | 廣播資料表聯結 | ✓ |
| Joins | 串流間連接 | 不支援 |
| 習慣 | transformWithState |
✓(行為差異) |
| 習慣 | union |
✓(有限制) |
| 習慣 | forEach |
不支援 |
| 習慣 | flatMapGroupsWithState |
不支援 |
| 習慣 | mapPartitions |
不支援 |
| 習慣 | forEachBatch |
不支援 |
transformWithState 在即時模式下
transformWithState 支援即時模式,與微批次處理有以下差異:
-
handleInputRows會對每一列叫用一次,而不是在每個批次中對每個鍵各叫用一次。inputRows迭代器每次呼叫只會產生一個值。 - 不支援事件時間計時器。 如果沒有資料到達,處理時間計時器會在長時間執行的批次結束時觸發。
- 不支援
transformWithStateInPandas。
即時模式中的 Pandas UDF
為了降低 PANDAS UDF 的延遲,請設 spark.sql.execution.arrow.maxRecordsPerBatch 為 1。 這以犧牲吞吐量為代價,優化延遲。 如果吞吐量也很重要,請將此值設為 100 或更高。
監控即時模式效能
即時模式會在 StreamingQueryProgress 的 latencies 欄位下顯示延遲指標。 可透過 StreamingQueryListener 存取這些指標,或檢查串流查詢上的 lastProgress 屬性。
| 計量 | Description |
|---|---|
processingLatencyMs |
紀錄被流程讀取到被流程完全處理之間的時間 |
sourceQueuingLatencyMs |
記錄成功寫入訊息匯流排(例如 Kafka 的日誌附加時間)後,到該記錄首次被流程讀取之間的時間 |
e2eLatencyMs |
從記錄在來源產生到被流程完全處理的全端延遲 |
每個指標分別以p50、p90、p95和p99百分位數報告。
Limitations
建議每條管線使用一個即時流量。 允許多個流程,但各流程間的任務槽爭用會增加延遲。
有關操作員與來源限制的完整清單,請參見 即時模式限制。