Azure Blob 儲存體檔案來源與 Azure 佇列儲存體 (舊版)
重要
此文件已淘汰,且可能未更新。 不再支援此內容中提及的產品、服務或技術。 請參閱 什麼是自動載入器?。
ABS-AQS 連接器提供優化的檔案來源,其使用 Azure 佇列記憶體 (AQS) 來尋找寫入 Azure Blob 記憶體 (ABS) 容器的新檔案,而不重複列出所有檔案。 這提供兩個優點:
- 較低的延遲:不需要列出 ABS 上的巢狀目錄結構,這會很慢且需要大量資源。
- 成本較低:不需對 ABS 提出成本更高的 LIST API 要求。
注意
ABS-AQS 來源會在取用事件時,從 AQS 佇列刪除訊息。 如果您想要其他管線從此佇列取用訊息,請為優化讀取器設定個別的 AQS 佇列。 您可以設定多個事件方格訂用帳戶,以發佈至不同的佇列。
若要使用 ABS-AQS 檔案來源,您必須:
利用 Azure 事件方格 訂用帳戶並將其路由傳送至 AQS,以設定 ABS 事件通知。 請參閱 回應 Blob 記憶體事件。
fileFormat
指定和queueUrl
選項和架構。 例如:spark.readStream \ .format("abs-aqs") \ .option("fileFormat", "json") \ .option("queueName", ...) \ .option("connectionString", ...) \ .schema(...) \ .load()
若要向 Azure 佇列記憶體和 Blob 記憶體進行驗證,請使用共用存取簽章 (SAS) 令牌或記憶體帳戶密鑰。 您必須為部署佇列的記憶體帳戶提供 連接字串,其中包含 SAS 令牌或記憶體帳戶的存取密鑰。 如需詳細資訊,請參閱設定 Azure 儲存體連接字串。
您也必須提供 Azure Blob 記憶體容器的存取權。 如需如何設定 Azure Blob 記憶體容器存取權的相關信息,請參閱 連線至 Azure Data Lake Storage Gen2 和 Blob 記憶體 。
注意
Databricks 強烈建議您使用管理秘密來提供 連接字串。
選項 | 類型 | 預設 | 描述 |
---|---|---|---|
allowOverwrites | 布林值 | true |
是否應該重新處理覆寫的 Blob。 |
connectionString | String | 無(必要參數) | 要存取佇列的 連接字串。 |
fetchParallelism | 整數 | 1 | 從佇列服務擷取訊息時要使用的執行緒數目。 |
fileFormat | String | 無(必要參數) | 、parquet json 、csv 、 text 等檔案的格式。 |
ignoreFileDeletion | 布林值 | false |
如果您有生命週期設定,或手動刪除來源檔案,則必須將這個選項設定為 true 。 |
maxFileAge | 整數 | 604800 | 決定將檔案通知儲存為狀態,以防止重複處理的時間(以秒為單位)。 |
pathRewrites | JSON 字串。 | "{}" |
如果您使用裝入點,您可以使用裝入點重寫路徑的 container@storageAccount/key 前置詞。 只能重寫前置詞。 例如,針對組態 {"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"} ,路徑 wasbs://myContainer@myStorageAccount.blob.windows.core.net/path/2017/08/fileA.json 會重寫為dbfs:/mnt/data-warehouse/2017/08/fileA.json . |
queueFetchInterval | 例如, 2m 持續時間字串為2分鐘。 |
"5s" |
如果佇列是空的,在擷取之間等候的時間長度。 每個 API 要求對 AQS 的 Azure 費用。 因此,如果數據不常到達,這個值可以設定為較長的持續時間。 只要佇列不是空的,我們會持續擷取。 如果每 5 分鐘建立新的檔案,您可能會想要設定高 queueFetchInterval ,以減少 AQS 成本。 |
queueName | String | 無(必要參數) | AQS 佇列的名稱。 |
如果您在驅動程式記錄中觀察到許多看起來像 Fetched 0 new events and 3 old events.
的訊息,其中您通常會觀察到比新事件更多的舊事件,您應該減少串流觸發間隔。
如果您要從 Blob 記憶體上的位置取用檔案,而您預期某些檔案可能會在處理之前遭到刪除,您可以設定下列設定來忽略錯誤並繼續處理:
spark.sql("SET spark.sql.files.ignoreMissingFiles=true")
如果 ignoreFileDeletion
為 False (預設值)且物件已刪除,則會讓整個管線失敗嗎?
是,如果我們收到事件,指出檔案已刪除,則會讓整個管線失敗。
我該如何設定 maxFileAge
?
Azure 佇列記憶體提供至少一次的訊息傳遞語意,因此我們需要保留重複數據刪除的狀態。 的預設設定 maxFileAge
為 7 天,等於佇列中訊息的最大 TTL。