共用方式為


利用 Azure Stream Analytics 的查詢平行化技術

本文會示範如何利用 Azure 串流分析中的平行化作業。 您可以了解如何透過設定輸入資料分割並調整分析查詢定義來調整串流分析工作。

先決條件是,您必須熟悉了解及調整串流處理單位中所述的串流處理單位概念。

串流分析工作由哪些部分所組成?

串流分析作業定義包含至少一個串流輸入、一個查詢,以及輸出。 輸入是指作業讀取數據流的來源。 查詢是用來轉換資料輸入資料流,而輸出是作業傳送作業結果的目的地。

輸入和輸出中的分割區

資料分割可讓您依據分割鍵,將資料分成子集。 如果您的輸入 (例如 Event Hubs) 已透過索引鍵進行分區,則建議您在將輸入新增至串流分析工作時指定此分區索引鍵。 擴展串流分析作業運用了輸入和輸出中的分割區。 串流分析工作能夠以平行方式讀取及寫入不同的分割,這樣能增加輸送量。

輸入

所有 Azure Stream Analytics 的串流輸入都可以利用分割功能:包括事件中樞、IoT 中樞、Blob 儲存體、Data Lake Storage Gen2。

注意

針對相容性層級 1.2 和更高層級,分割區索引鍵會設定為「輸入屬性」,而不需要查詢中的 PARTITION BY 關鍵字。 針對相容性層級 1.1 和其以下版本,分割區索引鍵需要在查詢中使用 PARTITION BY 關鍵字來定義。

輸出

在使用 Stream Analytics 時,您可以利用輸出中的分區功能:

  • Azure Data Lake Storage
  • Azure Functions
  • Azure 資料表
  • Blob 儲存體 (可明確地設定資料分割索引鍵)
  • Azure Cosmos DB (需要明確地設定分割區索引鍵)
  • 事件中樞 (必須明確地設定分割區索引鍵)
  • IoT Hub(需要明確設定分割區索引鍵)
  • 服務匯流排
  • 具有選用資料分割的 SQL 和 Azure Synapse Analytics:如需詳細資訊,請參閱輸出至 Azure SQL Database 頁面

Power BI 不支援資料分割。 不過,您仍然可以分割輸入,如本節中所述。

如需分割區的詳細資訊,請參閱下列文章:

查詢

若要讓工作平行,分割鍵需在所有輸入、查詢邏輯步驟和輸出之間保持一致。 查詢邏輯資料分割是由用於聯結和彙總的索引鍵所決定 (GROUP BY)。 如果查詢邏輯沒有索引鍵,則可以忽略最後一項需求(投影、篩選、引用聯結...)。

  • 如果輸入和輸出由 WarehouseId 分割,且查詢依 ProductId 群組但沒有 WarehouseId,那麼該作業並不是平行的。
  • 如果要聯結的兩個輸入是依不同的分割區索引鍵 (WarehouseIdProductId) 進行資料分割,則作業無法並行。
  • 如果單一工作中包含兩個以上的獨立資料流程,並且每個資料流程都有自己的分割鍵,則工作不會是平行的。

只有在所有輸入、輸出和查詢步驟都使用相同的索引鍵時,作業才會平行。

窘迫平行作業

