Von Bedeutung
データ フロー グラフを含む WebAssembly (WASM) はプレビュー段階です。 この機能には制限があり、運用環境のワークロードには適していません。
ベータ版、プレビュー版、またはまだ一般提供としてリリースされていない Azure の機能に適用される法律条項については、「Microsoft Azure プレビューの追加使用条件」を参照してください。
Azure IoT Operations のデータ フロー グラフは、エッジでのカスタム データ処理用 WebAssembly (WASM) モジュールをサポートします。 データ フロー パイプラインの一部として、カスタム ビジネス ロジックとデータ変換をデプロイできます。
ヒント
AI をインバンドで実行しますか? WASM 演算子内で小さな ONNX モデルをパッケージ化して実行するには、 WebAssembly データ フロー グラフで ONNX 推論を実行する方法に関するページを参照してください。
Von Bedeutung
データ フロー グラフは、現在、MQTT、Kafka、OpenTelemetry エンドポイントのみをサポートしています。 Data Lake、Microsoft Fabric OneLake、Azure Data Explorer、Local Storage などの他のエンドポイントの種類はサポートされていません。 詳細については、「既知の問題の」を参照してください。
[前提条件]
- Arc 対応 Kubernetes クラスターに Azure IoT Operations インスタンスをデプロイします。 詳細については、「Deploy Azure IoT Operations」をご覧ください。
- Azure Container Registry (ACR) を使用して、WASM モジュールとグラフを保存します。
- OCI Registry As Storage (ORAS) CLI をインストールして、WASM モジュールをレジストリにプッシュします。
- データ フロー グラフ用 WebAssembly モジュールの開発するに関するガイダンスに従って、カスタム WASM モジュールを開発します。
概要
Azure IoT Operations データ フロー グラフの WebAssembly (WASM) モジュールを使用すると、エッジでハイ パフォーマンスかつ安全にデータを処理できます。 WASM はサンドボックス環境で実行され、Rust と Python をサポートします。
WASM データ フロー グラフのしくみ
WASM データ フローの実装は次のワークフローに従います。
- WASM モジュールを開発する: サポートされている言語でカスタム処理ロジックを記述し、WebAssembly コンポーネント モデル形式にコンパイルします。
- グラフ定義を開発する: YAML 構成ファイルを使用して、モジュール間でのデータの移行方法を定義します。 詳細については、WebAssembly グラフ定義の構成に関する記事を参照してください。
- 成果物をレジストリに保存する: ORAS などの OCI 互換ツールを使用して、コンパイルされた WASM モジュールをコンテナー レジストリにプッシュします。
- レジストリ エンドポイントを構成する: Azure IoT Operations からコンテナー レジストリにアクセスできるように、認証と接続の詳細を設定します。
- データ フローを作成する: データ ソース、成果物名、宛先を定義します。
- デプロイと実行: Azure IoT Operations を使ってレジストリから WASM モジュールをプルし、グラフ定義に基づいて実行します。
例を使用して始める
これらの例は、一般的なシナリオで WASM データ フロー グラフを設定およびデプロイする方法を示しています。 例では、ハードコーディングされた値と簡略化された構成を使用しているため、すぐに始めることができます。
コンテナー レジストリを設定する
Azure IoT Operations では、WASM モジュールとグラフ定義をプルするためのコンテナー レジストリが必要です。 Azure Container Registry (ACR) または他の OCI 互換レジストリを使用できます。
Azure Container Registry を作成して構成するには、Azure Container Registry のデプロイに関する記事を参照してください。
ORAS CLI をインストールする
ORAS CLI を使用して、WASM モジュールとグラフ定義をコンテナー レジストリにプッシュします。 インストール手順については、ORAS のインストールに関する記事を参照してください。
パブリック レジストリからサンプル モジュールをプルする
事前構築済みのサンプル モジュールを使用します。
# Pull sample modules and graphs
oras pull ghcr.io/azure-samples/explore-iot-operations/graph-simple:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/graph-complex:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/temperature:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/window:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/snapshot:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/format:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/humidity:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/collection:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/enrichment:1.0.0
oras pull ghcr.io/azure-samples/explore-iot-operations/filter:1.0.0
モジュールをレジストリにプッシュする
サンプル モジュールとグラフを取得したら、コンテナー レジストリにプッシュします。
<YOUR_ACR_NAME>を Azure Container Registry の名前に置き換えます。
# Log in to your ACR
az acr login --name <YOUR_ACR_NAME>
# Push modules to your registry
oras push <YOUR_ACR_NAME>.azurecr.io/graph-simple:1.0.0 --config /dev/null:application/vnd.microsoft.aio.graph.v1+yaml graph-simple.yaml:application/yaml --disable-path-validation
oras push <YOUR_ACR_NAME>.azurecr.io/graph-complex:1.0.0 --config /dev/null:application/vnd.microsoft.aio.graph.v1+yaml graph-complex.yaml:application/yaml --disable-path-validation
oras push <YOUR_ACR_NAME>.azurecr.io/temperature:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm temperature-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/window:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm window-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/snapshot:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm snapshot-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/format:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm format-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/humidity:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm humidity-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/collection:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm collection-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/enrichment:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm enrichment-1.0.0.wasm:application/wasm
oras push <YOUR_ACR_NAME>.azurecr.io/filter:1.0.0 --artifact-type application/vnd.module.wasm.content.layer.v1+wasm filter-1.0.0.wasm:application/wasm
ヒント
独自のモジュールをプッシュしてカスタム グラフを作成することもできます。「 カスタム データ フロー グラフの構成」を参照してください。
レジストリ エンドポイントを作成する
レジストリ エンドポイントで、コンテナー レジストリへの接続を定義します。 データ フロー グラフでは、レジストリ エンドポイントを使用して、コンテナー レジストリから WASM モジュールとグラフ定義をプルします。 さまざまな認証方法やレジストリの種類を使用してレジストリ エンドポイントを構成する方法の詳細については、レジストリ エンドポイントの構成に関する記事を参照してください。
Azure Container Registry を使用した簡単なセットアップの場合は、システム割り当てマネージド ID 認証を使用してレジストリ エンドポイントを作成します。
現時点では、操作エクスペリエンスでレジストリ エンドポイントを作成することはできません。 Bicep または Kubernetes のマニフェストを使用する必要があります。 レジストリ エンドポイントを作成した後、 コンテナー レジストリにプッシュした グラフは、データ フロー グラフの操作エクスペリエンスで使用する準備が整います。
注
レジストリ エンドポイントを、複数のデータ フロー グラフや、Akri コネクタなどの他の Azure IoT Operations コンポーネント間で再利用できます。
拡張機能名を取得する
# Get extension name
az k8s-extension list \
--resource-group <RESOURCE_GROUP> \
--cluster-name <CLUSTER_NAME> \
--cluster-type connectedClusters \
--query "[?extensionType=='microsoft.iotoperations'].name" \
--output tsv
1 つ目のコマンドを実行すると、拡張機能名 (たとえば、azure-iot-operations-4gh3y) が返されます。
マネージド ID のアクセス許可を構成する
Azure IoT Operations がコンテナー レジストリから WASM モジュールをプルできるようにするには、マネージド ID に適切なアクセス許可を付与します。 IoT Operations 拡張機能には、Azure Container Registry の AcrPull ロールを必要とするシステム割り当てマネージド ID が使用されます。 次の前提条件を満たしていることを確認します。
- Azure Container Registry に対する所有者アクセス許可。
- コンテナー レジストリは別のリソース グループまたはサブスクリプションに存在できますが、IoT Operations のデプロイと同じテナントに存在する必要があります。
次のコマンドを実行して、AcrPull ロールを IoT Operations マネージド ID に割り当てます。
# Get the IoT Operations extension managed identity
export EXTENSION_OBJ_ID=$(az k8s-extension list --cluster-name $CLUSTER_NAME -g $RESOURCE_GROUP --cluster-type connectedClusters --query "[?extensionType=='microsoft.iotoperations'].identity.principalId" -o tsv)
# Get the application ID for the managed identity
export SYSTEM_ASSIGNED_MAN_ID=$(az ad sp show --id $EXTENSION_OBJ_ID --query "appId" -o tsv)
# Assign the AcrPull role to the managed identity
az role assignment create --role "AcrPull" --assignee $SYSTEM_ASSIGNED_MAN_ID --scope "/subscriptions/$SUBSCRIPTION_ID/resourceGroups/$RESOURCE_GROUP/providers/Microsoft.ContainerRegistry/registries/$ACR_NAME"
コンテナー レジストリ ロールの詳細については、Azure Container Registry のロールとアクセス許可に関する記事を参照してください。
Azure CLI で認証エラーが発生した場合は、Azure portal でアクセス許可を割り当てます。
- Azure portal で Azure Container Registry に移動します。
- メニューから [アクセス制御 (IAM)] を選択します。
- [追加>][ロール割り当ての追加] の順に選択します。
- AcrPull 組み込みロールを選択します。
- アクセスの割り当て先オプションとして [ユーザー、グループ、またはサービス プリンシパル] を選択します。
- IoT Operations 拡張機能名 (たとえば、
azure-iot-operations-4gh3y) を検索して選択します。 - [ 保存] を 選択してロールの割り当てを完了します。
詳細な手順については、「Azure portal を使用して Azure ロールを割り当てる」を参照してください。
例 1: 1 つの WASM モジュールを使用する基本的なデプロイ
この例では、WASM モジュールを使用して温度データを華氏から摂氏に変換します。
温度モジュールのソース コードは GitHub で入手できます。 コンテナー レジストリにプッシュしたプリコンパイル済みバージョン graph-simple:1.0.0 を使用します。
動作方法
グラフ定義で、単純な 3 ステージのパイプラインを作成します。
- ソース: MQTT から温度データを受け取ります
- マップ: 温度 WASM モジュールを使用してデータを処理します
- シンク: 変換されたデータを MQTT に送り返します
単純なグラフ定義のしくみと構造の詳細については、「例 1: 単純なグラフ定義」を参照してください。
入力形式:
{"temperature": {"value": 100.0, "unit": "F"}}
出力形式:
{"temperature": {"value": 37.8, "unit": "C"}}
次の構成では、この温度変換パイプラインを使用するデータ フロー グラフを作成します。 このグラフは、YAML 定義を含み、コンテナー レジストリから温度モジュールをプルする graph-simple:1.0.0 成果物を参照します。
データ フロー グラフを構成する
この構成では、温度変換ワークフローを実装する 3 つのノードを定義します。入力温度データをサブスクライブするソース ノード、WASM モジュールを実行するグラフ処理ノード、変換された結果を発行する宛先ノードです。
データ フロー グラフ リソースは、グラフ定義成果物を "ラップ" し、その抽象的なソース/シンク操作を具体的なエンドポイントに接続します。
- グラフ定義の
source操作は、データ フローのソース ノード (MQTT トピック) に接続します - グラフ定義の
sink操作は、データ フローの宛先ノード (MQTT トピック) に接続します - グラフ定義の処理操作はグラフ処理ノード内で実行されます
この分離により、処理ロジックを変更せずに、この分離により、処理ロジックを変更せずに、同じグラフ定義を異なる環境で異なるエンドポイントと共にデプロイできます。
操作エクスペリエンスでデータ フロー グラフを作成するには、[データ フロー] タブに移動します。
[+ 作成] の横にあるドロップダウン メニューを選択し、[データ フロー グラフの作成] を選択します
プレースホルダー名 new-data-flow を選択して、データ フローのプロパティを設定します。 データ フロー グラフの名前を入力し、使用するデータ フロー プロファイルを選択します。
データ フロー図で、[ ソース ] を選択してソース ノードを構成します。 [ ソースの詳細] で、[ 資産 ] または [ データ フロー エンドポイント] を選択します。
[資産] を選択した場合は、データをプルする資産を選択し、[適用] をクリックします。
[データ フロー エンドポイント] を選択した場合は、次の詳細を入力し、[適用] をクリックします。
Setting Description データ フロー エンドポイント 既定の MQTT メッセージ ブローカー エンドポイントを使用するには、"既定値" を選択します。 トピック 着信メッセージをサブスクライブするためのトピック フィルター。 [トピック]>[行の追加] を使用して、複数のトピックを追加します。 メッセージ スキーマ 受信メッセージの逆シリアル化に使用するスキーマ。
データ フロー ダイアグラムで、[ グラフ変換の追加 ] (省略可能) を選択して、グラフ処理ノードを追加します。 [グラフの選択] ウィンドウで、グラフの単純な 1 を選択し、[適用] をクリックします。
Von Bedeutung
この例では、
graph-simple:1.0.0した成果物を使用します。 独自の WASM モジュールを開発し、コンテナー レジストリにプッシュすることで、カスタム グラフを作成できます。 コンテナー レジストリにプッシュするグラフは、[ グラフの選択 ] ウィンドウで使用できます。ダイアグラム内のグラフ ノードを選択することで、グラフ演算子の設定を構成できます。 たとえば、モジュール温度/マップ演算子を選択し、
key2値example-value-2入力できます。 [ 適用 ] をクリックして変更を保存します。
データ フローダイアグラムで、[ 宛先 ] を選択して宛先ノードを構成します。
データ フロー グラフ名の下にある [保存] を選択して、データ フロー グラフを保存します。
データ フローをテストする
データ フローをテストするには、クラスター内から MQTT メッセージを送信します。 まず、「MQTT クライアントを使用して MQTT ブローカーへの接続をテストする」の手順に従って、MQTT クライアント ポッドをデプロイします。 ブローカーへの接続に必要な認証トークンと証明書は、この MQTT クライアントから提供されます。 MQTT クライアントをデプロイするには、次のコマンドを実行します。
kubectl apply -f https://raw.githubusercontent.com/Azure-Samples/explore-iot-operations/main/samples/quickstarts/mqtt-client.yaml
温度メッセージを送信する
最初のターミナル セッションで、華氏で温度データを送信するスクリプトを作成して実行します。
# Connect to the MQTT client pod
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
# Create and run temperature.sh from within the MQTT client pod
while true; do
# Generate a random temperature value between 0 and 6000 Fahrenheit
random_value=$(shuf -i 0-6000 -n 1)
payload="{\"temperature\":{\"value\":$random_value,\"unit\":\"F\"}}"
echo "Publishing temperature: $payload"
# Publish to the input topic
mosquitto_pub -h aio-broker -p 18883 \
-m "$payload" \
-t "sensor/temperature/raw" \
-d \
--cafile /var/run/certs/ca.crt \
-D PUBLISH user-property __ts $(date +%s)000:0:df \
-D CONNECT authentication-method 'K8S-SAT' \
-D CONNECT authentication-data $(cat /var/run/secrets/tokens/broker-sat)
sleep 1
done'
注
MQTT ユーザー プロパティ __ts は、ハイブリッド論理クロック (HLC) を使用してメッセージがタイムリーに処理されるように、メッセージにタイムスタンプを追加するために使用されます。 タイムスタンプがあると、データ フローがメッセージを受け入れるかドロップするかを決定するのに役立ちます。 このプロパティの形式は <timestamp>:<counter>:<nodeid> です。 これにより、データ フロー処理の精度が向上しますが、必須ではありません。
このスクリプトを実行すると、ランダムな温度データが 1 秒ごとに sensor/temperature/raw トピックに発行されます。 次のようになります。
Publishing temperature: {"temperature":{"value":1234,"unit":"F"}}
Publishing temperature: {"temperature":{"value":5678,"unit":"F"}}
温度データの発行を継続するため、スクリプトは実行したままにしておきます。
プロセス済みのメッセージをサブスクライブする
2 つ目のターミナル セッション (MQTT クライアント ポッドにも接続されています) で、出力トピックをサブスクライブして、変換された温度値を確認します。
# Connect to the MQTT client pod
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
mosquitto_sub -h aio-broker -p 18883 -t "sensor/temperature/processed" --cafile /var/run/certs/ca.crt \
-D CONNECT authentication-method "K8S-SAT" \
-D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)"'
WASM モジュールによって華氏から摂氏に変換された温度データが表示されます。
{"temperature":{"value":1292.2222222222222,"count":0,"max":0.0,"min":0.0,"average":0.0,"last":0.0,"unit":"C","overtemp":false}}
{"temperature":{"value":203.33333333333334,"count":0,"max":0.0,"min":0.0,"average":0.0,"last":0.0,"unit":"C","overtemp":false}}
例 2: 複合グラフをデプロイする
この例では、温度、湿度、画像データなど複数のデータ型を処理する高度なデータ処理ワークフローを示します。 複雑なグラフ定義では、複数の WASM モジュールを調整して、高度な分析と物体検出を実行します。
動作方法
この複雑なグラフでは、3 つのデータ ストリームを処理し、それらを組み合わせてエンリッチされたセンサー分析を生成します。
- 温度処理: 華氏を摂氏に変換し、無効な読み取り値をフィルター処理し、統計値を計算します
- 湿度処理: 一定間隔で湿度の測定値を蓄積します
- 画像処理: カメラのスナップショットで物体検出を実行し、結果を書式設定します
複雑なグラフ定義のしくみ、その構造、複数の処理ステージを通じたデータ フローの詳細については、「例 2: 複雑なグラフ定義」を参照してください。
このグラフでは、Rust の例の特殊なモジュールを使用しています。
複雑なデータ フロー グラフを構成する
この構成は、graph-complex:1.0.0 成果物を使用してマルチセンサー処理ワークフローを実装します。 データ フロー グラフのデプロイが例 1 と似ていることに注目してください。処理ロジックは異なりますが、どちらも同じ 3 つのノード パターン (ソース、グラフ プロセッサ、宛先) を使用しています。
この類似性は、データ フロー グラフ リソースがグラフ定義を読み込んで実行するホスト環境として機能しているためです。 実際の処理ロジックはグラフ定義成果物 (graph-simple:1.0.0 と graph-complex:1.0.0) にあり、WASM モジュール間の操作と接続は YAML 仕様で記述されています。 データ フロー グラフ リソースは、成果物をプルし、モジュールのインスタンスを作成し、定義されたワークフローを通じてデータをルーティングするためのランタイム インフラストラクチャを提供します。
操作エクスペリエンスでデータ フロー グラフを作成するには、[データ フロー] タブに移動します。
[+ 作成] の横にあるドロップダウン メニューを選択し、[データ フロー グラフの作成] を選択します
プレースホルダー名 new-data-flow を選択して、データ フローのプロパティを設定します。 データ フロー グラフの名前を入力し、使用するデータ フロー プロファイルを選択します。
データ フロー図で、[ ソース ] を選択してソース ノードを構成します。 [ ソースの詳細] で、[ 資産 ] または [ データ フロー エンドポイント] を選択します。
[資産] を選択した場合は、データをプルする資産を選択し、[適用] をクリックします。
[データ フロー エンドポイント] を選択した場合は、次の詳細を入力し、[適用] をクリックします。
Setting Description データ フロー エンドポイント 既定の MQTT メッセージ ブローカー エンドポイントを使用するには、"既定値" を選択します。 トピック 着信メッセージをサブスクライブするためのトピック フィルター。 [トピック]>[行の追加] を使用して、複数のトピックを追加します。 メッセージ スキーマ 受信メッセージの逆シリアル化に使用するスキーマ。
データ フロー ダイアグラムで、[ グラフ変換の追加 ] (省略可能) を選択して、グラフ処理ノードを追加します。 [グラフの選択] ウィンドウで、graph-complex:1 を選択し、[適用] をクリックします。
Von Bedeutung
この例では、
graph-complex:1.0.0した成果物を使用します。 独自の WASM モジュールを開発し、コンテナー レジストリにプッシュすることで、カスタム グラフを作成できます。 コンテナー レジストリにプッシュするグラフは、[ グラフの選択 ] ウィンドウで使用できます。ダイアグラム内のグラフ ノードを選択することで、グラフ演算子の設定を構成できます。
Operator Description module-snapshot/branch 画像に対してオブジェクト検出を実行するように snapshotモジュールを構成します。snapshot_topic構成キーを設定して、画像データの入力トピックを指定できます。モジュール-温度/マップ key2温度値を別のスケールに変換します。[ 適用 ] をクリックして変更を保存します。
データ フローダイアグラムで、[ 宛先 ] を選択して宛先ノードを構成します。
データ フロー グラフ名の下にある [保存] を選択して、データ フロー グラフを保存します。
複雑なデータ フローをテストする
出力を確認する前に、ソース データを設定する必要があります。
RAW 画像ファイルを mqtt-client ポッドにアップロードする
画像ファイルは、snapshot モジュールが画像内のオブジェクトを検出するためのものです。 これらは GitHub の images フォルダーにあります。
まず、リポジトリをクローンして画像ファイルにアクセスします。
git clone https://github.com/Azure-Samples/explore-iot-operations.git
cd explore-iot-operations
RAW 画像ファイルを ./samples/wasm/images フォルダーから mqtt-client ポッドにアップロードするには、次のコマンドを使用します。
kubectl cp ./samples/wasm/images azure-iot-operations/mqtt-client:/tmp
ファイルがアップロードされていることを確認します。
kubectl exec -it mqtt-client -n azure-iot-operations -- ls /tmp/images
/tmp/images フォルダー内のファイルの一覧が表示されます。
beaker.raw laptop.raw sunny2.raw
binoculars.raw lawnmower.raw sunny4.raw
broom.raw milkcan.raw thimble.raw
camera.raw photocopier.raw tripod.raw
computer_mouse.raw radiator.raw typewriter.raw
daisy3.raw screwdriver.raw vacuum_cleaner.raw
digital_clock.raw sewing_machine.raw
hammer.raw sliding_door.raw
シミュレーションされた温度、湿度データを発行し、画像を送信する
温度、湿度データの発行、画像の送信を行うコマンドを 1 つのスクリプトにまとめることができます。 次のコマンドを使用します。
# Connect to the MQTT client pod and run the script
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
while true; do
# Generate a random temperature value between 0 and 6000
temp_value=$(shuf -i 0-6000 -n 1)
temp_payload="{\"temperature\":{\"value\":$temp_value,\"unit\":\"F\"}}"
echo "Publishing temperature: $temp_payload"
mosquitto_pub -h aio-broker -p 18883 \
-m "$temp_payload" \
-t "sensor/temperature/raw" \
--cafile /var/run/certs/ca.crt \
-D CONNECT authentication-method "K8S-SAT" \
-D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)" \
-D PUBLISH user-property __ts $(date +%s)000:0:df
# Generate a random humidity value between 30 and 90
humidity_value=$(shuf -i 30-90 -n 1)
humidity_payload="{\"humidity\":{\"value\":$humidity_value}}"
echo "Publishing humidity: $humidity_payload"
mosquitto_pub -h aio-broker -p 18883 \
-m "$humidity_payload" \
-t "sensor/humidity/raw" \
--cafile /var/run/certs/ca.crt \
-D CONNECT authentication-method "K8S-SAT" \
-D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)" \
-D PUBLISH user-property __ts $(date +%s)000:0:df
# Send an image every 2 seconds
if [ $(( $(date +%s) % 2 )) -eq 0 ]; then
file=$(ls /tmp/images/*.raw | shuf -n 1)
echo "Sending file: $file"
mosquitto_pub -h aio-broker -p 18883 \
-f $file \
-t "sensor/images/raw" \
--cafile /var/run/certs/ca.crt \
-D CONNECT authentication-method "K8S-SAT" \
-D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)" \
-D PUBLISH user-property __ts $(date +%s)000:0:df
fi
# Wait for 1 second before the next iteration
sleep 1
done'
出力をチェックする
新しいターミナルで、出力トピックをサブスクライブします。
kubectl exec --stdin --tty mqtt-client -n azure-iot-operations -- sh -c '
mosquitto_sub -h aio-broker -p 18883 -t "analytics/sensor/processed" --cafile /var/run/certs/ca.crt \
-D CONNECT authentication-method "K8S-SAT" \
-D CONNECT authentication-data "$(cat /var/run/secrets/tokens/broker-sat)"'
出力は次のようになります
{"temperature":[{"count":9,"max":2984.4444444444443,"min":248.33333333333337,"average":1849.6296296296296,"last":2612.222222222222,"unit":"C","overtemp":true}],"humidity":[{"count":10,"max":76.0,"min":30.0,"average":49.7,"last":38.0}],"object":[{"result":"milk can; broom; screwdriver; binoculars, field glasses, opera glasses; toy terrier"}]}
{"temperature":[{"count":10,"max":2490.5555555555557,"min":430.55555555555554,"average":1442.6666666666667,"last":1270.5555555555557,"unit":"C","overtemp":true}],"humidity":[{"count":9,"max":87.0,"min":34.0,"average":57.666666666666664,"last":42.0}],"object":[{"result":"broom; Saint Bernard, St Bernard; radiator"}]}
この出力には、温度と湿度のデータと、画像内で検出されたオブジェクトが含まれます。
カスタム WASM モジュールを開発する
データ フロー グラフのカスタム データ処理ロジックを作成するには、Rust または Python で WebAssembly モジュールを開発します。 カスタム モジュールを使用すると、組み込みの演算子では使用できない特殊なビジネス ロジック、データ変換、分析を実装できます。
以下のような包括的な開発ガイダンスが必要な場合:
- 開発環境を設定する
- Rust と Python で演算子を作成する
- データ モデルとインターフェイスを理解する
- モジュールのビルドとテスト
データ フロー グラフ用 WebAssembly モジュールの開発に関する記事を参照してください。
データ処理ワークフローを定義する YAML グラフ定義の作成と構成の詳細については、WebAssembly グラフ定義の構成に関する記事を参照してください。
カスタム データ フロー グラフの構成
このセクションでは、WASM モジュールを使用してデータ フロー グラフを構成する方法について詳しく説明します。 すべての構成オプション、データ フロー エンドポイント、詳細設定について説明します。
データ フロー グラフの概要
データ フロー グラフでは、処理のために WebAssembly モジュールをデータがどのように流れるかを定義します。 各グラフは次の要素で構成されます。
- グラフの有効/無効を制御するモード
- スケーリングとリソース設定を定義するデータ フロー プロファイルにリンクするプロファイル参照
- 必要に応じてグラフ状態の永続ストレージを有効にするディスク永続化
- ソース、処理、宛先コンポーネントを定義するノード
- ノード間のデータ フローを指定するノード接続
モードの構成
mode プロパティにより、データ フロー グラフがアクティブにデータを処理しているかどうかを決定します。 モードは Enabled または Disabled (大文字と小文字の区別はされません) に設定できます。 無効にすると、グラフのデータ処理は停止しますが、構成は保持されます。
データ フロー グラフを作成または編集するときに、[ データ フローのプロパティ ] ウィンドウで [ データ フローを有効にする] を [はい ] に設定して、モードを Enabledに設定できます。 オフのままにすると、モードは Disabledに設定されます。
プロファイル参照
プロファイル参照により、データ フロー グラフをデータ フロー プロファイルに接続し、スケーリング設定、インスタンス数、リソース制限を定義します。 プロファイル参照を指定しない場合は、代わりに Kubernetes 所有者参照を使用する必要があります。 ほとんどのシナリオでは、Azure IoT Operations が提供する既定のプロファイルを使用します。
データ フロー グラフを作成または編集するときに、[データ フローのプロパティ ] ウィンドウで、データ フロー プロファイルを選択します。 既定では、既定のデータ フロー プロファイルが選択されています。 データ フロー プロファイルの詳細については、「データ フロー プロファイルの構成」を参照してください。
Von Bedeutung
データ フロー グラフの作成時にのみ、データ フロー プロファイルを選択できます。 データ フロー グラフの作成後にデータ フロー プロファイルを変更することはできません。 既存のデータ フロー グラフのデータ フロー プロファイルを変更する場合は、元のデータ フロー グラフを削除し、新しいデータ フロー プロファイルを使用して新しいデータ フロー グラフを作成します。
ディスク永続化を要求する
Von Bedeutung
データ フロー グラフのディスク永続化には既知の問題があります。 現在、この機能は想定どおりに機能していません。 詳細については、「既知の問題の」を参照してください。
ディスク永続化を要求すると、データ フロー グラフは再起動後も状態を維持できます。 この機能を有効にすると、接続されたブローカーが再起動した場合にグラフは処理状態を回復できます。 この機能は、中間データの損失が問題となるステートフル処理のシナリオで役立ちます。 ディスク永続化の要求を有効にすると、サブスクライバー キュー内のメッセージなどの MQTT データはブローカーによってディスクに永続化されます。 このアプローチにより、停電時やブローカーの再起動時にデータ フローのデータ ソースでデータ損失が発生しなくなります。 ブローカーは最適なパフォーマンスを維持します。なぜなら、永続化はデータ フロー単位で構成され、永続化を必要とするデータ フローのみがこの機能を使用するからです。
データ フロー グラフは、サブスクリプション中に MQTTv5 ユーザー プロパティを使用してこの永続化要求を行います。 この機能は次の場合にのみ機能します。
- データ フローは MQTT ブローカーをソース (MQTT エンドポイントを持つソース ノード) として使用している
- MQTT ブローカーは、サブスクライバー キューなどのデータ型に対して動的永続化モードが
Enabledに設定され、永続化が有効になっている
この構成により、データ フロー グラフなどの MQTT クライアントは、MQTTv5 ユーザー プロパティを使用してサブスクリプションのディスク永続化を要求できます。 MQTT ブローカーの永続化の構成の詳細については、MQTT ブローカーの永続化の構成に関する記事を参照してください。
設定には Enabled または Disabled を使用できます。既定値は Disabled です。
データ フロー グラフを作成または編集する場合、[ データ フローのプロパティ ] ウィンドウで 、[ データの永続化の要求 ] を [ はい ] にチェックして、要求ディスクの永続化を Enabledに設定できます。 未チェックのままにすると、設定は Disabled。
ノード構成
ノードはデータ フロー グラフの構成要素です。 各ノードはグラフ内で一意の名前を持ち、特定の機能を実行します。 ノードには次の 3 種類があります。
ソース ノード
ソース ノードは、データがグラフに入る場所を定義します。 MQTT ブローカーまたは Kafka トピックからデータを受信するデータ フロー エンドポイントに接続します。 各ソース ノードでは以下を指定する必要があります。
- 構成済みのデータ フロー エンドポイントを指すエンドポイント参照
- サブスクライブする MQTT トピックまたは Kafka トピックの一覧形式のデータ ソース
- スキーマ推論用の Azure デバイス レジストリ資産にリンクする資産参照 (省略可能)
データ ソース配列を使用すると、エンドポイント構成を変更せずに複数のトピックをサブスクライブできます。 この柔軟性により、さまざまなデータ フロー間でエンドポイントを再利用できます。
注
現在、データ フロー グラフのデータ ソースとしてサポートされているのは MQTT エンドポイントと Kafka エンドポイントのみです。 詳細については、「 データ フロー エンドポイントの構成」を参照してください。
データ フロー図で、[ ソース ] を選択してソース ノードを構成します。 [ ソースの詳細] で [ データ フロー エンドポイント] を選択し、[ トピック] フィールドを使用して、受信メッセージをサブスクライブする MQTT トピック フィルターを指定します。 [行の追加] を選択し、新しいトピックを入力することで、複数の MQTT トピックを追加できます。
グラフ処理ノード
グラフ処理ノードには、データを変換する WebAssembly モジュールが含まれています。 これらのノードは、コンテナー レジストリから WASM 成果物をプルし、指定された構成パラメーターを使用して実行します。 各グラフ ノードには以下が必要です。
- 成果物をプルするレジストリ エンドポイントを指すレジストリ エンドポイント参照
- プルするモジュール名とバージョンを定義する成果物の仕様
- WASM モジュールに渡されるキーと値のペア形式の構成パラメーター
構成配列を使用すると、WASM 成果物をリビルドせずにモジュールの動作をカスタマイズできます。 一般的な構成オプションとして、処理パラメーター、しきい値、変換設定、機能フラグなどがあります。
データ フロー ダイアグラムで、[ グラフ変換の追加 ] (省略可能) を選択して、グラフ処理ノードを追加します。 [グラフの選択] ウィンドウで、目的のグラフ成果物 (単純グラフまたは複雑グラフ) を選択し、[適用] をクリックします。 ダイアグラム内のグラフ ノードを選択することで、グラフ演算子の設定を構成できます。
構成のキーと値のペアは、実行時に WASM モジュールに渡されます。 モジュールからこれらの値にアクセスして動作をカスタマイズできます。 このアプローチにより、次のことが可能になります。
- 同じ WASM モジュールを異なる構成でデプロイする
- モジュールをリビルドせずに処理パラメーターを調整する
- デプロイ要件に基づいて機能を有効または無効にする
- しきい値やエンドポイントなどの環境固有の値を設定する
宛先ノード
宛先ノードでは、処理されたデータが送信される場所を定義します。 これらは、MQTT ブローカー、クラウド ストレージ、またはその他のシステムにデータを送信するデータ フロー エンドポイントに接続します。 各宛先ノードでは以下を指定します。
- 構成済みのデータ フロー エンドポイントを指すエンドポイント参照
- 出力データの特定のトピック、パス、または場所としてのデータの出力先
- シリアル化形式とスキーマ検証を定義する出力スキーマ設定 (省略可能)
Azure Data Lake や Fabric OneLake などのストレージの宛先では、出力スキーマ設定を指定して、データのシリアル化と検証の方法を制御できます。
注
現時点では、MQTT、Kafka、OpenTelemetry エンドポイントのみがデータ フロー グラフのデータ変換先としてサポートされています。 詳細については、「 データ フロー エンドポイントの構成」を参照してください。
- データ フロー図で、[ 宛先 ] ノードを選択します。
- [データ フロー エンドポイントの詳細] ドロップダウンから目的の データ フロー エンドポイントを 選択します。
- [続行] を選択して、宛先を構成します。
- データの送信先となるトピックやテーブルなど、宛先に 必要な設定 を入力します。 データ変換先フィールドは、エンドポイントの種類に基づいて自動的に解釈されます。 たとえば、データ フロー エンドポイントが MQTT エンドポイントの場合、宛先の詳細ページでトピックを入力するように求められます。
ノード接続
ノード接続では、ノード間のデータ フロー パスを定義します。 各接続では、ソース ノードと宛先ノードが指定され、処理パイプラインが作成されます。 接続には、処理ステージ間のデータ整合性を確保するために、必要に応じてスキーマ検証を含めることができます。
スキーマ検証を指定すると、システムにより、ノード間を流れるデータの形式と構造が検証されます。 検証によって、データの不整合を早期に検出し、WASM モジュールが想定どおりの形式でデータを受信できるようになります。
グラフ処理ノードを選択すると、操作エクスペリエンスによってノード接続が自動的に作成されます。 グラフの作成後に接続を変更することはできません。
データ フロー エンドポイント
データ フロー グラフは、データ フロー エンドポイントを介して外部システムに接続します。 エンドポイントの種類によって、ソース、宛先、またはその両方として使用できるかがきまります。
MQTT エンドポイント
MQTT エンドポイントは、ソースと宛先の両方として機能できます。 次のような MQTT ブローカーに接続します。
- Azure IoT Operations ローカル MQTT ブローカー (すべてのデータ フローで必須)
- Azure Event Grid MQTT
- カスタム MQTT ブローカー
詳細な構成情報については、MQTT データ フロー エンドポイントの構成に関する記事を参照してください。
Kafka エンドポイント
Kafka エンドポイントは、ソースと宛先の両方として機能できます。 次のような Kafka 互換システムに接続します。
- Azure Event Hubs (Kafka 互換)
- Apache Kafka クラスター
- Confluent Cloud
詳細な構成情報については、Azure Event Hubs と Kafka データ フロー エンドポイントの構成に関する記事を参照してください。
ストレージ エンドポイント
ストレージ エンドポイントは宛先としてのみ機能できます。 長期的なデータ保持と分析のためにクラウド ストレージ システムに接続します。
- Azure Data Lake Storage
- Microsoft Fabric OneLake
- ローカル ストレージ
ストレージ エンドポイントには、通常、データのシリアル化形式を定義するために出力スキーマ設定が必要です。
レジストリ エンドポイント
レジストリ エンドポイントは、WASM モジュールとグラフ定義をプルするコンテナー レジストリへのアクセスを提供します。 データ フローで直接使用されることはありませんが、グラフ処理ノードから参照されます。
詳細な構成情報については、レジストリ エンドポイントの構成に関する記事を参照してください。