Kusto Java SDK を使用してデータを取り込む

Azure Data Explorer は、ログと利用統計情報データのための高速で拡張性に優れたデータ探索サービスです。 Java クライアント ライブラリを使用すると、Azure Data Explorer クラスターでデータの取り込み、管理コマンドの発行、データのクエリを実行できます。

この記事では、Azure Data Explorer の Java ライブラリを使用してデータを取り込む方法について説明します。 まず、テスト クラスター内にテーブルとデータ マッピングを作成します。 その後、Java SDK を使用して Blob Storage からクラスターへのインジェストをキューに登録し、結果を確認します。

前提条件

コードの確認

このセクションは省略可能です。 コードの動作方法については、次のコード スニペットを確認してください。 このセクションをスキップするには、「アプリケーションの実行」に進んでください。

認証

プログラムでは、ConnectionStringBuilder Microsoft Entra認証資格情報を使用します。

  1. クエリと管理のために com.microsoft.azure.kusto.data.Client を作成します。

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. com.microsoft.azure.kusto.ingest.IngestClient を作成して使用し、Azure Data Explorer へのデータ インジェストをキューに入れます。

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

管理コマンド

.createなどの.drop管理コマンドは、 オブジェクトで com.microsoft.azure.kusto.data.Client を呼び出executeすことによって実行されます。

たとえば、StormEvents テーブルは次のように作成されます。

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

データ インジェスト

既存の Azure Blob Storage コンテナーのファイルを使用してインジェストをキューに登録します。

  • BlobSourceInfo を使用して、Blob Storage パスを指定します。
  • IngestionProperties を使用して、テーブル、データベース、マッピング名、およびデータ型を定義します。 次の例では、データ型は CSV です。
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

インジェスト プロセスは別のスレッドで開始され、main スレッドはインジェスト スレッドが完了するまで待機します。 このプロセスでは、CountdownLatch が使用されます。 インジェスト API (IngestClient#ingestFromBlob) は非同期ではありません。 while ループを使用して、現在の状態が 5 秒ごとにポーリングされ、インジェストの状態が Pending から別の状態に変わるまで待機します。 最終的な状態は、SucceededFailed、または PartiallySucceeded です。

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

ヒント

さまざまなアプリケーションで非同期にインジェストを処理するほかの方法があります。 たとえば、CompletableFuture を使用して、テーブルに対してクエリを実行したり、IngestionStatus に報告された例外を処理するなど、インジェスト後のアクションを定義するパイプラインを作成できます。

アプリケーションの実行

全般

サンプル コードを実行すると、次のアクションが行われます。

  1. テーブルを削除する: StormEvents テーブルが削除されます (存在する場合)。
  2. テーブルの作成: StormEvents テーブルが作成されます。
  3. マッピングの作成: StormEvents_CSV_Mapping マッピングが作成されます。
  4. ファイルのインジェスト: (Azure Blob Storage 内の) CSV ファイルがインジェスト用にキューに登録されます。

次のサンプル コードは、App.java からのものです。

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

ヒント

操作のさまざまな組み合わせを試すには、App.java 内の各メソッドをコメント解除したり、コメント化したりします。

アプリケーションの実行

  1. GitHub から次のサンプル コードをクローンします。

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. プログラムで使用される環境変数として、次の情報を使用してサービス プリンシパル情報を設定します。

    • クラスター エンドポイント
    • データベース名
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. ビルドして実行します。

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    次のように出力されます。

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

インジェスト プロセスが完了するまで数分待ちます。 正常に完了すると、Ingestion completed successfully というログ メッセージが表示されます。 この時点でプログラムを終了し、既にキューに登録されているインジェスト プロセスに影響を与えずに次の手順に進むことができます。

検証

キューに登録されたインジェストでインジェスト プロセスがスケジュールされ、Azure Data Explorer にデータが読み込まれるまで、5 分から 10 分待ちます。

  1. https://dataexplorer.azure.com にサインインして、クラスターに接続します。

  2. 次のコマンドを実行して、StormEvents テーブル内のレコードの数を取得します。

    StormEvents | count
    

トラブルシューティング

  1. 過去 4 時間以内のインジェスト エラーを表示するには、データベースで次のコマンドを実行します。

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. 過去 4 時間以内のすべてのインジェスト操作の状態を表示するには、次のコマンドを実行します。

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

リソースをクリーンアップする

作成したリソースを使用する予定がない場合は、データベースで次のコマンドを実行して、StormEvents テーブルを削除します。

.drop table StormEvents