「窘迫平行」作業是 Azure 串流分析中調整性最高的案例。 它將輸入的某個資料分割連接到查詢的一個實例,然後連接到輸出的某個資料分割。 此平行性需要滿足下列要求:

  • 如果查詢邏輯相依於同一個查詢執行個體所處理的相同索引鍵,則您必須確保事件會傳送至輸入的相同分割區。 對於事件中樞或 IoT 中樞,這表示事件資料必須設定 PartitionKey 值。 或者,您可以使用分區傳送者。 針對 Blob 儲存體,這表示事件會傳送至相同的分割區資料夾。 例如,一個查詢執行個體會針對每個 userID 彙總資料,而輸入事件中樞則使用 userID 作為分區鍵來進行分割。 不過,如果您的查詢邏輯並不需要由同一個查詢執行個體處理相同的索引鍵,則您可以忽略這個需求。 簡單的選取-投影-篩選查詢是這種邏輯的一個例子。

  • 下一步是將您的查詢進行資料分割。 對於相容性層級 1.2 或更高版本的作業(建議),可以在輸入設定中將自訂資料行指定為分區索引鍵,該作業便會自動平行化執行。 針對具有相容性層級 1.0 或 1.1 的作業,您必須在查詢的所有步驟中使用 PARTITION BY PartitionId。 可以使用多個步驟,但全部必須以相同的鍵值進行分區。

  • 串流分析所支援的大部分輸出都可以利用資料分割。 如果您使用的輸出類型不支援資料分割,您的作業將無法達到「令人愉悅的平行化」的效果。 針對事件中樞輸出,請確定 分割區索引鍵欄 設定為查詢中使用的相同分割區索引鍵。 如需詳細資訊,請參閱輸出區段

  • 輸入分割區的數目必須等於輸出分割區的數目。 Blob 儲存體輸出可支援資料分割,並繼承上游查詢的資料分割配置。 當針對 Blob 儲存體指定資料分割索引鍵時,資料會依每個輸入分割區進行分割,因此結果仍然是完全平行。 以下是允許完全平行作業的分割區值範例:

    • 八個事件中樞輸入分割區和八個事件中樞輸出分割區
    • 八個事件中樞輸入分割區和 Blob 儲存體輸出
    • 八個事件中樞輸入分割區和 Blob 儲存體輸出,依含有任意基數的自訂欄位進行資料分割
    • 八個 Blob 儲存體輸入分割區和 Blob 儲存體輸出區域
    • 八個 Blob 儲存體輸入分割區和八個事件中樞輸出分割區

以下幾節討論一些窘迫平行的範例情節。

簡單查詢

  • 輸入:具有八個分割槽的事件中樞
  • 輸出:具有八個分割區的事件中樞 (「分割區鍵欄」必須設定為使用 PartitionId

查詢:

    --Using compatibility level 1.2 or above
    SELECT TollBoothId
    FROM Input1
    WHERE TollBoothId > 100
    
    --Using compatibility level 1.0 or 1.1
    SELECT TollBoothId
    FROM Input1 PARTITION BY PartitionId
    WHERE TollBoothId > 100

此查詢是簡單的篩選。 因此,我們不需要擔心對傳送到事件中心的輸入進行分區。 請注意,相容性層級低於 1.2 的作業必須包含 PARTITION BY PartitionId 子句,使其能滿足稍早所述的需求 #2。 對於輸出,我們必須將作業中的事件中樞輸出設定為讓分割區索引鍵設為 PartitionId。 最後一項檢查是確保輸入分割區數目等於輸出分割區數目。

使用群組鍵的查詢

  • 輸入:具有八個分割區的事件中樞
  • 輸出:Blob 儲存體

查詢:

    --Using compatibility level 1.2 or above
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    
    --Using compatibility level 1.0 or 1.1
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

此查詢含有群組索引鍵。 因此,分組在一起的事件必須傳送至相同的事件中樞分割區。 在此範例中,我們是依 TollBoothID 分組,因此,我們應該確保在將事件傳送至事件中樞時,將 TollBoothID 當作分割區索引鍵使用。 然後在 Azure Stream Analytics 中,您可以使用 PARTITION BY PartitionId 從此資料分割配置繼承,並啟用完整平行化。 因為輸出是 blob 儲存體,所以我們不需要擔心設定分割區索引鍵值,以滿足需求 #4。

不屬於「極易平行化」的範例情境

在上一節中,本文涵蓋了一些非常易於平行處理的案例。 在本節中,將討論不符合窘迫平行所有需求的案例。

不相符的分割區數量

  • 輸入:具有八個分區的事件中心
  • 輸出:具有 32 個分割區的事件中樞

如果輸入分割區計數不符合輸出分割區計數,則無論查詢內容為何,拓撲都不會是天然平行。 不過,我們仍然可以取得某種程度的平行化。

使用非資料分割的輸出查詢

  • 輸入:具有八個分割區的事件中樞
  • 輸出:Power BI

Power BI 輸出目前不支援資料分割。 因此,此案例不是窘迫平行。

具有不同 PARTITION BY 值的多重步驟查詢

  • 輸入:具有八個分割區的事件中樞
  • 輸出:具有八個分割區的事件中樞
  • 相容性層級:1.0 或 1.1

查詢:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId, PartitionId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1 Partition By TollBoothId
    GROUP BY TumblingWindow(minute, 3), TollBoothId

如您所見,第二個步驟使用TollBoothId作為分區鍵。 此步驟與第一個步驟不同,因此需要進行重新排列。

具有不同 PARTITION BY 值的多重步驟查詢

  • 輸入:具有八個分割區的事件中樞 (未設定 [分割區索引鍵資料行],預設為 [PartitionId])
  • 輸出:具有八個分割區的事件中樞 ([分割區索引鍵資料行] 必須設定為使用 [TollBoothId])
  • 相容性層級 - 1.2 或更高版本

查詢:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

相容性層級 1.2 或更高版本預設會啟用平行查詢執行。 例如,只要將 [TollBoothId] 資料行設為輸入的分割區索引鍵,系統就會分割上一節中的查詢。 不需要 PARTITION BY PartitionId 子句。

計算工作的最大串流單元

資料流分析工作可使用的串流處理單元總數,取決於為工作定義之查詢中的步驟數目,和每個步驟的資料分割數目。

查詢中的步驟

一個查詢可以有一或多個步驟。 每個步驟都是使用 WITH 關鍵字定義的子查詢。 在 WITH 關鍵字外的查詢 (只有一個查詢) 也視為一個步驟,例如下列查詢中的 SELECT 陳述式:

查詢:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )
    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute,3), TollBoothId

