支援的語言
即時模式支援 Scala、Java 和 Python。
計算類型
即時模式支援以下運算類型:
| 計算類型 | 支援 |
|---|---|
| 專用 (先前為:單一使用者) | ✓ |
| 標準 (先前: 共用) | ✓(僅限 Python) |
| Lakeflow Spark 宣告式資料流水線經典版 | 不支援(見下方註解) |
| Lakeflow Spark 宣告式管線無伺服器 | 不支援(見下方註解) |
| 無伺服器 | 不支援 |
備註
即時模式不支援 Lakeflow Spark 宣告式管線中直接的結構化串流觸發類型。 然而,Lakeflow Spark 宣告式管線支援透過管線層級的即時模式配置。 請參閱 Lakeflow Spark 宣告式管線中的「使用即時模式」。
對於延遲敏感且帶有 UDF 的工作負載,Databricks 建議你使用專用存取模式。 參見 表格函數。
執行模式
即時模式僅支援更新模式:
| 執行模式 | 支援 |
|---|---|
| 更新模式 | ✓ |
| 附加模式 | 不支援 |
| 完成模式 | 不支援 |
來源和接收器
即時模式支援以下來源與匯:
| 源頭或接收端 | 作為資料來源 | 作為沉沒 |
|---|---|---|
| Apache Kafka | ✓ | ✓ |
| Event Hubs(使用 Kafka 連接器) | ✓ | ✓ |
| Kinesis | ✓(僅限 EFO 模式) | 不支援 |
| AWS MSK | ✓ | 不支援 |
| Delta | 不支援 | 不支援 |
| Google 發布/訂閱 | 不支援 | 不支援 |
| 阿帕奇脈衝星 | 不支援 | 不支援 |
任意接收端(使用 forEachWriter) |
不適用 | ✓ |
操作員
即時模式支援大多數結構化串流運算子:
無狀態操作
| Operator | 支援 |
|---|---|
| 選擇 | ✓ |
| 投影 | ✓ |
UDFs
| Operator | 支援 |
|---|---|
| 斯卡拉 UDF | ✓(有一些限制) |
| Python 使用者定義函數 (UDF) | ✓(有一些限制) |
Aggregation
| Operator | 支援 |
|---|---|
| sum | ✓ |
| 計數 | ✓ |
| max | ✓ |
| min | ✓ |
| avg | ✓ |
| 聚合函數 | ✓ |
視窗化
| Operator | 支援 |
|---|---|
| 輪轉 | ✓ |
| 滑動 | ✓ |
| Session | 不支援 |
Deduplication
| Operator | 支援 |
|---|---|
| 刪除重複項 | ✓(狀態無界) |
| dropDuplicatesWithinWatermark | 不支援 |
串流到資料表連接
| Operator | 支援 |
|---|---|
| 廣播資料表連接(資料表應該很小) | ✓ |
| 串流到串流連接 | 不支援 |
| (平坦)MapGroupsWithState | 不支援 |
| transformWithState | ✓(有些差異) |
| union | ✓(有一些限制) |
| forEach | ✓ |
| 逐批處理 | 不支援 |
| map分區處理 | 不支援(見限制) |
特殊考慮
部分操作員與功能在即時模式下會有特定的考量或差異。
transformWithState 在即時模式下
為了建置自定義具狀態應用程式,Databricks 支援 transformWithState,這是 Apache Spark 結構化串流中的 API。 如需 API 和代碼段的詳細資訊,請參閱 建置自定義具狀態應用程式 。
然而,API 在即時模式下的行為與微批次查詢不同。
- 即時模式會對每一列呼叫
handleInputRows(key: String, inputRows: Iterator[T], timerValues: TimerValues)方法。-
inputRows反覆運算器會傳回單一值。 微批次模式會為每個鍵呼叫一次,inputRows迭代器會回傳該微批次中該鍵的所有值。 - 在撰寫程式碼時考慮這個差異
-
- 即時模式不支援事件時間定時器。
-
transformWithStateInPandas在即時模式下不支援。 改用列式transformWithStateAPI,它使用Row物件而非 pandas DataFrame。 - 在即時模式下,計時器的觸發會因資料到達而延遲。
- 如果計時器排定在 10:00:00,但沒有資料到達,計時器不會立即觸發。
- 若資料在10:00:10抵達,計時器會在10秒後觸發。
- 若無資料抵達且長期執行的批次正在終止,計時器會在批次結束前觸發。
備註
在 Databricks Runtime 18.1 及以下版本中,如果你使用 transformWithState 並使用即時模式處理低吞吐量(每秒少於 5 筆記錄)的 Python,可能會增加到數百毫秒的延遲。 Databricks 建議升級到 Databricks Runtime 18.2 及以上以解決。
Python 即時模式下的 UDF
Databricks 支援大多數 Python 使用者定義函式(UDF)的即時模式:
無狀態
| UDF 類型 | 支援 |
|---|---|
| Python 標量 UDF(使用者自訂標量函數 - Python) | ✓ |
| Apache Arrow 純量 UDF | ✓ |
| Pandas 標量 UDF(pandas 使用者自定義函數) | ✓ |
箭頭函數 (mapInArrow) |
✓ |
| Pandas 函數(地圖) | ✓ |
有狀態分組(UDAF)
| UDF 類型 | 支援 |
|---|---|
transformWithState (僅限介面 Row ) |
✓ |
transformWithStateInPandas |
不支援。 改用列式 transformWithState API,它使用 Row 物件而非 pandas DataFrame。 詳情請參見 transformWithStateInPandas 「不支援 」。 |
applyInPandasWithState |
不支援 |
無狀態分群(UDAF)
| UDF 類型 | 支援 |
|---|---|
apply |
不支援 |
applyInArrow |
不支援 |
applyInPandas |
不支援 |
表格功能
| UDF 類型 | 支援 |
|---|---|
| UDTF(Python使用者定義表格函式(UDTFs)) | 不支援 |
| 加州大學 UDF | 不支援 |
在即時模式下使用 Python UDF 時,有幾個要點需要考慮:
- 為了降低延遲,請將 Arrow 的批次大小
spark.sql.execution.arrow.maxRecordsPerBatch()設為 1。- 取捨:此組態會以犧牲輸送量為代價來最佳化延遲。 對於大部分的工作負載,建議使用此設定。
- 只有在需要更高的輸送量來容納輸入量時,才增加批次大小,並接受延遲的潛在增加。
- Pandas UDF 和函數在 Arrow 批次大小為 1 時表現不佳。
- 如果您使用 pandas UDF 或函數,請將箭頭批次大小設定為較高的值 (例如,100 或更高)。
- 這代表延遲會更高。 Databricks 建議如果可能的話,使用 Arrow 的 UDF 或函式。
-
transformWithStateInPandas在即時模式下不支援。 改用列式transformWithStateAPI,它使用Row物件而非 pandas DataFrame。 請參考transformWithStateInPandasNot supported 以及 Real-time mode 範例 使用資料列 API 的可運作Python範例。 - 對於延遲敏感且帶有 UDF 的工作負載,Databricks 建議你使用專用存取模式。 在標準存取模式下,安全隔離的開銷可能會降低 UDF 效能。