本頁提供結構化串流即時模式的參考資訊,包括支援的環境、語言、來源、匯入與運算元。 關於已知的限制,請參見 即時模式限制。
支援的語言
即時模式支援 Scala、Java 和 Python。
計算類型
即時模式支援以下運算類型:
| 計算類型 |
支援 |
| 專用 (先前為:單一使用者) |
✓ |
| 標準 (先前: 共用) |
✓(僅限 Python) |
| Lakeflow Spark 宣告式資料流水線經典版 |
不支援 |
| Lakeflow Spark 宣告式管線無伺服器 |
不支援 |
| 無伺服器 |
不支援 |
執行模式
即時模式僅支援更新模式:
| 執行模式 |
支援 |
| 更新模式 |
✓ |
| 附加模式 |
不支援 |
| 完成模式 |
不支援 |
來源和接收器
即時模式支援以下來源與匯:
| 源頭或接收端 |
作為資料來源 |
作為沉沒 |
| Apache Kafka |
✓ |
✓ |
| Event Hubs(使用 Kafka 連接器) |
✓ |
✓ |
| Kinesis |
✓(僅限 EFO 模式) |
不支援 |
| AWS MSK |
✓ |
不支援 |
| Delta |
不支援 |
不支援 |
| Google 發布/訂閱 |
不支援 |
不支援 |
| 阿帕奇脈衝星 |
不支援 |
不支援 |
任意接收端(使用 forEachWriter) |
不適用 |
✓ |
操作員
即時模式支援大多數結構化串流運算子:
無狀態操作
UDFs
Aggregation
| Operator |
支援 |
| sum |
✓ |
| 計數 |
✓ |
| max |
✓ |
| min |
✓ |
| avg |
✓ |
|
聚合函數 |
✓ |
視窗化
| Operator |
支援 |
| 輪轉 |
✓ |
| 滑動 |
✓ |
| Session |
不支援 |
Deduplication
| Operator |
支援 |
| 刪除重複項 |
✓(狀態無界) |
| dropDuplicatesWithinWatermark |
不支援 |
串流到資料表連接
| Operator |
支援 |
| 廣播資料表連接(資料表應該很小) |
✓ |
| 串流到串流連接 |
不支援 |
| (平坦)MapGroupsWithState |
不支援 |
| transformWithState |
✓(有些差異) |
| union |
✓(有一些限制) |
| forEach |
✓ |
| 逐批處理 |
不支援 |
| map分區處理 |
不支援(見限制) |
特殊考慮
部分操作員與功能在即時模式下會有特定的考量或差異。
為了建置自定義具狀態應用程式,Databricks 支援 transformWithState,這是 Apache Spark 結構化串流中的 API。 如需 API 和代碼段的詳細資訊,請參閱 建置自定義具狀態應用程式 。
不過,API 在即時模式中的行為和利用微批次架構的傳統串流查詢之間有一些差異。
- 即時模式會對每一列呼叫
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues) 方法。
-
inputRows反覆運算器會傳回單一值。
微批次模式會為每個鍵呼叫一次, inputRows 迭代器會回傳該微批次中該鍵的所有值。
- 在撰寫程式碼時考慮這個差異
- 即時模式不支援事件時間定時器。
- 在即時模式下,計時器的觸發會因資料到達而延遲。
- 如果計時器排定在 10:00:00,但沒有資料到達,計時器不會立即觸發。
- 若資料在10:00:10抵達,計時器會在10秒後觸發。
- 若無資料抵達且長期執行的批次正在終止,計時器會在批次結束前觸發。
Python 即時模式下的 UDF
Databricks 支援大多數 Python 使用者定義函式(UDF)的即時模式:
無狀態
有狀態分組(UDAF)
| UDF 類型 |
支援 |
transformWithState (僅限介面 Row ) |
✓ |
applyInPandasWithState |
不支援 |
無狀態分群(UDAF)
| UDF 類型 |
支援 |
apply |
不支援 |
applyInArrow |
不支援 |
applyInPandas |
不支援 |
表格功能
在即時模式下使用 Python UDF 時,有幾個要點需要考慮:
- 為了降低延遲,請將 Arrow 批次大小
spark.sql.execution.arrow.maxRecordsPerBatch()設定為 1。
- 取捨:此組態會以犧牲輸送量為代價來最佳化延遲。 對於大部分的工作負載,建議使用此設定。
- 只有在需要更高的輸送量來容納輸入量時,才增加批次大小,並接受延遲的潛在增加。
- Pandas UDF 和函數在 Arrow 批次大小為 1 時表現不佳。
- 如果您使用 pandas UDF 或函數,請將箭頭批次大小設定為較高的值 (例如,100 或更高)。
- 這代表延遲會更高。 Databricks 建議如果可能的話,使用 Arrow 的 UDF 或函式。
- 由於 pandas 的效能問題,transformWithState 只支援
Row 介面。