使用串流分析中的查詢平行化作業

本文會示範如何利用 Azure 串流分析中的平行化作業。 您可以了解如何透過設定輸入資料分割並調整分析查詢定義來調整串流分析工作。

先決條件是,您必須熟悉了解及調整串流處理單位中所述的串流處理單位概念。

串流分析工作由哪些部分所組成?

串流分析作業定義包含至少一個串流輸入、一個查詢,以及輸出。 輸入是指作業讀取資料流的來源。 查詢是用來轉換資料輸入資料流,而輸出是作業傳送作業結果的目的地。

輸入和輸出中的分割區

資料分割可讓您根據分割區索引鍵 \(部分機器翻譯\),將資料分割成子集。 如果您的輸入 (例如事件中樞) 已透過索引鍵進行資料分割,則建議您在將輸入新增至串流分析工作時指定此分割區索引鍵。 調整串流分析作業需要利用輸入和輸出中的分割區。 串流分析作業能夠以平行方式取用及寫入不同的分割區,其能增加輸送量。

輸入

所有串流分析串流輸入都可以利用資料分割:事件中樞、IoT 中樞、Blob 儲存體、Data Lake Storage Gen2。

注意

針對相容性層級 1.2 和更高層級,分割區索引鍵會設定為「輸入屬性」,而不需要查詢中的 PARTITION BY 關鍵字。 針對相容性層級 1.1 和更低層級,則分割區索引鍵需要「在查詢中」定義 PARTITION BY 關鍵字。

輸出

使用串流分析時,您可以利用輸出中的資料分割:

  • Azure Data Lake 儲存體
  • Azure Functions
  • Azure 資料表
  • Blob 儲存體 (可明確地設定資料分割索引鍵)
  • Azure Cosmos DB (需要明確地設定分割區索引鍵)
  • 事件中樞 (必須明確地設定分割區索引鍵)
  • IoT 中樞 (需要明確地設定資料分割索引鍵)
  • 服務匯流排
  • 具有選用資料分割的 SQL 和 Azure Synapse Analytics:如需詳細資訊,請參閱輸出至 Azure SQL Database 頁面

Power BI 不支援資料分割。 不過,您仍然可以分割輸入,如本節中所述。

如需分割區的詳細資訊,請參閱下列文章:

查詢

若要讓工作平行,分割區索引鍵需要在所有輸入、所有查詢邏輯步驟和所有輸出之間一致。 查詢邏輯資料分割是由用於聯結和彙總的索引鍵所決定 (GROUP BY)。 如果查詢邏輯未加上索引鍵 (投影、篩選、參考聯結等),則可以忽略這個最後一個需求。

  • 如果輸入和輸出是依 WarehouseId 進行資料分割,而且依 ProductId 的查詢群組沒有 WarehouseId,則作業並未平行。
  • 如果要聯結的兩個輸入是依不同的分割區索引鍵 (WarehouseIdProductId) 進行資料分割,則作業並未平行。
  • 如果單一工作中包含兩個以上的獨立資料流程,而且每個資料流程都有自己的分割區索引鍵,則工作未平行。

只有在所有輸入、輸出和查詢步驟都使用相同的索引鍵時,作業才會平行。

窘迫平行作業

