Azure Stream Analytics でクエリの並列処理を使用する

この記事では、Azure Stream Analytics で並列処理を活用する方法を示します。 入力パーティションの構成と分析クエリ定義のチューニングによって Stream Analytics ジョブをスケールする方法について説明します。

前提条件として、「ストリーミングユニットの理解と調整」で説明されているストリーミングユニットの概念について理解しておく必要があります。

Stream Analytics ジョブの構成について教えてください。

Stream Analytics のジョブ定義には少なくとも、入力、クエリ、出力が含まれています。 入力は、ジョブがデータ ストリームを読み取る場所です。 クエリは、データ入力ストリームを変換するために使用されます。出力は、ジョブ結果の送信先です。

入力と出力のパーティション

パーティション分割すると、パーティション キーに基づいてデータをサブセットに分割できます。 入力 (たとえば Event Hubs) がキーによってパーティション分割されている場合、Stream Analytics ジョブに入力を追加するとき、パーティション キーを指定することをお勧めします。 Stream Analytics ジョブのスケーリングでは、入力と出力でパーティションを利用します。 Stream Analytics ジョブでは、さまざまなパーティションを同時に使用し、書き込みを実行できるので、スループットが向上します。

入力

すべての Azure Stream Analytics ストリーミング入力において、パーティション分割 (Event Hubs、IoT Hub、Blob Storage、Data Lake Storage Gen2) を利用できます。

注意

互換性レベル 1.2 以降では、パーティション キーを 入力プロパティとして設定します。クエリに PARTITION BY キーワードは必要ありません。 互換性レベル 1.1 以下の場合は、 クエリで PARTITION BY キーワードを使用してパーティション キーを定義します。

出力

Stream Analytics を使用する場合は、次の出力でパーティション分割を利用します。

  • Azure Data Lake Storage
  • Azure Functions
  • Azure テーブル
  • BLOB ストレージ (パーティション キーを明示的に設定)
  • Azure Cosmos DB (パーティション キーを明示的に設定)
  • Event Hubs (パーティション キーを明示的に設定)
  • IoT Hub (パーティション キーを明示的に設定)
  • Service Bus
  • オプションのパーティション分割を使用した SQL および Azure Synapse Analytics: 詳細については、Azure SQL Database への出力に関するページを参照してください。

Power BI では、パーティション分割がサポートされていません。 ただし、 このセクションの説明に従って、入力をパーティション分割することはできます。

パーティションの詳細については、次の記事をご覧ください。

クエリ

ジョブを並列にするために、すべての入力、すべてのクエリ ロジック ステップ、およびすべての出力の間でパーティション キーを調整する必要があります。 クエリ ロジックのパーティション分割は、結合と集約のために使われるキー (GROUP BY) によって決まります。 クエリ ロジックにキーが設定されていない場合 (プロジェクション、フィルター、参照結合...)、最後の要件は無視できます。

  • 入力と出力がWarehouseIdでパーティション分割され、クエリグループがProductIdWarehouseIdされていない場合、ジョブは並列ではありません。
  • 結合する 2 つの入力が異なるパーティション キー (WarehouseIdProductId) によってパーティション分割されている場合、ジョブは並列ではありません。
  • 1 つのジョブに 2 つ以上の独立したデータ フローが含まれている場合、それぞれが独自のパーティション キーを持つ場合、ジョブは並列ではありません。

ジョブは、すべての入力、出力、およびクエリ ステップで同じキーが使用されている場合にのみ並列です。

驚異的並列ジョブ

