Azure Data Explorer Go SDK 使用してデータを取り込む

Azure Data Explorer は、ログと利用統計情報データのための高速で拡張性に優れたデータ探索サービスです。 Azure Data Explorer サービスとやり取りするための Go SDK クライアント ライブラリが提供されます。 Go SDK を使用して、Azure Data Explorer クラスター内のデータを取り込み、制御し、クエリを実行できます。

この記事ではまず、テスト クラスター内にテーブルとデータ マッピングを作成します。 その後、Go SDK を使用してクラスターに対するインジェストをキューに登録し、結果を確認します。

前提条件

Go SDK をインストールする

Go モジュールを使用するサンプル アプリケーションを実行すると、Azure Data Explorer Go SDK が自動的にインストールされます。 別のアプリケーション用の Go SDK をインストールした場合は、次の例のように、Go モジュールを作成し、(go get を使用して) Azure Data Explorer パッケージをフェッチします。

go mod init foo.com/bar
go get github.com/Azure/azure-kusto-go/kusto

パッケージの依存関係は go.mod ファイルに追加されます。 これは Go アプリケーションで使用します。

コードの確認

この「コードの確認」セクションは省略可能です。 コードのしくみに関心がある場合は、次のコード スニペットで確認できます。 それ以外の場合は、「アプリケーションの実行」に進んでください。

認証

何らかの操作を実行する前に、プログラムから Azure Data Explorer サービスに対して認証を行う必要があります。

auth := kusto.Authorization{Config: auth.NewClientCredentialsConfig(clientID, clientSecret, tenantID)}
client, err := kusto.New(kustoEndpoint, auth)

kusto.Authorization のインスタンスは、サービス プリンシパルの資格情報を使用して作成されます。 その後、kusto.Client を作成するために使用されます。その際に、クラスター エンドポイントも受け入れる新しい関数が使用されます。

テーブルの作成

create table コマンドは、Kusto ステートメントによって表されます。Mgmt 関数は、管理コマンドを実行するために使用されます。 これは、コマンドを実行してテーブルを作成するために使用されます。

func createTable(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createTableCommand))
	if err != nil {
		log.Fatal("failed to create table", err)
	}
	log.Printf("Table %s created in DB %s\n", kustoTable, kustoDB)
}

ヒント

セキュリティを強化するため、既定では、Kusto ステートメントは定数となります。 NewStmt で文字列定数が受け入れられます。 UnsafeStmt API で非定数のステートメント セグメントを使用できますが、推奨されません。

Kusto の create table コマンドは、次のようになります。

.create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)

マッピングを作成する

インジェスト中にデータ マッピングが使用され、受信データが Azure Data Explorer テーブル内の列にマップされます。 詳細については、「データ マッピング」を参照してください。 テーブルと同じ方法でマッピングが作成されます。その場合、Mgmt 関数を使用し、データベース名と適切なコマンドを指定します。 完全なコマンドは、サンプルの GitHub リポジトリで入手できます。

func createMapping(kc *kusto.Client, kustoDB string) {
	_, err := kc.Mgmt(context.Background(), kustoDB, kusto.NewStmt(createMappingCommand))
	if err != nil {
		log.Fatal("failed to create mapping - ", err)
	}
	log.Printf("Mapping %s created\n", kustoMappingRefName)
}

データの取り込み

インジェストは、既存の Azure Blob Storage コンテナーのファイルを使用してキューに登録されます。

func ingestFile(kc *kusto.Client, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable string) {
	kIngest, err := ingest.New(kc, kustoDB, kustoTable)
	if err != nil {
		log.Fatal("failed to create ingestion client", err)
	}
	blobStorePath := fmt.Sprintf(blobStorePathFormat, blobStoreAccountName, blobStoreContainer, blobStoreFileName, blobStoreToken)
	err = kIngest.FromFile(context.Background(), blobStorePath, ingest.FileFormat(ingest.CSV), ingest.IngestionMappingRef(kustoMappingRefName, ingest.CSV))

	if err != nil {
		log.Fatal("failed to ingest file", err)
	}
	log.Println("Ingested file from -", blobStorePath)
}

インジェスト クライアントは、ingest.New を使用して作成されます。 FromFile 関数は、Azure Blob Storage URI を参照するために使用されます。 マッピング参照名とデータ型は FileOption の形式で渡されます。

アプリケーションの実行

  1. GitHub から次のサンプル コードをクローンします。

    git clone https://github.com/Azure-Samples/Azure-Data-Explorer-Go-SDK-example-to-ingest-data.git
    cd Azure-Data-Explorer-Go-SDK-example-to-ingest-data
    
  2. main.go のこのスニペットに示されているサンプル コードを実行します。

    func main {
        ...
        dropTable(kc, kustoDB)
        createTable(kc, kustoDB)
        createMapping(kc, kustoDB)
        ingestFile(kc, blobStoreAccountName, blobStoreContainer, blobStoreToken, blobStoreFileName, kustoMappingRefName, kustoDB, kustoTable)
        ...
    }
    

    ヒント

    操作のさまざまな組み合わせを試すために、main.go 内の各関数をコメント解除したり、コメント化したりすることができます。

    サンプル コードを実行すると、次のアクションが行われます。

    1. テーブルを削除する: StormEvents テーブルが削除されます (存在する場合)。
    2. テーブルの作成: StormEvents テーブルが作成されます。
    3. マッピングの作成: StormEvents_CSV_Mapping マッピングが作成されます。
    4. ファイルのインジェスト: (Azure Blob Storage 内の) CSV ファイルがインジェスト用にキューに登録されます。
  3. 認証用のサービス プリンシパルを作成するには、Azure CLI を使用して az ad sp create-for-rbac コマンドを指定します。 プログラムによって使用される環境変数の形式で、クラスター エンドポイントとデータベース名を指定してサービス プリンシパル情報を設定します。

    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export AZURE_SP_TENANT_ID="<replace with tenant>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  4. 以下のプログラムを実行します。

    go run main.go
    

    次のような出力が表示されます。

    Connected to Azure Data Explorer
    Using database - testkustodb
    Failed to drop StormEvents table. Maybe it does not exist?
    Table StormEvents created in DB testkustodb
    Mapping StormEvents_CSV_Mapping created
    Ingested file from - https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv
    

確認してトラブルシューティングを行う

キューに登録されたインジェストでインジェスト プロセスがスケジュールされ、Azure Data Explorer にデータが読み込まれるまで、5 分から 10 分待ちます。

  1. https://dataexplorer.azure.com にサインインして、クラスターに接続します。 その後、次のコマンドを実行して、StormEvents テーブル内のレコードの数を取得します。

    StormEvents | count
    
  2. データベースで次のコマンドを実行し、過去 4 時間以内にインジェスト エラーがあったかどうかを調べます。 実行する前にデータベース名を置き換えてください。

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  3. 次のコマンドを実行し、過去 4 時間以内のすべてのインジェスト操作の状態を表示します。 実行する前にデータベース名を置き換えてください。

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

リソースをクリーンアップする

他の記事に進む場合は、作成したリソースをそのままにします。 そのようにしない場合は、データベースで次のコマンドを実行し、StormEvents テーブルを削除します。

.drop table StormEvents

次のステップ