「窘迫平行」作業是 Azure 串流分析中調整性最高的案例。 它將對於查詢執行個體之輸入的某個資料分割,連接到輸出的某個資料分割。 此平行處理原則具有下列需求︰

  • 如果查詢邏輯相依於同一個查詢執行個體所處理的相同索引鍵,則您必須確保事件會傳送至輸入的相同分割區。 對於事件中樞或 IoT 中樞,這表示事件資料必須設定 PartitionKey 值。 或者,您可以使用分割的傳送者。 對於 Blob 儲存體,這表示事件會傳送至相同的磁碟分割資料夾。 例如,會針對每個 userID 彙總資料的查詢執行個體,其中輸入事件中樞是使用 userID 作為分割區索引鍵來進行分割。 不過,如果您的查詢邏輯並不需要由同一個查詢執行個體處理相同的索引鍵,則您可以忽略這個需求。 簡單的選取-投影-篩選查詢,即為此邏輯的一個例子。

  • 下一步是將您的查詢進行資料分割。 對於具有相容性層級 1.2 或更高版本 (建議) 的作業,可以在輸入設定中將自訂資料行指定為分割區索引鍵,該作業便會自動平行化。 針對具有相容性層級 1.0 或 1.1 的作業,您必須在查詢的所有步驟中使用 PARTITION BY PartitionId。 您可以使用多個步驟,但全部都必須依相同的索引鍵來分割。

  • 串流分析所支援的大部分輸出都可以利用資料分割。 如果您使用的輸出類型不支援資料分割,您的作業將不會是「窘迫平行」的。 針對事件中樞輸出,請確定 [分割區索引鍵資料行] 設定為用於查詢中的相同分割區索引鍵。 如需詳細資訊,請參閱輸出區段

  • 輸入分割區的數目必須等於輸出分割區的數目。 Blob 儲存體輸出可支援資料分割,並繼承上游查詢的資料分割配置。 當針對 Blob 儲存體指定資料分割索引鍵時,資料會依每個輸入分割區進行分割,因此結果仍然是完全平行。 以下是允許完全平行作業的分割區值範例:

    • 八個事件中樞輸入分割區和八個事件中樞輸出分割區
    • 八個事件中樞輸入分割區和 Blob 儲存體輸出
    • 八個事件中樞輸入分割區和 Blob 儲存體輸出,依含有任意基數的自訂欄位進行資料分割
    • 八個 Blob 儲存體輸入分割區和 Blob 儲存體輸出
    • 八個 Blob 儲存體輸入分割區和八個事件中樞輸出分割區

以下幾節討論一些窘迫平行的範例情節。

簡單查詢

  • 輸入:具有八個分割區的事件中樞
  • 輸出:具有八個分割區的事件中樞 ([分割區索引鍵資料行] 必須設定為使用 PartitionId)

查詢:

    --Using compatibility level 1.2 or above
    SELECT TollBoothId
    FROM Input1
    WHERE TollBoothId > 100
    
    --Using compatibility level 1.0 or 1.1
    SELECT TollBoothId
    FROM Input1 PARTITION BY PartitionId
    WHERE TollBoothId > 100

此查詢是簡單的篩選。 因此,我們不需要擔心將傳送到事件中樞的輸入分割。 請注意,相容性層級低於 1.2 的作業必須包含 PARTITION BY PartitionId 子句,使其能滿足稍早所述的需求 #2。 對於輸出,我們必須將作業中的事件中樞輸出設定為讓資料分割索引鍵設為 PartitionId。 最後一項檢查是確保輸入分割區數目等於輸出分割區數目。

利用群組索引鍵的查詢

  • 輸入:具有八個分割區的事件中樞
  • 輸出:Blob 儲存體

查詢:

    --Using compatibility level 1.2 or above
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    
    --Using compatibility level 1.0 or 1.1
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

此查詢含有群組索引鍵。 因此,分組在一起的事件必須傳送至相同的事件中樞分割區。 在此範例中,我們是依 TollBoothID 分組,因此,我們應該確保在將事件傳送至事件中樞時,將 TollBoothID 當作分割區索引鍵使用。 然後在串流分析中,您可以使用 PARTITION BY PartitionId 從此資料分割配置繼承,並啟用完整的平行化作業。 因為輸出是 blob 儲存體,所以我們不需要擔心設定分割區索引鍵值,以滿足需求 #4。

非*易於平行的案例範例

在上一節中,本文涵蓋一些易於平行的平行案例。 在本節中,我們要討論的案例不符合成為易於平行的所有需求。

不相符的分割區計數

  • 輸入:具有八個分割區的事件中樞
  • 輸出:具有 32 個分割區的事件中樞

如果輸入分割區計數不符合輸出分割區計數,則不論查詢為何,拓撲都不會是窘迫平行。 不過,我們仍然可以取得某種程度的平行化。

使用非資料分割的輸出查詢

  • 輸入:具有八個分割區的事件中樞
  • 輸出:Power BI

Power BI 輸出目前不支援資料分割。 因此,此案例不是窘迫平行。

具有不同 PARTITION BY 值的多重步驟查詢

  • 輸入:具有八個分割區的事件中樞
  • 輸出:具有八個分割區的事件中樞
  • 相容性層級:1.0 或 1.1