驚異的並列ジョブは、Azure Stream Analytics において最もスケーラブルなシナリオです。 入力の 1 つのパーティションを、出力の 1 つのパーティションに対するクエリの 1 つのインスタンスに接続します。 この並列処理には次の要件があります。

  • クエリ ロジックが同じクエリ インスタンスによって処理される同じキーに依存する場合、イベントが入力の同じパーティションに送信されるようにする必要があります。 Event Hubs または IoT Hub の場合、イベント データに PartitionKey 値が設定されている必要があります。 代わりに、パーティション分割された送信元を使用することもできます。 Blob Storage の場合、イベントが同じパーティション フォルダーに送信されることを意味します。 たとえば、クエリ インスタンスで userID 別にデータを集計するとき、パーティション キーとして userID を使用し、入力イベント ハブがパーティション分割されます。 ただし、クエリ ロジックが、同一のクエリ インスタンスによって処理されるために同一のキーを必要としない場合は、この要件を無視してもかまいません。 このロジックの例として、単純な select-project-filter クエリがあります。

  • 次に、クエリをパーティション分割します。 互換性レベル 1.2 以上のジョブ (推奨) の場合は、入力設定でパーティション キーとしてカスタム列を指定し、ジョブは自動的に並列です。 互換性レベル 1.0 または 1.1 のジョブの場合は、クエリのすべての手順で PARTITION BY PartitionId を使用します。 複数の手順を使用できますが、すべて同じキーでパーティション分割する必要があります。

  • Stream Analytics でサポートされている出力のほとんどでは、パーティション分割が活用されます。 パーティション分割をサポートしていない種類の出力を使用する場合、ジョブは、驚異的並列になりません。 Event Hubs の出力については、パーティション キー列がクエリで使用されているパーティション キーと同じに設定されていることを確認してください。 詳細については、「出力」セクションを参照してください。

  • 入力パーティションの数が出力パーティションの数と同じである必要があります。 Blob Storage 出力では、パーティションをサポートでき、アップストリーム クエリのパーティション構成を継承します。 BLOB ストレージのパーティション キーを指定すると、データは入力パーティションごとにパーティション分割されるため、結果はまだ完全に並列になります。 完全な並列ジョブを可能にするパーティション値の例を次に示します。

    • 8 個の event hub 入力パーティションと 8 個の event hub 出力パーティション
    • 8 個の event hub 入力パーティションと BLOB ストレージ出力
    • 8 個の event hub 入力パーティションと任意のカーディナリティを持つカスタム フィールドによってパーティション分割された blob storage 出力
    • 8つのBlobストレージ入力パーティションとBlobストレージ出力
    • 8 個の blob storage 入力パーティションと 8 個のイベント ハブ出力パーティション

以下のセクションでは、並列化しやすいシナリオの例について説明します。

単純なクエリ

  • 入力: 8 個のパーティションを持つイベント ハブ
  • 出力: 8 個のパーティションを持つイベント ハブ ("パーティション キー列" は PartitionId を使うように設定する必要があります)

クエリ:

    --Using compatibility level 1.2 or above
    SELECT TollBoothId
    FROM Input1
    WHERE TollBoothId > 100
    
    --Using compatibility level 1.0 or 1.1
    SELECT TollBoothId
    FROM Input1 PARTITION BY PartitionId
    WHERE TollBoothId > 100

このクエリは単純なフィルターです。 そのため、イベント ハブに送信される入力のパーティション分割について心配する必要はありません。 互換性レベルが 1.2 より前のジョブには、上記の要件 2 を満たすために、PARTITION BY PartitionId 句を含める必要があることに注意してください。 出力については、パーティション キーを PartitionId に設定するようにジョブのイベント ハブ出力を構成する必要があります。 最後に、入力パーティションと出力パーティションの数が同じであることを確認します。

グループ化キーが含まれたクエリ

  • 入力: 8 個のパーティションを持つ Event Hub
  • 出力:BLOB ストレージ

クエリ:

    --Using compatibility level 1.2 or above
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    
    --Using compatibility level 1.0 or 1.1
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

このクエリにはグループ化キーが含まれています。 そのため、グループ化されたイベントは、同じ Event Hubs パーティションに送信される必要があります。 この例では TollBoothID でグループ化するので、イベントが Event Hubs に送信されるときに、 TollBoothID がパーティション キーとして使用されていることを確認する必要があります。 その後、Azure Stream Analytics で、PARTITION BY PartitionId を使用して、このパーティション構成から継承し、完全な並列処理を有効にすることができます。 出力は BLOB ストレージであるため、要件 #4 に従ってパーティション キー値の構成について心配する必要はありません。