此查詢有兩個步驟。

注意

本文稍後會詳細討論此查詢。

劃分步驟

要分割步驟必須符合下列條件:

  • 必須分割輸入來源。
  • 查詢的 SELECT 陳述式必須讀取某個已分割的輸入來源。
  • 步驟內的查詢必須有 PARTITION BY 關鍵字。

分割查詢時,將會在個別的分割區群組中處理和彙總輸入事件,然後為每個群組產生輸出事件。 如果想要合併的彙總結果,您必須建立一個不分區的第二步驟來進行彙總。

計算工作的最大串流單元數量

所有未分割的步驟合併在一起,可以將串流分析作業擴展到最多一個串流單位 (SU V2)。 此外,您可以在分割步驟中,為每個分割區新增一個 SU V2。 您可以在下表中看到一些範例

查詢 作業的最多 SU
  • 查詢包含一個步驟。
  • 此步驟未分割。
1 個 SU V2
  • 輸入資料流會分割為 16 個。
  • 查詢包含一個步驟。
  • 步驟被分割。
16 個 SU V2 (1 * 16 個分區)
  • 查詢包含兩個步驟。
  • 這兩個步驟均未分割。
1 SU V2
  • 輸入資料流被分為 3 個部分。
  • 查詢包含兩個步驟。 輸入步驟已進行分區,第二步驟則不分區。
  • SELECT 陳述式會讀取分割的輸入。
4 個 SU V2s (3 個用於分割步驟 + 1 用於非分割步驟)

擴展的範例

下列查詢會計算三分鐘內通過收費站 (有三個收費亭) 的車流量。 此查詢可以調整為最多一個 SU V2。

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

若要讓查詢使用更多 SU,輸入資料流和查詢都必須分割。 因為資料流分割區設為 3,以下修改的查詢可以調整為最多 3 個 SU V2:

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

分割查詢時將會處理輸入事件並將其彙總在個別的資料分割群組中。 也會為每個群組產生輸出事件。 當 GROUP BY 欄位不是輸入資料流中的分割區索引鍵時,分割區可能會導致某些非預期的結果。 例如,上一個查詢中的 TollBoothId 欄位不是 Input1 的分割區索引鍵。 結果是來自收費亭 #1 的資料可能會被分散至多個分區。

串流分析會個別處理每個 Input1 分割區。 因此,在相同的滾動視窗中,相同收費亭的汽車計數將會產生多筆記錄。 如果無法變更輸入分割區索引鍵,可藉由新增未分割的步驟來彙總跨分割區的值來解決此問題,如下列範例所示:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

此查詢可以擴展至 4 個 SU V2。

注意

如果您要合併兩個資料流,請確定資料流依據您用來建立聯結的欄位的分區鍵進行分割。 也請確定兩個資料流中的分割區數目相同。

