REST API を使用してデータを取り込む方法

Kusto.Ingest ライブラリは、クラスターにデータを取り込む場合に推奨されます。 ただし、Kusto.Ingest パッケージに依存せずに、ほぼ同じ機能を実現することもできます。 この記事では、運用グレードのパイプライン用 にクラスターへのキューインジェスト を使用する方法について説明します。

Note

下のコードは C# で記述されており、サンプル コードを簡略化するために Azure Storage SDK、Microsoft Authentication Library (MSAL)、NewtonSoft.JSON パッケージを使用しています。 必要に応じて、対応するコードを適切な Azure Storage REST API 呼び出し、.NET 以外の MSAL パッケージ、使用可能な任意の JSON 処理パッケージに置き換えることができます。

この記事では、推奨されるインジェストのモードを取り扱います。 Kusto.Ingest ライブラリの場合、その対応するエンティティは IKustoQueuedIngestClient インターフェイスです。 ここでは、クライアント コードは、インジェスト通知メッセージを Azure キューに投稿することで、クラスターと対話します。 これらのメッセージへの参照は、Kusto データ管理サービス (インジェスト サービスともいう) から取得されます。 サービスとの対話は、Microsoft Entra ID で認証する必要があります。

次のコードは、Kusto データ管理サービスで、Kusto.Ingest ライブラリを使用せずにキュー データ インジェストを処理する方法を示しています。 この例は、環境やその他の制限のために、完全な .NET がアクセス不可または使用不可である場合に役立つ可能性があります。

このコードには、Azure Storage クライアントを作成し、BLOB にデータをアップロードするための手順が含まれています。 各手順は、サンプル コードの後で、さらに詳細に説明されています。

  1. インジェスト サービスにアクセスするための認証トークンを取得する
  2. インジェスト サービスにクエリを実行して、次の情報を取得します。
  3. (2) で Kusto から取得されたいずれかの BLOB コンテナー上の BLOB にデータをアップロードする
  4. ターゲットのデータベースとテーブルを識別し、(3) の BLOB を指すインジェスト メッセージを作成する
  5. (4) で構成したインジェスト メッセージを、(2) で取得したインジェスト キューに投稿します
  6. インジェスト中にサービスによって検出されたすべてのエラーを取得する
// A container class for ingestion resources we are going to obtain
internal class IngestionResourcesSnapshot
{
    public IList<string> IngestionQueues { get; set; } = new List<string>();
    public IList<string> TempStorageContainers { get; set; } = new List<string>();

    public string FailureNotificationsQueue { get; set; } = string.Empty;
    public string SuccessNotificationsQueue { get; set; } = string.Empty;
}

public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
    // Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.windows.net
    var dmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.windows.net";
    // 1. Authenticate the interactive user (or application) to access Kusto ingestion service
    var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
    // 2a. Retrieve ingestion resources
    var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
    // 2b. Retrieve Kusto identity token
    var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
    // 3. Upload file to one of the blob containers we got from Azure Data Explorer.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the containers in order to prevent throttling
    var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
    var blobUriWithSas = UploadFileToBlobContainer(
        file, ingestionResources.TempStorageContainers.First(), blobName,
        out var blobSizeBytes
    );
    // 4. Compose ingestion command
    var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
    // 5. Post ingestion command to one of the previously obtained ingestion queues.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the queues in order to prevent throttling
    PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);

    Thread.Sleep(20000);

    // 6a. Read success notifications
    var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
    foreach (var sm in successes)
    {
        Console.WriteLine($"Ingestion completed: {sm}");
    }

    // 6b. Read failure notifications
    var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
    foreach (var em in errors)
    {
        Console.WriteLine($"Ingestion error: {em}");
    }
}

運用グレードのパイプラインでのキューインジェストの使用

Microsoft Entra ID から認証証拠を取得する

ここでは、Microsoft Authentication Library (MSAL) を使用して、Kusto データ管理 サービスにアクセスするためのMicrosoft Entra トークンを取得し、その入力キューを要求します。 MSAL は複数のプラットフォームで使用できます。

// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
    // Create an authentication client for Azure AD:
    var authClient = PublicClientApplicationBuilder.Create("<appId>")
        .WithAuthority("https://login.microsoftonline.com/<appTenant>")
        .WithRedirectUri("<appRedirectUri>")
        .Build();
    // Acquire user token for the interactive user for Azure Data Explorer:
    var result = authClient.AcquireTokenInteractive(
        new[] { $"{resource}/.default" } // Define scopes
    ).ExecuteAsync().Result;
    return result.AccessToken;
}

インジェスト リソースを取得する

データ管理サービスへの HTTP POST 要求を手動で構築して、インジェスト リソースを返すことを要求します。 これらのリソースには、DM サービスがリッスンしているキューや、データをアップロードするための BLOB コンテナーが含まれます。 データ管理サービスでは、これらのキューのいずれかに到達するインジェスト要求が含まれているすべてのメッセージを処理します。

// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get ingestion resources\" }";
    var ingestionResources = new IngestionResourcesSnapshot();
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    // Input queues
    var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
    foreach (var token in tokens)
    {
        ingestionResources.IngestionQueues.Add((string)token[1]);
    }
    // Temp storage containers
    tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
    foreach (var token in tokens)
    {
        ingestionResources.TempStorageContainers.Add((string)token[1]);
    }
    // Failure notifications queue
    var singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.FailureNotificationsQueue = (string)singleToken;
    // Success notifications queue
    singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.SuccessNotificationsQueue = (string)singleToken;
    return ingestionResources;
}

// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
    var request = WebRequest.Create(uriString);
    request.Method = "POST";
    request.ContentType = "application/json";
    request.ContentLength = body.Length;
    request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
    using var bodyStream = request.GetRequestStream();
    using (var sw = new StreamWriter(bodyStream))
    {
        sw.Write(body);
        sw.Flush();
    }
    bodyStream.Close();
    return request.GetResponse();
}

Kusto ID トークンを取得する

インジェスト メッセージは非ダイレクト チャネル (Azure キュー) を介してクラスターに渡されるため、インジェスト サービスにアクセスするためのインバンド承認検証を実行できなくなります。 解決策として、ID トークンをすべてのインジェスト メッセージにアタッチする方法があります。 このトークンにより、インバンド認可検証が可能になります。 この署名付きトークンは、インジェスト メッセージを受信したときにインジェスト サービスによって検証できます。

// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get kusto identity token\" }";
    var jsonPath = "Tables[0].Rows[*].[0]";
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
    return (string)identityToken;
}

Azure BLOB コンテナーにデータをアップロードする

この手順は、インジェストのために渡されるローカル ファイルの Azure BLOB へのアップロードに関するものです。 このコードでは、Azure Storage SDK を使用します。 依存関係を使用できない場合は、Azure Blob Service の REST API で実現できます。

// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
    var blobUri = new Uri(blobContainerUri);
    var blobContainer = new BlobContainerClient(blobUri);
    var blob = blobContainer.GetBlobClient(blobName);
    using (var stream = File.OpenRead(filePath))
    {
        blob.UploadAsync(BinaryData.FromStream(stream));
        blobSize = blob.GetProperties().Value.ContentLength;
    }
    return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}

インジェスト メッセージを作成する

NewtonSoft.JSON パッケージでは、ターゲットのデータベースとテーブルを識別し、BLOB を指すための有効なインジェスト要求をもう一度作成します。 このメッセージは、関連する Kusto データ管理サービスがリッスンしている Azure キューにポストされます。

いくつかの考慮すべき点を次に示します。

  • この要求は、インジェスト メッセージにとって必要最小限です。

Note

ID トークンは必須であり、AdditionalProperties JSON オブジェクトの一部である必要があります。

internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
    var message = new JObject
    {
        { "Id", Guid.NewGuid().ToString() },
        { "BlobPath", dataUri },
        { "RawDataSize", blobSizeBytes },
        { "DatabaseName", db },
        { "TableName", table },
        { "RetainBlobOnSuccess", true }, // Do not delete the blob on success
        { "FlushImmediately", true }, // Do not aggregate
        { "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
        { "ReportMethod", 0 }, // Failures are reported to an Azure Queue
        {
            "AdditionalProperties", new JObject(
                new JProperty("authorizationContext", identityToken),
                new JProperty("mappingReference", mappingRef),
                // Data is in JSON format
                new JProperty("format", "multijson")
            )
        }
    };
    return message.ToString();
}

