Azure Data Explorer の Node ライブラリを使用してデータを取り込む

Azure Data Explorer は、ログと利用統計情報データのための高速で拡張性に優れたデータ探索サービスです。 Azure Data Explorer では、Node 用のクライアント ライブラリとして、取り込みライブラリデータ ライブラリの 2 つが用意されています。 これらのライブラリを使用すると、クラスターにデータを取り込み (読み込み)、コードからデータのクエリを行うことができます。 この記事ではまず、テスト クラスター内にテーブルとデータ マッピングを作成します。 その後、クラスターに対するインジェストをキューに入れて、結果を検証します。

Azure サブスクリプションをお持ちでない場合は、開始する前に無料の Azure アカウントを作成してください。

前提条件

  • Microsoft アカウントまたはMicrosoft Entraユーザー ID。 Azure サブスクリプションは不要です。
  • Azure Data Explorer クラスターとデータベース。 クラスターとデータベースを作成します
  • 開発用コンピューターに Node.js がインストール済み

データ ライブラリと取り込みライブラリをインストールする

azure-kusto-ingestazure-kusto-data をインストールします。

npm i azure-kusto-ingest@^3.3.2 azure-kusto-data@^3.3.2

import ステートメントおよび定数を追加する

ライブラリからクラスをインポートします。


const { Client: KustoClient, KustoConnectionStringBuilder } =  require('azure-kusto-data');
const {
    IngestClient: KustoIngestClient,
    IngestionProperties,
    IngestionDescriptors,
    DataFormat,
    IngestionMappingKind,
} =  require("azure-kusto-ingest");

アプリケーションを認証するために、Azure Data ExplorerはMicrosoft Entraテナント ID を使用します。 テナント ID を検索するには、「Microsoft 365 のテナント ID の検索」に従ってください。

このコードを実行する前に、authorityIdkustoUrikustoIngestUrikustoDatabase の値を設定します。

const cluster = "MyCluster";
const region = "westus";
const authorityId = "microsoft.com";
const kustoUri = `https://${cluster}.${region}.kusto.windows.net`;
const kustoIngestUri = `https://ingest-${cluster}.${region}.kusto.windows.net`;
const kustoDatabase  = "Weather";

では、接続文字列を作成します。 この例では、デバイス認証を使用してクラスターにアクセスします。 コンソール出力を確認して認証を完了します。 Microsoft Entraアプリケーション証明書、アプリケーション キー、ユーザーとパスワードを使用することもできます。

ターゲット テーブルとマッピングは後のステップで作成します。

const kcsbIngest = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoIngestUri, authorityId);
const kcsbData = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoUri, authorityId);
const destTable = "StormEvents";
const destTableMapping = "StormEvents_CSV_Mapping";

ソース ファイルの情報を設定する

さらに多くのクラスをインポートし、データソースファイルの定数を設定します。 この例では、Azure Blob Storage でホストされているサンプル ファイルを使います。 StormEvents サンプル データセットには、国立環境情報センターの気象関連データが含まれています。

const container = "samplefiles";
const account = "kustosamples";
const sas = "";  // If relevant add SAS token
const filePath = "StormEvents.csv";
const blobPath = `https://${account}.blob.core.windows.net/${container}/${filePath}${sas}`;

テスト クラスターにテーブルを作成する

StormEvents.csv ファイル内のデータのスキーマと一致するテーブルを作成します。 このコードを実行すると、次のようなメッセージが返されます: "サインインするには、Web ブラウザーを使用してページ https://microsoft.com/devicelogin を開き、コード XXXXXXXXX を入力して認証します"。 この手順に従ってサインインし、元のページに戻って次のコード ブロックを実行します。 接続を行う後続のコード ブロックでは、再びサインインする必要があります。

const kustoClient = new KustoClient(kcsbData);
const createTableCommand = `.create table ${destTable} (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)`;

const createTableResults = await kustoClient.executeMgmt(kustoDatabase, createTableCommand);
console.log(createTableResults.primaryResults[0].toJSON().data);

インジェストのマッピングを定義する

受信した CSV データを、テーブル作成時に使用される列名とデータ型にマップします。

const createMappingCommand = `.create table ${destTable} ingestion csv mapping '${destTableMapping}' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'`;

const mappingCommandResults = await kustoClient.executeMgmt(kustoDatabase, createMappingCommand);
console.log(mappingCommandResults.primaryResults[0].toJSON().data);

取り込みのためにメッセージをキューに入れる

BLOB ストレージからデータをプルし、そのデータを Azure Data Explorer に取り込むために、メッセージをキューに入れます。

const defaultProps  = new IngestionProperties({
    database: kustoDatabase,
    table: destTable,
    format: DataFormat.CSV,
    ingestionMappingReference: destTableMapping,
    ingestionMappingKind: IngestionMappingKind.CSV,
    additionalProperties: {ignoreFirstRecord: true},
});

const ingestClient = new KustoIngestClient(kcsbIngest, defaultProps);
// All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties

const blobDesc = new BlobDescriptor(blobPath, 10);
try {
	const ingestionResult = await ingestClient.ingestFromBlob(blobDesc, null);
} catch (err) {
	// Handle errors
}

テーブルにデータが格納されていることを確認する

データがテーブルに取り込まれたことを確認します。 キューに入れられたインジェストが取り込みをスケジュールされて、Azure Data Explorer にデータが読み込まれるまで、5 ~ 10 分待ちます。 その後、次のコードを実行して、StormEvents テーブル内のレコードの数を取得します。

const query = `${destTable} | count`;

var tableResults = await kustoClient.execute(kustoDatabase, query);
console.log(tableResults.primaryResults[0].toJSON().data);

トラブルシューティングのクエリを実行する

https://dataexplorer.azure.com にサインインして、クラスターに接続します。 データベースで次のコマンドを実行し、過去 4 時間以内にインジェスト エラーがあったかどうかを調べます。 実行する前にデータベース名を置き換えてください。

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

次のコマンドを実行し、過去 4 時間以内のすべてのインジェスト操作の状態を表示します。 実行する前にデータベース名を置き換えてください。

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

リソースをクリーンアップする

他の記事に進む場合は、作成したリソースをそのままにします。 行わない場合は、データベースで次のコマンドを実行して、StormEvents テーブルをクリーンアップします。

.drop table StormEvents