在擴充規模時達到更高的吞吐量

窘迫平行作業雖為必要,但並不足以大規模地維持較高的輸送量。 每個儲存體系統和其對應的串流分析輸出,在達成最佳寫入輸送量的方法上皆有所不同。 如同任何大規模的案例,可以透過使用正確的設定來解決一些挑戰。 本節討論幾個常見輸出的設定,並提供範例以維持每秒 1 K、5 K 與 10 K 個事件的吞吐速率。

下列觀察使用無狀態(直通)查詢的流分析作業,以及基本的 JavaScript 使用者定義函數(UDF),將資料寫入 Event Hubs、Azure SQL 或 Azure Cosmos DB。

事件中樞

資料攝取速率 (每秒事件數) 串流處理單位 輸出資源
1 K 1/3 2 TU
5公里 1 6 個 TU
10K 2 10 TU

事件中樞解決方案會針對串流單位 (SU) 和輸送量進行線性調整,這使其成為從串流分析中分析和串流資料的最有效率且最高效能的方法。 工作可以擴大到 66 個 SU V2,其大約具有最多 400 MB/s 的處理效能,或每天處理 38 兆個事件。

Azure SQL

資料攝取速率 (每秒事件數) 串流處理單位 輸出資源
1 K 2/3 S3
5公里 3 P4
10K 6 P6

Azure SQL 支援平行寫入 (稱為繼承資料分割),但預設不會予以啟用。 不過,即使啟用「繼承資料分割」並全面平行化查詢,可能仍不足以提升輸送量。 SQL 寫入輸送量大幅取決於您的資料庫設定和資料表結構描述。 SQL 輸出效能一文具有可將寫入輸送量最大化之參數的詳細資料。 如 串流分析輸出至 Azure SQL Database 一文中所述,此解決方案在超過 8 個分割區之後,並不會以完全平行管線的形式進行線性調整,且可能需要在 SQL 輸出之前重新分割 (請參閱 INTO)。 需要進階 SKU 以維持高 IO 速率,以及由每隔幾分鐘便會發生的記錄備份所帶來的額外負荷。

Azure Cosmos DB

資料攝取速率 (每秒事件數) 串流處理單位 輸出資源
1 K 2/3 20 K RU
5公里 4 60 K RU
10K 8 120 K RU

來自串流分析的 Azure Cosmos DB 輸出已更新為在相容性層級 1.2下使用原生整合。 與相容性層級 1.1 (其為新作業的預設合規性層級) 相比,1.2 能提供明顯更高的輸送量並減少 RU 耗用量。 該解決方案會使用基於 /deviceId 進行分割的 Azure Cosmos DB 容器,而其他部分的設定也會以相同方式進行。

所有大規模串流 Azure 範例都會使用事件中樞作為輸入,而事件中樞是由負載模擬測試用戶端所餵入。 每個輸入事件都是 1 KB 的 JSON 文件,其可以輕鬆地將所設定的擷取速率轉譯為輸送量速率 (1 MB/s、5 MB/s 及 10 MB/s)。 事件會模擬 IoT 裝置針對最多 1000 個裝置傳送下列 JSON 資料 (以縮短的形式):

{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 51.3068118685458,
        "moreData22": 45.34076957651598
    },
    "value": 49.02278128887753,
    "deviceId": "contoso://device-id-1554",
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}

注意

由於用於解決方案中的各種元件,設定可能會有所變更。 如需更精確的評估,請自訂範例以符合您的案例。

找出瓶頸

使用您 Azure 串流分析作業中的 [計量] 窗格來找出管線中的瓶頸。 檢閱「輸入/輸出事件」 以了解輸送量,並檢視「浮水印延遲」 或「待處理的事件」,以確認作業是否能跟上輸入速率。 對於 Event Hubs 的計量,請查找受限的要求,並據此調整閾值單位。 針對 Azure Cosmos DB 計量,檢閱 [輸送量] 底下的 [每個分割區索引鍵範圍每秒取用的 RU 上限],以確保系統會一致地取用您的分割區索引鍵範圍。 針對 Azure SQL DB,請監視 [記錄 IO] 和 [CPU]

取得協助

如需進一步的協助,請嘗試 Azure 串流分析的 Microsoft 問與答頁面

下一步