查詢:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId, PartitionId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1 Partition By TollBoothId
    GROUP BY TumblingWindow(minute, 3), TollBoothId

如您所見,第二個步驟把[TollBoothId] 當做資料分割索引鍵來使用。 此步驟與第一個步驟不同,因此需要進行變換。

具有不同 PARTITION BY 值的多重步驟查詢

  • 輸入:具有八個分割區的事件中樞 (未設定 [分割區索引鍵資料行],預設為 [PartitionId])
  • 輸出:具有八個分割區的事件中樞 ([分割區索引鍵資料行] 必須設定為使用 [TollBoothId])
  • 相容性層級 - 1.2 或更高版本

查詢:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

相容性層級 1.2 或更高版本預設會啟用平行查詢執行。 例如,只要將 [TollBoothId] 資料行設為輸入分割區索引鍵,系統便會分割上一節中的查詢。 不需要 PARTITION BY PartitionId 子句。

計算工作的最大串流處理單元

資料流分析工作可使用的串流處理單元總數,取決於為工作定義之查詢中的步驟數目,和每個步驟的資料分割數目。

查詢中的步驟

一個查詢可以有一或多個步驟。 每個步驟都是使用 WITH 關鍵字定義的子查詢。 在 WITH 關鍵字外的查詢 (只有一個查詢) 也視為一個步驟,例如下列查詢中的 SELECT 陳述式:

查詢:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )
    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute,3), TollBoothId

此查詢有兩個步驟。

注意

本文稍後會詳細討論此查詢。

分割步驟

要分割步驟必須符合下列條件:

  • 必須分割輸入來源。
  • 查詢的 SELECT 陳述式必須讀取某個已分割的輸入來源。
  • 步驟內的查詢必須有 PARTITION BY 關鍵字。

分割查詢時,將會在個別的分割區群組中處理和彙總輸入事件,然後為每個群組產生輸出事件。 如果想要合併的彙總,您必須建立第二個非分割步驟來彙總。

計算工作的最大串流處理單元

所有非分割步驟合起來,可將串流分析作業調整為最多一個串流單元 (SU V2)。 此外,您可以在資料分割步驟中,為每個分割區新增一個 SU V2。 您可以在下表中看到一些範例

查詢 作業的最多 SU
  • 查詢包含一個步驟。
  • 此步驟未進行資料分割。
1 個 SU V2
  • 輸入資料流會分割為 16 個。
  • 查詢包含一個步驟。
  • 步驟進行分割。
16 個 SU V2 (1 * 16 個分割區)
  • 查詢包含兩個步驟。
  • 兩個步驟都不會分割。
1 個 SU V2
  • 輸入資料流分割時會除以 3。
  • 查詢包含兩個步驟。 輸入步驟已進行資料分割,第二個步驟則否。
  • SELECT 陳述式會讀取分割的輸入。
