如何使用 REST API 內嵌數據

Kusto.Ingest 連結庫是用來將數據擷取至叢集的慣用連結庫。 然而,您仍可以達到近乎相同的功能,而不需要依賴於 Kusto.Ingest 封裝。 本文說明如何使用 佇列擷取 至叢集進行生產等級管線。

注意

下列程式碼是以 C# 撰寫,並利用 Azure 儲存體 SDK、Microsoft 驗證程式庫 (MSAL) 和 NewtonSoft.JSON 封裝,以簡化範本程式碼。 如有需要,您可以使用 Azure 儲存體 REST API 呼叫、non-.NET MSAL 封裝和任何可用 JSON 處理封裝,取代對應的程式碼。

本文將探討擷取的建議模式。 針對 Kusto.Ingest 程式庫,其對應的實體是 IKustoQueuedIngestClient 介面。 在這裡,用戶端程式代碼會將擷取通知訊息張貼到 Azure 佇列,以與您的叢集互動。 訊息的參考是從 Kusto 資料管理 (也稱為 Ingestion) 服務中取得。 與服務的互動必須使用 Microsoft Entra標識碼進行驗證。

下列程式碼說明 Kusto 資料管理服務如何處理已排入佇列的資料,而不須使用 Kusto.Ingest 程式庫。 如果因環境或其他限制無法存取或使用完整的 .NET,則此範例可能會很實用。

程式碼包含建立 Azure 儲存體用戶端和將資料上傳至 Blob 的步驟。 在範例程式碼之後,會更詳加說明每個步驟。

  1. 取得用來存取擷取服務的驗證令牌
  2. 查詢擷取服務以取得:
  3. 於 (2) 從 Kusto 取得的其中一個 Blob 上將資料上傳至 Blob
  4. 撰寫擷取訊息,其訊息會識別目標資料庫和資料表,並指向來自 (3) 的 Blob
  5. 將我們在 (4) 撰寫的擷取訊息張貼至在 (2 中取得的擷取佇列)
  6. 擷取在擷取期間服務發現的任何錯誤
// A container class for ingestion resources we are going to obtain
internal class IngestionResourcesSnapshot
{
    public IList<string> IngestionQueues { get; set; } = new List<string>();
    public IList<string> TempStorageContainers { get; set; } = new List<string>();

    public string FailureNotificationsQueue { get; set; } = string.Empty;
    public string SuccessNotificationsQueue { get; set; } = string.Empty;
}

public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
    // Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.windows.net
    var dmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.windows.net";
    // 1. Authenticate the interactive user (or application) to access Kusto ingestion service
    var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
    // 2a. Retrieve ingestion resources
    var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
    // 2b. Retrieve Kusto identity token
    var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
    // 3. Upload file to one of the blob containers we got from Azure Data Explorer.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the containers in order to prevent throttling
    var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
    var blobUriWithSas = UploadFileToBlobContainer(
        file, ingestionResources.TempStorageContainers.First(), blobName,
        out var blobSizeBytes
    );
    // 4. Compose ingestion command
    var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
    // 5. Post ingestion command to one of the previously obtained ingestion queues.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the queues in order to prevent throttling
    PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);

    Thread.Sleep(20000);

    // 6a. Read success notifications
    var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
    foreach (var sm in successes)
    {
        Console.WriteLine($"Ingestion completed: {sm}");
    }

    // 6b. Read failure notifications
    var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
    foreach (var em in errors)
    {
        Console.WriteLine($"Ingestion error: {em}");
    }
}

針對生產等級管線使用佇列擷取

從 Microsoft Entra標識碼取得驗證辨識項

在這裡,我們會使用 Microsoft 驗證連結庫 (MSAL) 來取得 Microsoft Entra 令牌來存取 Kusto 資料管理 服務,並要求其輸入佇列。 MSAL 可供多個平台使用。

// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
    // Create an authentication client for Azure AD:
    var authClient = PublicClientApplicationBuilder.Create("<appId>")
        .WithAuthority("https://login.microsoftonline.com/<appTenant>")
        .WithRedirectUri("<appRedirectUri>")
        .Build();
    // Acquire user token for the interactive user for Azure Data Explorer:
    var result = authClient.AcquireTokenInteractive(
        new[] { $"{resource}/.default" } // Define scopes
    ).ExecuteAsync().Result;
    return result.AccessToken;
}

擷取擷取資源

手動建立資料管理服務的 HTTP POST 要求,以要求傳回擷取資源。 這些資源包括 DM 服務正在接聽的佇列,以及用於資料上傳的 Blob 容器。 針對抵達其中一個佇列的擷取要求,資料管理服務將會處理任何包含其相關擷取要求的訊息。

// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get ingestion resources\" }";
    var ingestionResources = new IngestionResourcesSnapshot();
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    // Input queues
    var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
    foreach (var token in tokens)
    {
        ingestionResources.IngestionQueues.Add((string)token[1]);
    }
    // Temp storage containers
    tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
    foreach (var token in tokens)
    {
        ingestionResources.TempStorageContainers.Add((string)token[1]);
    }
    // Failure notifications queue
    var singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.FailureNotificationsQueue = (string)singleToken;
    // Success notifications queue
    singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.SuccessNotificationsQueue = (string)singleToken;
    return ingestionResources;
}

// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
    var request = WebRequest.Create(uriString);
    request.Method = "POST";
    request.ContentType = "application/json";
    request.ContentLength = body.Length;
    request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
    using var bodyStream = request.GetRequestStream();
    using (var sw = new StreamWriter(bodyStream))
    {
        sw.Write(body);
        sw.Flush();
    }
    bodyStream.Close();
    return request.GetResponse();
}

取得 Kusto 身分識別權杖

