共用方式為


Azure Blob 記憶體檔案來源與 Azure 佇列記憶體(舊版)

重要

此檔已淘汰,且可能未更新。 不再支援此內容中所提及的產品、服務或技術。 請參閱 什麼是自動載入器?

ABS-AQS 連接器提供優化的檔案來源,其使用 Azure 佇列記憶體 (AQS) 來尋找寫入 Azure Blob 記憶體 (ABS) 容器的新檔案,而不重複列出所有檔案。 這提供兩個優點:

  • 較低的延遲:不需要列出 ABS 上的巢狀目錄結構,這會很慢且需要大量資源。
  • 成本較低:不需對 ABS 提出成本更高的 LIST API 要求。

注意

ABS-AQS 來源會在取用事件時,從 AQS 佇列刪除訊息。 如果您想要其他管線從此佇列取用訊息,請為優化讀取器設定個別的 AQS 佇列。 您可以設定多個事件方格訂用帳戶,以發佈至不同的佇列。

使用 ABS-AQS 檔案來源

若要使用 ABS-AQS 檔案來源,您必須:

  • 利用 Azure 事件方格 訂用帳戶並將其路由傳送至 AQS,以設定 ABS 事件通知。 請參閱 回應 Blob 記憶體事件

  • fileFormat指定和 queueUrl 選項和架構。 例如:

    spark.readStream \
      .format("abs-aqs") \
      .option("fileFormat", "json") \
      .option("queueName", ...) \
      .option("connectionString", ...) \
      .schema(...) \
      .load()
    

使用 Azure 佇列記憶體和 Blob 記憶體進行驗證

若要向 Azure 佇列記憶體和 Blob 記憶體進行驗證,請使用共用存取簽章 (SAS) 令牌或記憶體帳戶密鑰。 您必須為您的佇列部署所在的記憶體帳戶提供 連接字串,其中包含 SAS 令牌或記憶體帳戶的存取金鑰。 如需詳細資訊,請參閱設定 Azure 儲存體連接字串

您也必須提供 Azure Blob 記憶體容器的存取權。 如需如何設定 Azure Blob 記憶體容器存取權的相關信息,請參閱 連線至 Azure Data Lake Storage Gen2 和 Blob 記憶體

注意

強烈建議您使用秘密來提供 連接字串。

組態

選項 類型 預設 描述
allowOverwrites 布林值 true 是否應該重新處理覆寫的 Blob。
connectionString String 無(必要參數) 存取佇列的 連接字串
fetchParallelism 整數 1 從佇列服務擷取訊息時要使用的線程數目。
fileFormat String 無(必要參數) parquetjsoncsvtext等檔案的格式。
ignoreFileDeletion 布林值 false 如果您有生命週期設定,或手動刪除來源檔案,則必須將這個選項設定為 true
maxFileAge 整數 604800 決定將檔案通知儲存為狀態,以防止重複處理的時間(以秒為單位)。
pathRewrites JSON 字串。 "{}" 如果您使用裝入點,您可以使用裝入點重寫路徑的 container@storageAccount/key 前置詞。 只能重寫前置詞。 例如,針對組態 {"myContainer@myStorageAccount/path": "dbfs:/mnt/data-warehouse"},路徑 wasbs://myContainer@myStorageAccount.blob.windows.core.net/path/2017/08/fileA.json 會重寫為
dbfs:/mnt/data-warehouse/2017/08/fileA.json.
queueFetchInterval 例如, 2m 持續時間字串為2分鐘。 "5s" 如果佇列是空的,在擷取之間等候的時間長度。 每個 API 要求對 AQS 的 Azure 費用。 因此,如果數據不常到達,這個值可以設定為較長的持續時間。 只要佇列不是空的,我們會持續擷取。 如果每 5 分鐘建立新的檔案,您可能會想要設定高 queueFetchInterval ,以減少 AQS 成本。
queueName String 無(必要參數) AQS 佇列的名稱。

如果您在驅動程式記錄中觀察到許多看起來像 Fetched 0 new events and 3 old events.的訊息,其中您通常會觀察到比新事件更多的舊事件,您應該減少串流觸發間隔。

如果您要從 Blob 記憶體上的位置取用檔案,而您預期某些檔案可能會在處理之前遭到刪除,您可以設定下列設定來忽略錯誤並繼續處理:

spark.sql("SET spark.sql.files.ignoreMissingFiles=true")

常見問題集 (FAQ)

如果 ignoreFileDeletion 為 False (預設值)且物件已刪除,則會讓整個管線失敗嗎?

是,如果我們收到事件,指出檔案已刪除,則會讓整個管線失敗。

我該如何設定 maxFileAge

Azure 佇列記憶體提供至少一次的訊息傳遞語意,因此我們需要保留重複數據刪除的狀態。 的預設設定 maxFileAge 為 7 天,等於佇列中訊息的最大 TTL。