4 個 SU V2 (3 個用於分割步驟 + 1 用於非分割步驟

調整的範例

下列查詢會計算三分鐘內通過收費站 (有三個收費亭) 的車流量。 此查詢可以調整為最多一個 SU V2。

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

若要讓查詢使用更多 SU,輸入資料流和查詢都必須分割。 因為資料流分割區設為 3,以下修改的查詢可以調整為最多 3 個 SU V2:

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

分割查詢時將會處理輸入事件並將其彙總在個別的資料分割群組中。 也會為每個群組產生輸出事件。 GROUP BY 欄位不是輸入資料流中的分割區索引鍵時,資料分割可能會導致某些非預期的結果。 例如,上一個查詢中的 TollBoothId 欄位不是 Input1 的分割區索引鍵。 結果是收費亭 #1 的資料可能分散在多個分割區。

串流分析會個別處理每個 Input1 分割區。 因此,在相同的輪轉視窗中,相同收費亭的汽車計數會建立多筆記錄。 如果無法變更輸入分割區索引鍵,可藉由新增非分割步驟彙總資料分割的值來解決此問題,如下列範例所示:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

此查詢可以調整為 4 個 SU V2。

注意

如果您要聯結兩個資料流,請確定以您用來建立聯結的資料行的分割區索引鍵來分割資料流。 也請確定兩個資料流中的分割區數目相同。

大規模達到更高的輸送量

窘迫平行作業雖為必要,但並不足以大規模地維持較高的輸送量。 每個儲存體系統和其對應的串流分析輸出,在達成最佳寫入輸送量的方法上皆有所不同。 如同任何大規模的案例,可以透過使用正確的設定來解決一些挑戰。 本節討論幾個常見輸出的設定,並提供範例來維持每秒 1 K、5 K 與 10 K 個事件的擷取速率。

下列觀察會使用具有無狀態 (傳遞) 查詢的串流分析作業,以及一個寫入至事件中樞、Azure SQL 或 Azure Cosmos DB 的基本 JavaScript UDF。

事件中樞

擷取速率 (每秒事件數) 串流處理單位 輸出資源
1 K 1/3 2 個 TU
5 K 1 6 個 TU
10 K 2 10 個 TU

事件中樞解決方案會針對串流單位 (SU) 和輸送量進行線性調整,這使其成為從串流分析中分析和串流資料的最有效率且最高效能的方法。 工作可以擴大到 66 個 SU V2,其大約具有最多 400 MB/s 的處理效能,或每天處理 38 兆個事件。

Azure SQL

擷取速率 (每秒事件數) 串流處理單位 輸出資源
1 K 2/3 S3
5 K 3 P4
10 K 6 P6

Azure SQL 支援平行寫入 (稱為繼承資料分割),但預設不會予以啟用。 不過,啟用繼承資料分割加上完全平行查詢,可能並不足以達成更高的輸送量。 SQL 寫入輸送量大幅取決於您的資料庫設定和資料表結構描述。 SQL 輸出效能一文具有可將寫入輸送量最大化之參數的詳細資料。 如 串流分析輸出至 Azure SQL Database 一文中所述,此解決方案在超過 8 個分割區之後,並不會以完全平行管線的形式進行線性調整,且可能需要在 SQL 輸出之前重新分割 (請參閱 INTO)。 需要進階 SKU 以維持高 IO 速率,以及由每隔幾分鐘便會發生的記錄備份所帶來的額外負荷。

Azure Cosmos DB

擷取速率 (每秒事件數) 串流處理單位 輸出資源
1 K 2/3 20 K RU
5 K 4 60 K RU
10 K 8 120 K RU

來自串流分析的 Azure Cosmos DB 輸出已經更新為使用相容性層級 1.2 底下的原生整合。 與相容性層級 1.1 (其為新作業的預設合規性層級) 相比,1.2 能提供明顯更高的輸送量並減少 RU 耗用量。 該解決方案會使用在 /deviceId 上進行資料分割的 Azure Cosmos DB 容器,而其餘的解決方案也會以相同方式設定。

所有大規模串流 Azure 範例都會使用事件中樞作為輸入,而事件中樞是由負載模擬測試用戶端所餵入。 每個輸入事件都是 1 KB 的 JSON 文件,其可以輕鬆地將所設定的擷取速率轉譯為輸送量速率 (1 MB/s、5 MB/s 及 10 MB/s)。 事件會模擬 IoT 裝置針對最多 1000 個裝置傳送下列 JSON 資料 (以縮短的形式):

{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 51.3068118685458,
        "moreData22": 45.34076957651598
    },
    "value": 49.02278128887753,
    "deviceId": "contoso://device-id-1554",
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}

注意

由於用於解決方案中的各種元件,設定可能會有所變更。 如需更精確的評估,請自訂範例以符合您的案例。

找出瓶頸

使用您 Azure 串流分析作業中的 [計量] 窗格來找出管線中的瓶頸。 檢閱 [輸入/輸出事件] 來了解輸送量,並檢閱「浮水印延遲」 \(英文\) 或 [待處理的事件] 來查看作業是否能與輸入速率保持一致。 針對事件中樞計量,請尋找 [節流的要求],並據以調整閾值單位。 針對 Azure Cosmos DB 計量,檢閱 [輸送量] 底下的 [每個分割區索引鍵範圍每秒取用的 RU 上限],以確保系統會一致地取用您的分割區索引鍵範圍。 針對 Azure SQL DB,請監視 [記錄 IO] 和 [CPU]

取得協助

如需進一步的協助,請嘗試 Azure 串流分析的 Microsoft 問與答頁面

下一步