Azure IoT Data Processor Preview パイプラインから Azure Data Explorer にデータを送信する

重要

Azure Arc によって有効にされる Azure IoT Operations Preview は、 現在プレビュー段階です。 運用環境ではこのプレビュー ソフトウェアを使わないでください。

ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用される法律条項については、「Microsoft Azure プレビューの追加使用条件」を参照してください。

Azure Data Explorer の変換先を使用して、Azure IoT Data Processor Preview パイプラインから Azure Data Explorer のテーブルにデータを書き込みます。 変換先ステージは、メッセージを Azure Data Explorer に送信する前にバッチ処理します。

前提条件

Azure Data Explorer の変換先パイプライン ステージを構成して使用するには、次のものが必要です。

Azure Data Explorer を設定する

データ パイプラインから Azure Data Explorer に書き込む前に、パイプラインからデータベースへのアクセス権を付与する必要があります。 サービス プリンシパルまたはマネージド ID を使用して、データベースに対するパイプラインを認証できます。 マネージド ID を使用する利点は、サービス プリンシパルのライフサイクルを管理する必要がないことです。 マネージド ID は Azure によって自動的に管理され、割り当てられているリソースのライフサイクルに関連付けられます。

クライアント シークレットを持つサービス プリンシパルを作成するには、次のようにします。

  1. サービス プリンシパルを作成するには、次の Azure CLI コマンドを使います。

    az ad sp create-for-rbac --name <YOUR_SP_NAME> 
    
  2. このコマンドの出力には、appIddisplayNamepasswordtenant が含まれます。 Microsoft Fabric などのクラウド リソースへのアクセスの構成、シークレットの作成、パイプラインの変換先の構成を行うときに使うので、これらの値を記録しておきます。

    {
        "appId": "<app-id>",
        "displayName": "<name>",
        "password": "<client-secret>",
        "tenant": "<tenant-id>"
    }
    

Azure Data Explorer データベースへの管理者アクセス権を付与するには、データベース クエリ タブで次のコマンドを実行します。

.add database <DatabaseName> admins (<ApplicationId>) <Notes>

変換先ステージが Azure Data Explorer に接続するようにするには、認証の詳細を含むシークレットにアクセスする必要があります。 シークレットを作成するには:

  1. 次のコマンドを使って、サービス プリンシパルの作成時に記録したクライアント シークレットを含むシークレットを Azure Key Vault に追加します。

    az keyvault secret set --vault-name <your-key-vault-name> --name AccessADXSecret --value <client-secret>
    
  2. Azure IoT Operations Preview デプロイのシークレットの管理に関するページの手順に従って、Kubernetes クラスターへのシークレット参照を追加します。

バッチ処理

データ プロセッサは、Azure Data Explorer にバッチで書き込みます。 データ プロセッサでデータを送信する前にバッチ処理する場合、Azure Data Explorer には、独自の既定のインジェスト バッチ処理ポリシーがあります。 そのため、データ プロセッサが Azure Data Explorer の変換先にデータを書き込んだ直後に、Azure Data Explorer にデータが表示されない場合があります。

パイプラインから送信されたらすぐにデータを Azure Data Explorer で表示するために、インジェスト バッチ処理ポリシーの数を 1 に設定できます。 インジェスト バッチ処理ポリシーを編集するには、データベース クエリ タブで次のコマンドを実行します。

.alter database <your-database-name> policy ingestionbatching
```
{
    "MaximumBatchingTimeSpan" : "00:00:30",
    "MaximumNumberOfItems" : 1,
    "MaximumRawDataSizeMB": 1024
}
```

変換先ステージを構成する

Azure Data Explorer 変換先ステージ JSON 構成で、ステージの詳細を定義します。 ステージを作成するには、フォーム ベースの UI を使用するか、[詳細設定] タブで次の JSON 構成を指定します。

フィールド タイプ 内容 必要 Default
[表示名] String データ プロセッサ UI に表示する名前。 はい - Azure IoT MQ output
説明 String ステージの実行内容を示すわかりやすい説明。 いいえ Write to topic default/topic1
クラスター URL String URI (この値はデータ インジェスト URI ではありません)。 はい -
データベース String データベース名。 はい -
テーブル String 書き込むテーブルの名前。 はい -
Batch Batch データをバッチ処理する方法。 いいえ 60s 10s
再試行 再試行 使用する再試行ポリシー。 いいえ default fixed
認証1 String Azure Data Explorer に接続する認証の詳細。 Service principal または Managed identity サービス プリンシパル はい -
列 > 名前 string 列の名前。 はい temperature
列 > パス Path 列の値を読み取るデータの各レコード内の場所。 いいえ .{{name}} .temperature

1認証: 現在、変換先ステージでは、Azure Data Explorer に接続するときにサービス プリンシパル ベースの認証またはマネージド ID がサポートされています。

サービス プリンシパル ベースの認証を構成するには、次の値を指定します。 サービス プリンシパルを作成し、クラスターにシークレット参照を追加したときに、これらの値をメモしました。

フィールド Description 必須
TenantId テナント ID。 はい
ClientId データベースへのアクセス権を持つサービス プリンシパルを作成するときに記録したアプリ ID。 はい
Secret クラスターで作成したシークレット参照。 はい

サンプル構成

次の JSON 例は、データベースの quickstart テーブルにメッセージ全体を書き込む Azure Data Explorer 変換先ステージの完全な構成を示しています。

{
    "displayName": "Azure data explorer - 71c308",
    "type": "output/dataexplorer@v1",
    "viewOptions": {
        "position": {
            "x": 0,
            "y": 784
        }
    },
    "clusterUrl": "https://clusterurl.region.kusto.windows.net",
    "database": "databaseName",
    "table": "quickstart",
    "authentication": {
        "type": "servicePrincipal",
        "tenantId": "tenantId",
        "clientId": "clientId",
        "clientSecret": "secretReference"
    },
    "batch": {
        "time": "5s",
        "path": ".payload"
    },
    "columns": [
        {
            "name": "Timestamp",
            "path": ".Timestamp"
        },
        {
            "name": "AssetName",
            "path": ".assetName"
        },
        {
            "name": "Customer",
            "path": ".Customer"
        },
        {
            "name": "Batch",
            "path": ".Batch"
        },
        {
            "name": "CurrentTemperature",
            "path": ".CurrentTemperature"
        },
        {
            "name": "LastKnownTemperature",
            "path": ".LastKnownTemperature"
        },
        {
            "name": "Pressure",
            "path": ".Pressure"
        },
        {
            "name": "IsSpare",
            "path": ".IsSpare"
        }
    ],
    "retry": {
        "type": "fixed",
        "interval": "20s",
        "maxRetries": 4
    }
}

この構成は次のことを定義します。

  • メッセージは 5 秒間でバッチ処理されます。
  • バッチ パス .payload を使用して、列のデータを検索します。

次の例は、Azure Data Explorer 変換先ステージへの入力メッセージのサンプルを示しています。

{
  "payload": {
    "Batch": 102,
    "CurrentTemperature": 7109,
    "Customer": "Contoso",
    "Equipment": "Boiler",
    "IsSpare": true,
    "LastKnownTemperature": 7109,
    "Location": "Seattle",
    "Pressure": 7109,
    "Timestamp": "2023-08-10T00:54:58.6572007Z",
    "assetName": "oven"
  },
  "qos": 0,
  "systemProperties": {
    "partitionId": 0,
    "partitionKey": "quickstart",
    "timestamp": "2023-11-06T23:42:51.004Z"
  },
  "topic": "quickstart"
}