ばかばかしい並列ではないシナリオの例

この記事の前のセクションでは、驚異的並列のシナリオをいくつか記事で紹介しました。 このセクションでは、容易に並列化可能のすべての要件を満たしていないシナリオについて学びます。

パーティション数の不一致

  • 入力: 8 個のパーティションを持つイベント ハブ
  • 出力: 32 個のパーティションを持つイベント ハブ

入力パーティション数と出力パーティション数が一致しない場合、クエリに関係なく、トポロジは驚異的並列ではありません。 ただし、あるレベルの並列化を引き続き取得できます。

パーティション分割されていない出力を使用したクエリ

  • 入力: 8 個のパーティションを持つイベント ハブ
  • 出力:Power BI

現在、Power BI 出力ではパーティション分割がサポートされていません。 そのため、このシナリオは容易に並列化できるものではありません。

PARTITION BY 値が異なる複数ステップのクエリ

  • 入力: 8 個のパーティションを持つ Event Hub
  • 入力: 8 個のパーティションを持つ Event Hub
  • 互換性レベル:1.0 または 1.1

クエリ:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId, PartitionId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1 Partition By TollBoothId
    GROUP BY TumblingWindow(minute, 3), TollBoothId

ご覧のように、2 番目のステップは TollBoothId をパーティション キーとして使用しています。 この手順は最初の手順と同じではないので、シャッフルが必要です。

PARTITION BY 値が異なる複数ステップのクエリ

  • 入力: 8 つのパーティションを持つ Event Hub ([パーティション キー列] は設定されていません。既定値の [PartitionId] です)
  • 出力: 8 個のパーティションを持つ Event Hub ([パーティション キー列] は [TollBoothId] を使うように設定する必要があります)
  • 互換性レベル - 1.2 以上

クエリ:

    WITH Step1 AS (
    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

互換性レベル 1.2 以上では、既定で並列クエリの実行が可能です。 たとえば、"TollBoothId" 列が入力パーティション キーとして設定されている限り、前のセクションのクエリはパーティション分割されます。 PARTITION BY PartitionId 句は必要ありません。

ジョブの最大ストリーミングユニット数を計算する

Stream Analytics ジョブで使用できるストリーミング ユニットの合計数は、ジョブに定義されているクエリのステップ数と各ステップのパーティション数によって異なります。

クエリでのステップ

1 つのクエリに 1 つ以上のステップを含めることができます。 各ステップは、WITH キーワードで定義されたサブクエリです。 次のクエリの SELECT ステートメントのように、WITH キーワードの外にあるクエリ (1 つのクエリのみ) もステップとしてカウントされます。

クエリ:

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )
    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute,3), TollBoothId

このクエリには 2 つのステップがあります。

注意

このクエリについては、この記事で後ほど詳しく説明します。

ステップをパーティション分割する

ステップをパーティション分割するには、次の条件を満たす必要があります。

  • 入力ソースはパーティションされる必要があります。
  • クエリの SELECT ステートメントは、パーティション分割された入力ソースから読み取る。
  • ステップ内のクエリに PARTITION BY キーワードを含める。

クエリをパーティション分割すると、入力イベントが処理されて個々のパーティション グループに集計され、グループごとに出力イベントが生成されます。 結合された集計を取得する場合は、第二の非パーティション化ステップを作成する必要があります。

ストリーミング ユニットの最大数をジョブに対して計算する

パーティション分割されていないすべてのステップは、Stream Analytics ジョブの 1 つのストリーミング ユニット (SU V2s) にスケールアップされます。 さらに、パーティション分割されたステップでパーティションごとに 1 つの SU V2 を追加できます。 をいくつか次の表に示します。