擷取訊息會透過非直接通道 (Azure 佇列) 傳送至叢集,因此無法執行頻內授權驗證來存取擷取服務。 解決方法便是將身分識別權杖附加至每個擷取訊息。 權杖會啟用頻內授權驗證。 接著,當擷取服務收到擷取訊息時,就可以驗證此已簽署的令牌。

// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get kusto identity token\" }";
    var jsonPath = "Tables[0].Rows[*].[0]";
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
    return (string)identityToken;
}

將資料上傳至 Azure Blob 容器

此步驟是關於將本機檔案上傳至 Azure Blob,進而遞交以供擷取。 此程式碼會使用 Azure 儲存體 SDK。 如果完全沒有相依性,您可以使用 Azure Blob 服務 REST API 來達成。

// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
    var blobUri = new Uri(blobContainerUri);
    var blobContainer = new BlobContainerClient(blobUri);
    var blob = blobContainer.GetBlobClient(blobName);
    using (var stream = File.OpenRead(filePath))
    {
        blob.UploadAsync(BinaryData.FromStream(stream));
        blobSize = blob.GetProperties().Value.ContentLength;
    }
    return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}

撰寫擷取訊息

NewtonSoft.JSON 封裝會再次撰寫有效的擷取要求,以識別目標資料庫和資料表,接著指向 Blob。 訊息將會張貼至相關 Kusto 資料管理服務正在接聽的 Azure 佇列。

下列是一些須考量的事項。

  • 此要求是擷取訊息的下限。

注意

身分識別權證是必要的,且必須是 AdditionalProperties JSON 物件的一部分。

internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
    var message = new JObject
    {
        { "Id", Guid.NewGuid().ToString() },
        { "BlobPath", dataUri },
        { "RawDataSize", blobSizeBytes },
        { "DatabaseName", db },
        { "TableName", table },
        { "RetainBlobOnSuccess", true }, // Do not delete the blob on success
        { "FlushImmediately", true }, // Do not aggregate
        { "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
        { "ReportMethod", 0 }, // Failures are reported to an Azure Queue
        {
            "AdditionalProperties", new JObject(
                new JProperty("authorizationContext", identityToken),
                new JProperty("mappingReference", mappingRef),
                // Data is in JSON format
                new JProperty("format", "multijson")
            )
        }
    };
    return message.ToString();
}

將擷取訊息張貼至擷取佇列

最後,將您建構的訊息張貼到您先前取得的選取擷取佇列。

注意

依預設,v12 以下的 .Net 儲存體用戶端版本會將訊息編碼為 base64,如需詳細資訊,請參閱儲存體文件。如果正在使用 v12 以上的 .Net 儲存體用戶端版本,則您必須正確編碼訊息內容。

internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    queue.SendMessage(message);
}

檢查 Azure 佇列中的錯誤訊息

在擷取之後,我們會檢查從資料管理寫入的相關佇列中失敗訊息。 如需失敗訊息結構的詳細資訊,請參閱擷取失敗訊息結構

internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
    var messages = messagesFromQueue.Select(m => m.MessageText);
    return messages;
}

擷取訊息 - JSON 文件檔案格式

擷取訊息內部結構

Kusto 資料管理服務預期從輸入 Azure 佇列讀取的訊息是採用下列格式的 JSON 文件檔案。

{
    "Id" : "<ID>",
    "BlobPath" : "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
    "RawDataSize" : "<RawDataSizeInBytes>",
    "DatabaseName": "<DatabaseName>",
    "TableName" : "<TableName>",
    "RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
    "FlushImmediately": "<true|false>",
    "ReportLevel" : <0-Failures, 1-None, 2-All>,
    "ReportMethod" : <0-Queue, 1-Table>,
    "AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
屬性 Description
識別碼 訊息識別碼 (GUID)
BlobPath blob) 的路徑 (URI,包括授與讀取/寫入/刪除許可權的 SAS 金鑰。 需要許可權,才能在擷取服務完成擷取數據之後刪除 Blob。
RawDataSize 未壓縮資料的大小 (位元組)。 提供此值可讓擷取服務藉由匯總多個 Blob 來優化擷取。 這個屬性是選擇性的,但如果未指定,服務會直接存取 Blob 以擷取大小。
DatabaseName 目標資料庫名稱
TableName 目標資料表名稱
RetainBlobOnSuccess 如果設定為 true ,在成功完成擷取後,將不會刪除 Blob。 預設為 false
FlushImmediately 如果設定為 true,則會略過任何彙總。 預設為 false
ReportLevel 成功/錯誤回報層級:0-失敗、1-無、2-所有
ReportMethod 報告機制:0-佇列、1-資料表
AdditionalProperties 其他屬性,例如 formattagscreationTime。 如需詳細資訊,請參閱資料擷取屬性

擷取失敗訊息結構

資料管理預期從輸入 Azure 佇列讀取的訊息是採用下列格式的 JSON 文件檔案。

屬性 描述
OperationId 作業識別碼 (GUID),可用來追蹤服務端上的作業
資料庫 目標資料庫名稱
Table 目標資料表名稱
FailedOn 失敗時間戳記
IngestionSourceId 識別無法擷取之數據區塊的 GUID
IngestionSourcePath 無法擷取之數據區塊的路徑 (URI)
詳細資料 失敗訊息
ErrorCode 錯誤碼。 如需所有錯誤碼,請參閱 擷取錯誤碼
FailureStatus 表示失敗是永久或暫時性的
RootActivityId 關聯標識碼 (GUID) ,可用來追蹤服務端上的作業
OriginatesFromUpdatePolicy 表示失敗的原因是否為錯誤的交易更新原則
ShouldRetry 表示依原樣重試時,是否能成功擷取