Azure Cosmos DB for PostgreSQL で Azure Stream Analytics を使用してデータを取り込む方法
適用対象: Azure Cosmos DB for PostgreSQL (PostgreSQL の Citus データベース拡張機能を利用)
Azure Stream Analytics は、デバイス、センサー、Web サイトからの大量の高速ストリーミング データを同時に処理するように設計された、リアルタイム分析およびイベント処理エンジンです。 Azure IoT Edge ランタイムでも使用でき、IoT デバイスでのデータ処理が可能になります。
Azure Cosmos DB for PostgreSQL は、IoT などのリアルタイム ワークロードで性能を発揮します。 これらのワークロードの場合、Stream Analytics は、Azure Event Hubs、Azure IoT Hub、Azure Blob Storage から Azure Cosmos DB for PostgreSQL にデータを前処理してストリームする代わりに、コードなしのパフォーマンスが高くスケーラブルな代替手段として機能します。
Stream Analytics を設定する手順
注意
この記事では、データソースの例として Azure IoT Hub を使用しますが、この手法は Stream Analytics でサポートされている他のソースにも適用できます。 また、次のデモ データは、Azure IoT デバイス テレメトリ シミュレーターから取得されます。 この記事では、シミュレーターの設定については説明しません。
Azure portal で、左上にあるポータル メニューを展開し、[リソースの作成] を選択します。
結果の一覧で、 [Analytics]>[Stream Analytics ジョブ] の順に選択します。
[新しい Stream Analytics ジョブ] ページに、次の情報を入力します。
- [サブスクリプション] - このジョブに使用する Azure サブスクリプションを選択します。
- リソース グループ - ご利用の IoT ハブと同じリソース グループを選択します。
- 名前 - Stream Analytics ジョブを特定するための名前を入力します。
- リージョン - Stream Analytics ジョブをホストする Azure リージョンを選択します。 パフォーマンスを向上させ、データ転送コストを削減するために、ユーザーに最も近い地理的な場所を使用します。
- ホスト環境 - Azure クラウドにデプロイする場合は [クラウド]、IoT Edge デバイスにデプロイする場合は [Edge] を選択します。
- ストリーミング ユニット - ジョブを実行するために必要なコンピューティング リソースのストリーミング ユニットの数を選択します。
[確認と作成] を選択し、次に [作成] を選択します。 右上に [デプロイは進行中です] という通知が表示されるはずです。
ジョブの入力を構成します。
リソースのデプロイが完了したら、Stream Analytics ジョブに移動します。 [入力]>[ストリーム入力の追加]>[IoT Hub] の順に選択します。
[IoT Hub] ページで以下の値を入力します。
- 入力エイリアス - ジョブ入力を特定する名前を入力します。
- サブスクリプション - ご利用の IoT Hub アカウントがある Azure サブスクリプションを選択します。
- IoT Hub – IoT ハブの名前を選択します。
[保存] を選択します。
入力ストリームが追加されたら、フローするデータセットを確認またはダウンロードすることもできます。 次のコードは、イベント例のデータを示しています。
{ "deviceId": "sim000001", "time": "2022-04-25T13:49:11.6892185Z", "counter": 1, "EventProcessedUtcTime": "2022-04-25T13:49:41.4791613Z", "PartitionId": 3, "EventEnqueuedUtcTime": "2022-04-25T13:49:12.1820000Z", "IoTHub": { "MessageId": null, "CorrelationId": "990407b8-4332-4cb6-a8f4-d47f304397d8", "ConnectionDeviceId": "sim000001", "ConnectionDeviceGenerationId": "637842405470327268", "EnqueuedTime": "2022-04-25T13:49:11.7060000Z" } }
ジョブの出力を構成します。
Stream Analytics ジョブ ページで、[出力]>[追加]>[PostgreSQL データベース (プレビュー)] の順に選択します。
[Azure PostgreSQL] ページに次の値を入力します。
- 出力エイリアス - ジョブの出力を特定する名前を入力します。
- [PostgreSQL データベース設定を手動で指定する] を選択し、[サーバーの完全修飾ドメイン名]、[データベース]、[テーブル]、[ユーザー名]、[パスワード] を入力します。 データセット例から、テーブル device_data を使用します。
[保存] を選択します。
変換クエリを定義します。
Stream Analytics ジョブ ページで、左側のメニューの [クエリ] を選択します。
このチュートリアルでは、IoT Hub から Azure Cosmos DB for PostgreSQL に代替イベントのみを取り込み、全体的なデータ サイズを小さくします。 次のクエリをコピーし、クエリ ペインに貼り付けます。
select counter, iothub.connectiondeviceid, iothub.correlationid, iothub.connectiondevicegenerationid, iothub.enqueuedtime from [src-iot-hub] where counter%2 = 0;
[クエリの保存] を選択します。
注意
このクエリを使用して、データをサンプリングするだけでなく、データ ストリームから目的の属性を抽出します。 Stream Analytics を使用したカスタム クエリ オプションは、データがデータベースに取り込まれる前にデータを前処理または変換する場合に役立ちます。
Stream Analytics ジョブを開始して出力を確認します。
ジョブ概要ページに戻り、[開始] を選択します。
[ジョブの開始] ページで、[ジョブ出力の開始時刻] の [今すぐ] を選んでから、[開始] を選択します。
最初にジョブを開始するときにしばらく時間がかかりますが、トリガーされると、データの到着時に引き続き実行されます。 数分後、クラスターにクエリを実行して、読み込まれたデータを確認できます。
citus=> SELECT * FROM public.device_data LIMIT 10; counter | connectiondeviceid | correlationid | connectiondevicegenerationid | enqueuedtime ---------+--------------------+--------------------------------------+------------------------------+------------------------------ 2 | sim000001 | 7745c600-5663-44bc-a70b-3e249f6fc302 | 637842405470327268 | 2022-05-25T18:24:03.4600000Z 4 | sim000001 | 389abfde-5bec-445c-a387-18c0ed7af227 | 637842405470327268 | 2022-05-25T18:24:05.4600000Z 6 | sim000001 | 3932ce3a-4616-470d-967f-903c45f71d0f | 637842405470327268 | 2022-05-25T18:24:07.4600000Z 8 | sim000001 | 4bd8ecb0-7ee1-4238-b034-4e03cb50f11a | 637842405470327268 | 2022-05-25T18:24:09.4600000Z 10 | sim000001 | 26cebc68-934e-4e26-80db-e07ade3775c0 | 637842405470327268 | 2022-05-25T18:24:11.4600000Z 12 | sim000001 | 067af85c-a01c-4da0-b208-e4d31a24a9db | 637842405470327268 | 2022-05-25T18:24:13.4600000Z 14 | sim000001 | 740e5002-4bb9-4547-8796-9d130f73532d | 637842405470327268 | 2022-05-25T18:24:15.4600000Z 16 | sim000001 | 343ed04f-0cc0-4189-b04a-68e300637f0e | 637842405470327268 | 2022-05-25T18:24:17.4610000Z 18 | sim000001 | 54157941-2405-407d-9da6-f142fc8825bb | 637842405470327268 | 2022-05-25T18:24:19.4610000Z 20 | sim000001 | 219488e5-c48a-4f04-93f6-12c11ed00a30 | 637842405470327268 | 2022-05-25T18:24:21.4610000Z (10 rows)
注意
Azure Cosmos DB for PostgreSQL の [テスト接続] 機能は現在サポートされていないため、接続が正常に動作した場合でもエラーがスローされる可能性があります。
次のステップ
Azure Cosmos DB for PostgreSQL を使用して、リアルタイム ダッシュボードを作成する方法について学習します。