檢查點和預寫記錄檔會一起運作,為結構化串流工作負載提供處理保證。 檢查點會追蹤識別查詢的資訊,包括狀態資訊和已處理的記錄。 刪除檢查點目錄中的檔案或變更為新的檢查點位置時,查詢的下一次執行會全新開始。
每個查詢都必須有不同的檢查點位置。 多個查詢不應該共用相同的位置。
啟用檢查點以進行結構化串流查詢
您必須在執行串流查詢之前指定 checkpointLocation 選項,如下列範例所示:
Python
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
Scala
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
備註
如果您省略此選項,某些接收器 (例如筆記本中的 display() 輸出和 memory 接收器) 會自動產生臨時檢查點位置。 這些暫時檢查點位置不會確保任何容錯或資料一致性保證,並且可能無法被正確清理。 Databricks 建議一律為這些接收器指定檢查點位置。
在結構化串流查詢變更後進行復原
重新啟動時,如果從相同的檢查點位置進行,則對於串流查詢中允許的變更會有一些限制。 以下是一些不允許的變更,或變更的效果未妥善定義。 針對所有這些:
- allowed 字詞表示您可以執行指定的變更,但其效果的語意是否妥善定義取決於查詢和變更。
- not allowed 字詞表示您不應該執行指定的變更,因為重新啟動的查詢可能會因為無法預測的錯誤而失敗。
-
sdf表示以sparkSession.readStream產生的串流資料框架/資料集。
結構化串流查詢中的變更類型
輸入來源的數量或類型變更 (也就是不同的來源):這是不允許的。
輸入來源參數的變更:是否允許此變更,以及變更的語意是否明確,取決於來源與查詢,包括如
maxFilesPerTrigger或maxOffsetsPerTrigger的准入控制。 以下是一些範例。允許新增、刪除和修改速率限制:
spark.readStream.format("kafka").option("subscribe", "article")到
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)訂閱文章和檔案的變更通常不允許,因為結果無法預測:
spark.readStream.format("kafka").option("subscribe", "article")至spark.readStream.format("kafka").option("subscribe", "newarticle")
觸發間隔的變更:您可以在增量批次和時間間隔之間切換觸發條件。 請參見 變更執行之間的觸發間隔。
輸出接收器類型的變更:允許在幾個特定接收器組合之間進行變更。 這需要逐案驗證。 以下是一些範例。
- 允許檔案接收器至 Kafka 接收器。 Kafka 只會看到新的資料。
- 不允許從 Kafka 匯出至檔案匯出。
- Kafka 接收器已變更為 foreach,反之亦然。
輸出接收器參數的變更:這是否允許,以及變更的語意是否妥善定義,取決於接收器和查詢。 以下是一些範例。
- 不允許變更檔案接收器的輸出目錄:
sdf.writeStream.format("parquet").option("path", "/somePath")至sdf.writeStream.format("parquet").option("path", "/anotherPath") - 允許對輸出主題進行變更:
sdf.writeStream.format("kafka").option("topic", "topic1")至sdf.writeStream.format("kafka").option("topic", "topic2") - 允許對使用者定義的 foreach 接收器進行變更 (也就是
ForeachWriter程式碼),但變更的語意取決於程式碼。
- 不允許變更檔案接收器的輸出目錄:
投影/篩選/對應等作業的變更:有些情況是允許的。 例如:
- 允許新增/刪除篩選條件:
sdf.selectExpr("a")至sdf.where(...).selectExpr("a").filter(...)。 - 允許更改具有相同輸出結構的投影:
sdf.selectExpr("stringColumn AS json").writeStream至sdf.select(to_json(...).as("json")).writeStream。 - 有條件地允許具有不同輸出結構描述的投影變更:只有在輸出接收器允許結構描述從
"a"變更為"b"時,才允許sdf.selectExpr("a").writeStream變更至sdf.selectExpr("b").writeStream。
- 允許新增/刪除篩選條件:
具狀態作業的變更:串流查詢中的某些作業需要維護狀態資料,才能持續更新結果。 結構化串流會自動對狀態資料進行檢查點設定,將其存儲於容錯儲存空間(例如 DBFS、Azure Blob 儲存體),並在重新啟動後進行還原。 不過,這會假設狀態資料的結構描述在重新啟動時維持不變。 這表示在重新啟動之間不允許對串流查詢的具狀態作業進行任何變更 (也就是新增、刪除或結構描述修改)。 以下是有狀態操作清單,為確保狀態恢復,結構不應在重啟間改變:
-
串流彙總:例如
sdf.groupBy("a").agg(...)。 不允許對群組索引鍵或彙總的數量或類型進行任何變更。 -
串流重複資料刪除:例如
sdf.dropDuplicates("a")。 不允許對群組索引鍵或彙總的數量或類型進行任何變更。 -
串流-串流聯結:例如
sdf1.join(sdf2, ...)(亦即這兩個輸入都是使用sparkSession.readStream產生)。 不允許更改結構或等值聯結欄位。 不允許聯結類型 (外部或內部) 中的變更。 聯結條件中的其他變更定義不正確。 -
任意具狀態作業:例如
sdf.groupByKey(...).mapGroupsWithState(...)或sdf.groupByKey(...).flatMapGroupsWithState(...)。 不允許對使用者定義狀態的結構描述進行任何變更,而且不允許逾時類型。 允許使用者定義狀態對應函數內的任何變更,但變更的語意效果取決於使用者定義的邏輯。 如果您真的想要支援狀態結構描述變更,可以使用支援結構描述移轉的編碼/解碼結構描述,將複雜的狀態資料結構明確地編碼/解碼成位元組。 例如,如果您將狀態儲存為 Avro 編碼的位元組,就可以在查詢重新啟動之間變更 Avro-state-schema,因為這樣會還原二進位狀態。
-
串流彙總:例如
這很重要
有狀態運算子 dropDuplicates() 和 dropDuplicatesWithinWatermark() 在切換計算存取模式時,可能因狀態結構相容性檢查而無法重啟。
允許在專用與無隔離存取模式間切換。 允許在標準與無伺服器存取模式間切換。 請勿嘗試在其他存取模式組合間切換。
為避免此錯誤,請勿更改包含這些運算元的串流查詢的運算存取模式。