Azure Cosmos DB for PostgreSQL で Azure Stream Analytics を使用してデータを取り込む方法

適用対象: Azure Cosmos DB for PostgreSQL (PostgreSQL の Citus データベース拡張機能を利用)

Azure Stream Analytics は、デバイス、センサー、Web サイトからの大量の高速ストリーミング データを同時に処理するように設計された、リアルタイム分析およびイベント処理エンジンです。 Azure IoT Edge ランタイムでも使用でき、IoT デバイスでのデータ処理が可能になります。

Azure Cosmos DB for PostgreSQL を使用した Stream Analytics アーキテクチャを示す図。

Azure Cosmos DB for PostgreSQL は、IoT などのリアルタイム ワークロードで性能を発揮します。 これらのワークロードの場合、Stream Analytics は、Azure Event Hubs、Azure IoT Hub、Azure Blob Storage から Azure Cosmos DB for PostgreSQL にデータを前処理してストリームする代わりに、コードなしのパフォーマンスが高くスケーラブルな代替手段として機能します。

Stream Analytics を設定する手順

注意

この記事では、データソースの例として Azure IoT Hub を使用しますが、この手法は Stream Analytics でサポートされている他のソースにも適用できます。 また、次のデモ データは、Azure IoT デバイス テレメトリ シミュレーターから取得されます。 この記事では、シミュレーターの設定については説明しません。

  1. Azure portal で、左上にあるポータル メニューを展開し、[リソースの作成] を選択します。

  2. 結果の一覧で、 [Analytics]>[Stream Analytics ジョブ] の順に選択します。

  3. [新しい Stream Analytics ジョブ] ページに、次の情報を入力します。

    • [サブスクリプション] - このジョブに使用する Azure サブスクリプションを選択します。
    • リソース グループ - ご利用の IoT ハブと同じリソース グループを選択します。
    • 名前 - Stream Analytics ジョブを特定するための名前を入力します。
    • リージョン - Stream Analytics ジョブをホストする Azure リージョンを選択します。 パフォーマンスを向上させ、データ転送コストを削減するために、ユーザーに最も近い地理的な場所を使用します。
    • ホスト環境 - Azure クラウドにデプロイする場合は [クラウド]、IoT Edge デバイスにデプロイする場合は [Edge] を選択します。
    • ストリーミング ユニット - ジョブを実行するために必要なコンピューティング リソースのストリーミング ユニットの数を選択します。
  4. [確認と作成] を選択し、次に [作成] を選択します。 右上に [デプロイは進行中です] という通知が表示されるはずです。

    Stream Analytics ジョブの作成フォームを示すスクリーンショット。

  5. ジョブの入力を構成します。

    Stream Analytics でのジョブ入力の構成を示すスクリーンショット。

    1. リソースのデプロイが完了したら、Stream Analytics ジョブに移動します。 [入力]>[ストリーム入力の追加]>[IoT Hub] の順に選択します。

    2. [IoT Hub] ページで以下の値を入力します。

      • 入力エイリアス - ジョブ入力を特定する名前を入力します。
      • サブスクリプション - ご利用の IoT Hub アカウントがある Azure サブスクリプションを選択します。
      • IoT Hub – IoT ハブの名前を選択します。
    3. [保存] を選択します。

    4. 入力ストリームが追加されたら、フローするデータセットを確認またはダウンロードすることもできます。 次のコードは、イベント例のデータを示しています。

      {
         "deviceId": "sim000001",
         "time": "2022-04-25T13:49:11.6892185Z",
         "counter": 1,
         "EventProcessedUtcTime": "2022-04-25T13:49:41.4791613Z",
         "PartitionId": 3,
         "EventEnqueuedUtcTime": "2022-04-25T13:49:12.1820000Z",
         "IoTHub": {
           "MessageId": null,
           "CorrelationId": "990407b8-4332-4cb6-a8f4-d47f304397d8",
           "ConnectionDeviceId": "sim000001",
           "ConnectionDeviceGenerationId": "637842405470327268",
           "EnqueuedTime": "2022-04-25T13:49:11.7060000Z"
         }
      }
      
  6. ジョブの出力を構成します。

    1. Stream Analytics ジョブ ページで、[出力]>[追加]>[PostgreSQL データベース (プレビュー)] の順に選択します。

      PostgreSQL データベース出力の選択を示すスクリーンショット。

    2. [Azure PostgreSQL] ページに次の値を入力します。

      • 出力エイリアス - ジョブの出力を特定する名前を入力します。
      • [PostgreSQL データベース設定を手動で指定する] を選択し、[サーバーの完全修飾ドメイン名][データベース][テーブル][ユーザー名][パスワード] を入力します。 データセット例から、テーブル device_data を使用します。
    3. [保存] を選択します。

    Azure Stream Analytics でジョブ出力を構成する。

  7. 変換クエリを定義します。

    Azure Stream Analytics の変換クエリ。

    1. Stream Analytics ジョブ ページで、左側のメニューの [クエリ] を選択します。

    2. このチュートリアルでは、IoT Hub から Azure Cosmos DB for PostgreSQL に代替イベントのみを取り込み、全体的なデータ サイズを小さくします。 次のクエリをコピーし、クエリ ペインに貼り付けます。

      select
         counter,
         iothub.connectiondeviceid,
         iothub.correlationid,
         iothub.connectiondevicegenerationid,
         iothub.enqueuedtime
      from
         [src-iot-hub]
      where counter%2 = 0;
      
    3. [クエリの保存] を選択します。

      注意

      このクエリを使用して、データをサンプリングするだけでなく、データ ストリームから目的の属性を抽出します。 Stream Analytics を使用したカスタム クエリ オプションは、データがデータベースに取り込まれる前にデータを前処理または変換する場合に役立ちます。

  8. Stream Analytics ジョブを開始して出力を確認します。

    1. ジョブ概要ページに戻り、[開始] を選択します。

    2. [ジョブの開始] ページで、[ジョブ出力の開始時刻][今すぐ] を選んでから、[開始] を選択します。

    3. 最初にジョブを開始するときにしばらく時間がかかりますが、トリガーされると、データの到着時に引き続き実行されます。 数分後、クラスターにクエリを実行して、読み込まれたデータを確認できます。

      citus=> SELECT * FROM public.device_data LIMIT 10;
      
       counter | connectiondeviceid |            correlationid             | connectiondevicegenerationid |         enqueuedtime
      ---------+--------------------+--------------------------------------+------------------------------+------------------------------
             2 | sim000001          | 7745c600-5663-44bc-a70b-3e249f6fc302 | 637842405470327268           | 2022-05-25T18:24:03.4600000Z
             4 | sim000001          | 389abfde-5bec-445c-a387-18c0ed7af227 | 637842405470327268           | 2022-05-25T18:24:05.4600000Z
             6 | sim000001          | 3932ce3a-4616-470d-967f-903c45f71d0f | 637842405470327268           | 2022-05-25T18:24:07.4600000Z
             8 | sim000001          | 4bd8ecb0-7ee1-4238-b034-4e03cb50f11a | 637842405470327268           | 2022-05-25T18:24:09.4600000Z
            10 | sim000001          | 26cebc68-934e-4e26-80db-e07ade3775c0 | 637842405470327268           | 2022-05-25T18:24:11.4600000Z
            12 | sim000001          | 067af85c-a01c-4da0-b208-e4d31a24a9db | 637842405470327268           | 2022-05-25T18:24:13.4600000Z
            14 | sim000001          | 740e5002-4bb9-4547-8796-9d130f73532d | 637842405470327268           | 2022-05-25T18:24:15.4600000Z
            16 | sim000001          | 343ed04f-0cc0-4189-b04a-68e300637f0e | 637842405470327268           | 2022-05-25T18:24:17.4610000Z
            18 | sim000001          | 54157941-2405-407d-9da6-f142fc8825bb | 637842405470327268           | 2022-05-25T18:24:19.4610000Z
            20 | sim000001          | 219488e5-c48a-4f04-93f6-12c11ed00a30 | 637842405470327268           | 2022-05-25T18:24:21.4610000Z
      (10 rows)
      

注意

Azure Cosmos DB for PostgreSQL の [テスト接続] 機能は現在サポートされていないため、接続が正常に動作した場合でもエラーがスローされる可能性があります。

次のステップ

Azure Cosmos DB for PostgreSQL を使用して、リアルタイム ダッシュボードを作成する方法について学習します。