クエリ ジョブの最大 SU 数
  • クエリに 1 つのステップが含まれている。
  • ステップがパーティションされていない。
1 SU V2
  • 入力データ ストリームは 16 個にパーティション分割されている。
  • クエリに 1 つのステップが含まれている。
  • ステップはパーティション分割されている。
16 SU V2 (1 * 16 パーティション)
  • クエリに 2 つのステップが含まれている。
  • どちらのステップもパーティション分割されていない。
1 SU V2
  • 入力データ ストリームは 3 つにパーティション分割されている。
  • クエリに 2 つのステップが含まれている。 入力ステップはパーティション分割されているが、2 番目のステップはパーティション分割されていない。
  • SELECT ステートメントはパーティション分割された入力から読み取る。
4 SU V2 (パーティション分割されたステップの場合は 3 つ+ 非パーティションステップの場合は 1)

スケーリングの例

次のクエリでは、3 つのブースがある料金所を 3 分間に通過する車の台数を計算します。 このクエリは、最大 1 つの SU V2 にスケーリングできます。

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

クエリにより多くの SU を使用するには、入力データ ストリームとクエリの両方をパーティション分割します。 データ ストリーム パーティションが 3 に設定されているので、変更を加えた次のクエリを最大 3 個の SU V2 にスケールできます。

    SELECT COUNT(*) AS Count, TollBoothId
    FROM Input1 Partition By PartitionId
    GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId

クエリをパーティション分割すると、入力イベントが処理され、個別のパーティション グループに集約されます。 このクエリでは、各グループの出力イベントが生成されます。 GROUP BY フィールドが入力データ ストリームのパーティション キーでない場合、パーティション分割を実行すると予期しない結果を生じることがあります。 たとえば、前のクエリの TollBoothId フィールドは Input1 のパーティション キーではないとします。 その結果、TollBooth #1 のデータを複数のパーティションに分散できます。

Stream Analytics では、 各 Input1 パーティションが個別に処理されます。 その結果、クエリは、同じタンブリング ウィンドウ内の同じ料金所の車数の複数のレコードを作成します。 入力パーティション キーを変更できない場合は、次の例のようにパーティション間で値を集計する非パーティションステップを追加して、この問題を解決します。

    WITH Step1 AS (
        SELECT COUNT(*) AS Count, TollBoothId
        FROM Input1 Partition By PartitionId
        GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
    )

    SELECT SUM(Count) AS Count, TollBoothId
    FROM Step1
    GROUP BY TumblingWindow(minute, 3), TollBoothId

このクエリは 4 SU V2 にスケーリングできます。

注意

2 つのストリームを結合する場合は、結合の作成に使用する列のパーティション キーによってストリームがパーティション分割されていることを確認します。 また、両方のストリームに同じ数のパーティションがあることも確認します。

大規模な高いスループットの実現

大規模な高いスループットを維持するために、驚異的並列ジョブが必要ですが十分ではありません。 すべてのストレージ システムおよびそれに対応する Stream Analytics 出力では、考えられる最高の書き込みスループットを実現する方法がそれぞれ異なっています。 大規模なシナリオと同様に、一部の課題では、解決に適切な構成が必要です。 このセクションでは、一般的な出力をいくつか取り上げてその構成について説明し、1 秒あたり 1,000、5,000、10,000 のイベントのインジェスト率を維持するためのサンプルを提供します。

以下の観測記録は、ステートレス (パススルー) クエリを使用する Stream Analytics ジョブを使って行ったものです。これは、Event Hubs、Azure SQL、または Azure Cosmos DB への書き込みを行う基本的な JavaScript ユーザー定義関数 (UDF) です。

Event Hubs

取り込み率 (1 秒あたりのイベント数) ストリーミング ユニット数 出力リソース
1K 1/3 2 TU
5キロ 1 6 TU
10K 2 10 TU

