非同步進度追蹤透過讓查詢能非同步更新檢查點進度並處理每個微批次的資料,降低結構化串流管線的延遲。
在查詢處理過程中,結構化串流會持續運作並管理偏移量,以衡量每個微批次的offsetLogcommitLog查詢進度。 若沒有非同步進度追蹤,偏移管理操作會直接影響處理延遲,因為資料處理必須完成才能繼續。
注意
非同步進度追蹤與Trigger.once或Trigger.availableNow觸發器不相容。 啟用時,具有 Trigger.once 或 Trigger.availableNow 的結構化串流查詢將失敗。
設定選項
| 選擇 | 預設 | 描述 |
|---|---|---|
asyncProgressTrackingEnabled |
false |
是否啟用非同步進度追蹤。 |
asyncProgressTrackingCheckpointIntervalMs |
1000 |
在位移寫入與完成提交操作之間的毫秒間隔。 |
啟用非同步進度追蹤
要啟用非同步進度追蹤,請設定 asyncProgressTrackingEnabled 為 true:
Python
stream = (spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
)
query = (stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
)
Scala
val stream = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "in")
.load()
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.start()
透過檢查點頻率提升吞吐量
預設的檢查點頻率( 1000 毫秒)對大多數查詢來說都有良好的吞吐量。 當偏移管理作業發生的速度快於非同步進度追蹤能夠處理的速度時,偏移管理作業會產生積壓。 為防止積壓持續擴大,非同步進度追蹤可能會阻擋或減慢資料處理,可能侵蝕預期的延遲效益。
在此情境下,Databricks 建議你延長檢查點間隔:
Python
query = (stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.option("asyncProgressTrackingCheckpointIntervalMs", "5000")
.start()
)
Scala
val query = stream.writeStream
.format("kafka")
.option("topic", "out")
.option("checkpointLocation", "/tmp/checkpoint")
.option("asyncProgressTrackingEnabled", "true")
.option("asyncProgressTrackingCheckpointIntervalMs", "5000")
.start()
注意
故障恢復時間會隨著檢查點間隔時間增加。 若失敗,管線必須重新處理自上次成功檢查點以來的所有資料。 在你做出生產調整前,請考慮在正常處理時較低延遲與失敗時復原時間之間的權衡。
關閉非同步進度追蹤
啟用非同步進度追蹤時,串流無法保證每個批次都有檢查點進度。 你必須先檢查進度點才能關閉此功能。
關閉時,請遵循以下步驟:
處理至少兩個微批次,分別設定
asyncProgressTrackingEnabled為true,asyncProgressTrackingCheckpointIntervalMs且 設定為0:Python
query = (stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "true") .option("asyncProgressTrackingCheckpointIntervalMs", "0") .start() )Scala
val query = stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "true") .option("asyncProgressTrackingCheckpointIntervalMs", "0") .start()停止詢問:
Python
query.stop()Scala
query.stop()關閉非同步進度追蹤並重新啟動查詢:
Python
query = (stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "false") .start() )Scala
val query = stream.writeStream .format("kafka") .option("topic", "out") .option("checkpointLocation", "/tmp/checkpoint") .option("asyncProgressTrackingEnabled", "false") .start()
如果你關閉非同步進度追蹤而不遵循上述步驟,可能會遇到以下錯誤:
java.lang.IllegalStateException: batch x doesn't exist
在驅動程式記錄檔中,您可能會看到下列錯誤:
The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
Limitations
- 對於 Kafka 匯流器,非同步進度追蹤僅支援無狀態資料管線。
- 非同步進度追蹤無法保證端到端處理的精確一次,因為批次的偏移範圍可能在失敗時發生變化。 某些接收器,例如 Kafka,無法提供精確一次的保證。