インジェスト メッセージをインジェスト キューに投稿する

最後に、作成したメッセージを、前に取得した選択したインジェスト キューに投稿します。

Note

v12 より前の .Net ストレージ クライアント バージョンでは、既定では、メッセージを Base64 にエンコードします。詳細については、ストレージのドキュメントを参照してください。v12 より後の .Net ストレージ クライアント バージョンを使用している場合は、メッセージ コンテンツを正しくエンコードする必要があります。

internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    queue.SendMessage(message);
}

Azure キューからのエラー メッセージがないかどうかを確認する

インジェストの後に、データ管理によって書き込まれる関連するキューからのエラー メッセージがないかどうかを確認します。 エラー メッセージの構造の詳細については、「インジェスト エラー メッセージの構造」を参照してください。

internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
    var messages = messagesFromQueue.Select(m => m.MessageText);
    return messages;
}

インジェスト メッセージ - JSON ドキュメント形式

インジェスト メッセージの内部構造

Kusto データ管理サービスで入力の Azure キューから読み取ることを予測しているメッセージは、次の形式の JSON ドキュメントです。

{
    "Id" : "<ID>",
    "BlobPath" : "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
    "RawDataSize" : "<RawDataSizeInBytes>",
    "DatabaseName": "<DatabaseName>",
    "TableName" : "<TableName>",
    "RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
    "FlushImmediately": "<true|false>",
    "ReportLevel" : <0-Failures, 1-None, 2-All>,
    "ReportMethod" : <0-Queue, 1-Table>,
    "AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
プロパティ 説明
Id メッセージ識別子 (GUID)
BlobPath 読み取り/書き込み/削除のアクセス許可を付与する SAS キーを含む、BLOB へのパス (URI)。 データの取り込みを完了したら、インジェスト サービスが BLOB を削除できるように、アクセス許可が必要です。
RawDataSize 非圧縮データのサイズ (バイト単位)。 この値を指定すると、インジェスト サービスは、複数の BLOB を集計することでインジェストを最適化できます。 このプロパティは省略可能ですが、指定しない場合、サービスはサイズを取得するためだけに BLOB にアクセスします。
DatabaseName ターゲット データベース名
TableName ターゲット テーブル名
RetainBlobOnSuccess true に設定されている場合、インジェストが正常に完了しても BLOB は削除されません。 既定値は false です
FlushImmediately true に設定されている場合、集計はすべてスキップされます。 既定値は false です
ReportLevel 成功/エラー レポートのレベル: 0 - エラー、1 - なし、2 - すべて
ReportMethod レポート メカニズム: 0 - キュー、1 - テーブル
AdditionalProperties formattagscreationTime などのその他のプロパティ。 詳細については、データ インジェストのプロパティに関するページを参照してください。

インジェスト エラー メッセージの構造

データ管理で入力の Azure キューから読み取ることを予測しているメッセージは、次の形式の JSON ドキュメントです。

プロパティ 説明
OperationId サービス側で操作を追跡するために使用できる操作識別子 (GUID)
データベース ターゲット データベース名
テーブル ターゲット テーブル名
FailedOn エラーのタイムスタンプ
IngestionSourceId 取り込みに失敗したデータ チャンクを識別する GUID
IngestionSourcePath 取り込みに失敗したデータ チャンクへのパス (URI)
詳細 エラー メッセージ
ErrorCode エラー コード。 すべてのエラー コードについては、「 インジェスト エラー コード」を参照してください。
FailureStatus エラーが永続的か一時的かを示します。
RootActivityId サービス側で操作を追跡するために使用できる関連付け識別子 (GUID)
OriginatesFromUpdatePolicy エラーが誤ったトランザクション更新ポリシーが原因で発生したかどうかを示します。
ShouldRetry そのまま再試行したとしたら、インジェストが成功する可能性があったかどうかを示します。