Event Hubs ソリューションは、ストリーミング ユニット (SU) とスループットの観点から直線的にスケーリングをおこなうため、Stream Analytics からのデータを分析およびストリーム配信するための最も効率的かつパフォーマンスの高い方法となります。 ジョブを最大 66 SU V2 にスケールアップできます。これは、1 日あたり最大 400 MB/秒または 38 兆件のイベントの処理に大まかに変換されます。

Azure SQL

取り込み率 (1 秒あたりのイベント数) ストリーミング ユニット数 出力リソース
1K 2/3 S3
5キロ 3 P4
10K 6 P6

Azure SQL では、パーティション分割の継承と呼ばれる並列書き込みがサポートされていますが、既定では有効になっていません。 ただし、パーティション分割の継承と完全な並列クエリを一緒に有効にしても、高いスループットを実現するには不十分である場合があります。 SQL 書き込みスループットは、データベース構成およびテーブル スキーマに大きく依存します。 SQL 出力パフォーマンスに関する記事に、書き込みスループットを最大限に高めることができるパラメーターの詳細が記載されています。 Azure SQL Database への Azure Stream Analytics 出力に関する記事に記載されているように、このソリューションは 8 個のパーティションを超える完全な並列パイプラインとして直線的にスケールせず、SQL 出力の前に再分割が必要な場合があります (INTO に関する記事を参照してください)。 高い IO 率と、数分おきに発生するログ バックアップからのオーバーヘッドを維持するために、Premium SKU が必要です。

Azure Cosmos DB

取り込み率 (1 秒あたりのイベント数) ストリーミング ユニット数 出力リソース
1K 2/3 20 K RU
5キロ 4 60 K RU
10K 8 120 K RU

Azure Cosmos DB Stream Analytics からの出力は、互換性レベル 1.2 のネイティブ統合を使用するように更新されます。 互換性レベル 1.2 は、新しいジョブの既定の互換性レベルである 1.1 に比べてスループットが著しく高く、RU 消費量が低下しています。 ソリューションは/deviceId 上にパーティション分割された Azure Cosmos DB コンテナーを使用し、ソリューションの残りの部分は全く同じように構成されます。

Azure の大規模なストリーミングのサンプルはすべて、負荷をシミュレートするテスト クライアントによってデータが取り込まれる Event Hubs を入力として使用します。 各入力イベントは 1 KB の JSON ドキュメントであり、構成されたインジェスト率からスループット レート (1 MB/秒、5 MB/秒、および 10 MB/秒) に容易に換算されます。 イベントは、最大 1,000 台のデバイス向けに以下の JSON データ (短縮された形式) を送信する IoT デバイスをシミュレートします。

{
    "eventId": "b81d241f-5187-40b0-ab2a-940faf9757c0",
    "complexData": {
        "moreData0": 51.3068118685458,
        "moreData22": 45.34076957651598
    },
    "value": 49.02278128887753,
    "deviceId": "contoso://device-id-1554",
    "type": "CO2",
    "createdAt": "2019-05-16T17:16:40.000003Z"
}

注意

構成は、ソリューションで使用されるさまざまなコンポーネントによって変更される可能性があります。 見積もりの精度を高めるには、ご使用のシナリオに合わせてサンプルをカスタマイズしてください。

ボトルネックの特定

Azure Stream Analytics ジョブの [メトリックス] ウィンドウを使用して、パイプラインのボトルネックを特定します。 スループットについての [Input/Output Events](入出力イベント) および [透かしの遅延] または [Backlogged Events](バックログされたイベント) を確認して、ジョブが入力速度に対応しているかどうかを確認します。 Event Hubs のメトリックスについては、スロットルされた要求 を検索し、その結果に基づいてしきい値ユニットを調整します。 Azure Cosmos DB メトリックスについては、スループットの下の [パーティション キーの範囲ごとの使用された最大 RU/秒] を確認して、パーティション キーの範囲が均一に消費されていることを確認します。 Azure SQL DB については、 [ログ IO] および [CPU] を監視します。

助けを得る

詳細については、 Azure Stream Analytics の Microsoft Q&A 質問ページをお試しください。

次のステップ