共用方式為


具狀態查詢的異步狀態檢查點

注意

適用於 Databricks Runtime 10.4 LTS 和更新版本。

異步狀態檢查點會針對串流查詢維持一次完全一次的保證,但可能會降低某些結構化串流具狀態工作負載在狀態更新上瓶頸的整體延遲。 這可藉由開始處理下一個微批次,只要前一個微批次的計算完成,就不需要等待狀態檢查點完成。 下表比較同步和異步檢查點的取捨:

特性 同步檢查點 異步檢查點
延遲 每個微批次的延遲較高。 減少延遲,因為微批次可以重疊。
重新啟動 快速復原,因為只需要重新執行最後一個批次。 重新啟動延遲較高,因為可能需要重新執行 micro-batch 以上。

以下是可能受益於異步狀態檢查點的串流作業特性:

  • 作業有一或多個具狀態作業(例如匯總、 flatMapGroupsWithState、、 mapGroupsWithState數據流聯結)
  • 狀態檢查點延遲是整體批次執行延遲的主要參與者之一。 您可以在 StreamingQueryProgress 事件中找到此資訊。 這些事件也會在Spark驅動程式的log4j記錄中找到。 以下是串流查詢進度的範例,以及如何尋找狀態檢查點對整體批次執行延遲的影響。
    • {
         "id" : "2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19",
         "runId" : "e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe",
         "...",
         "batchId" : 0,
         "durationMs" : {
           "...",
           "triggerExecution" : 547730,
           "..."
         },
         "stateOperators" : [ {
           "...",
           "commitTimeMs" : 3186626,
           "numShufflePartitions" : 64,
           "..."
         }]
      }
      
    • 上述查詢進度事件的狀態檢查點延遲分析

      • 批次持續時間 (durationMs.triggerDuration) 大約是547秒。
      • 狀態存放區認可延遲 (stateOperations[0].commitTimeMs) 大約是 3,186 秒。 認可延遲會匯總到包含狀態存放區的工作。 在此情況下,有64個這類工作 (stateOperators[0].numShufflePartitions)。
      • 包含狀態運算子的每個工作平均花費 50 秒 (3,186/64) 檢查點。 這是造成批次持續時間的額外延遲。 假設所有 64 個工作同時執行,檢查點步驟貢獻了批次持續時間的大約 9% (50 秒/547 秒)。 當最大並行工作小於 64 時,百分比會更高。

啟用異步狀態檢查點

您必須使用 RocksDB 型狀態存放區 進行異步狀態檢查。 設定下列組態:


spark.conf.set(
  "spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
  "true"
)

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)

異步檢查點的限制和需求

注意

計算自動調整有相應減少結構化串流工作負載的叢集大小限制。 Databricks 建議針對串流工作負載使用差異即時資料表與增強型自動調整。 請參閱 使用增強式自動調整優化差異實時數據表管線的叢集使用率。

  • 任何一或多個存放區異步檢查點中的任何失敗都會導致查詢失敗。 在同步檢查點模式中,檢查點會當做工作的一部分執行,Spark 會在查詢失敗之前多次重試工作。 這個機制不存在於異步狀態檢查點。 不過,使用 Databricks 作業重試,這類失敗可以自動重試。
  • 當狀態存放區位置在微批次執行之間未變更時,異步檢查點的運作效果最佳。 叢集重設大小,與異步狀態檢查點結合,可能無法正常運作,因為狀態存放區實例可能會隨著節點在叢集重設大小事件中新增或刪除而重新散發。
  • 只有 RocksDB 狀態存放區提供者實作才支援異步狀態檢查點。 默認記憶體內部狀態存放區實作不支援它。