Erfassen von Daten mit der REST-API

Die Kusto.Ingest-Bibliothek wird für die Erfassung von Daten in Ihrem Cluster bevorzugt. Aber auch ohne Abhängigkeit vom Paket „Kusto.Ingest“ können nahezu die gleichen Funktionen erreicht werden. In diesem Artikel erfahren Sie, wie Sie die Erfassung in die Warteschlange für Ihren Cluster für Pipelines auf Produktionsniveau verwenden.

Hinweis

Der folgende Code ist in C# geschrieben und verwendet das Azure Storage SDK, die Microsoft Authentication Library (MSAL) und das NewtonSoft.JSON-Paket, um den Beispielcode zu vereinfachen. Bei Bedarf kann der entsprechende Code durch geeignete Azure Storage-REST-API-Aufrufe , non-.NET MSAL-Paket und alle verfügbaren JSON-Verarbeitungspakete ersetzt werden.

In diesem Artikel wird der empfohlene Erfassungsmodus behandelt. Für die Kusto.Ingest-Bibliothek ist die entsprechende Entität die IKustoQueuedIngestClient-Schnittstelle . Hier interagiert der Clientcode mit Ihrem Cluster, indem Benachrichtigungen über die Erfassung an eine Azure-Warteschlange gesendet werden. Verweise auf die Nachrichten werden vom Kusto-Datenverwaltung-Dienst (auch als Erfassung bezeichnet) abgerufen. Die Interaktion mit dem Dienst muss mit Microsoft Entra ID authentifiziert werden.

Der folgende Code zeigt, wie der Kusto-Datenverwaltung-Dienst die Datenerfassung in der Warteschlange verarbeitet, ohne die Kusto.Ingest-Bibliothek zu verwenden. Dieses Beispiel kann nützlich sein, wenn auf vollständige .NET aufgrund der Umgebung oder anderer Einschränkungen nicht zugegriffen werden kann oder nicht verfügbar ist.

Der Code enthält die Schritte zum Erstellen eines Azure Storage-Clients und zum Hochladen der Daten in ein Blob. Jeder Schritt wird nach dem Beispielcode ausführlicher beschrieben.

  1. Abrufen eines Authentifizierungstokens für den Zugriff auf den Erfassungsdienst
  2. Fragen Sie den Erfassungsdienst ab, um Folgendes abzurufen:
  3. Hochladen von Daten in ein Blob auf einem der von Kusto in abgerufenen Blobcontainer (2)
  4. Verfassen Sie eine Erfassungsmeldung, die die Zieldatenbank und -tabelle identifiziert und auf das Blob von (3) verweist.
  5. Posten Sie die Erfassungsnachricht, die wir in (4) erstellt haben, in einer Erfassungswarteschlange, die in (2) abgerufen wurde.
  6. Abrufen eines Fehlers, der vom Dienst während der Erfassung gefunden wurde
// A container class for ingestion resources we are going to obtain
internal class IngestionResourcesSnapshot
{
    public IList<string> IngestionQueues { get; set; } = new List<string>();
    public IList<string> TempStorageContainers { get; set; } = new List<string>();

    public string FailureNotificationsQueue { get; set; } = string.Empty;
    public string SuccessNotificationsQueue { get; set; } = string.Empty;
}

public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
    // Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.windows.net
    var dmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.windows.net";
    // 1. Authenticate the interactive user (or application) to access Kusto ingestion service
    var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
    // 2a. Retrieve ingestion resources
    var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
    // 2b. Retrieve Kusto identity token
    var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
    // 3. Upload file to one of the blob containers we got from Azure Data Explorer.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the containers in order to prevent throttling
    var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
    var blobUriWithSas = UploadFileToBlobContainer(
        file, ingestionResources.TempStorageContainers.First(), blobName,
        out var blobSizeBytes
    );
    // 4. Compose ingestion command
    var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
    // 5. Post ingestion command to one of the previously obtained ingestion queues.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the queues in order to prevent throttling
    PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);

    Thread.Sleep(20000);

    // 6a. Read success notifications
    var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
    foreach (var sm in successes)
    {
        Console.WriteLine($"Ingestion completed: {sm}");
    }

    // 6b. Read failure notifications
    var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
    foreach (var em in errors)
    {
        Console.WriteLine($"Ingestion error: {em}");
    }
}

Verwenden der Erfassung in die Warteschlange für Pipelines auf Produktionsniveau

Abrufen von Authentifizierungsnachweisen aus Microsoft Entra ID

Hier verwenden wir die Microsoft Authentication Library (MSAL), um ein Microsoft Entra-Token für den Zugriff auf den Kusto-Datenverwaltung-Dienst abzurufen und nach dessen Eingabewarteschlangen zu fragen. MSAL ist auf mehreren Plattformen verfügbar.

// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
    // Create an authentication client for Azure AD:
    var authClient = PublicClientApplicationBuilder.Create("<appId>")
        .WithAuthority("https://login.microsoftonline.com/<appTenant>")
        .WithRedirectUri("<appRedirectUri>")
        .Build();
    // Acquire user token for the interactive user for Azure Data Explorer:
    var result = authClient.AcquireTokenInteractive(
        new[] { $"{resource}/.default" } // Define scopes
    ).ExecuteAsync().Result;
    return result.AccessToken;
}

Abrufen von Erfassungsressourcen

Erstellen Sie manuell eine HTTP POST-Anforderung an den Datenverwaltung-Dienst, und fordern Sie die Rückgabe der Erfassungsressourcen an. Zu diesen Ressourcen gehören Warteschlangen, an denen der DM-Dienst lauscht, und Blobcontainer für den Datenupload. Der Datenverwaltung-Dienst verarbeitet alle Nachrichten mit Erfassungsanforderungen, die in einer dieser Warteschlangen eingehen.

// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get ingestion resources\" }";
    var ingestionResources = new IngestionResourcesSnapshot();
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    // Input queues
    var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
    foreach (var token in tokens)
    {
        ingestionResources.IngestionQueues.Add((string)token[1]);
    }
    // Temp storage containers
    tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
    foreach (var token in tokens)
    {
        ingestionResources.TempStorageContainers.Add((string)token[1]);
    }
    // Failure notifications queue
    var singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.FailureNotificationsQueue = (string)singleToken;
    // Success notifications queue
    singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.SuccessNotificationsQueue = (string)singleToken;
    return ingestionResources;
}

// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
    var request = WebRequest.Create(uriString);
    request.Method = "POST";
    request.ContentType = "application/json";
    request.ContentLength = body.Length;
    request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
    using var bodyStream = request.GetRequestStream();
    using (var sw = new StreamWriter(bodyStream))
    {
        sw.Write(body);
        sw.Flush();
    }
    bodyStream.Close();
    return request.GetResponse();
}

Abrufen eines Kusto-Identitätstokens

Erfassungsnachrichten werden über einen nicht direkten Kanal (Azure-Warteschlange) an Ihren Cluster übergeben, sodass eine In-Band-Autorisierungsüberprüfung für den Zugriff auf den Erfassungsdienst nicht möglich ist. Die Lösung besteht darin, an jede Erfassungsnachricht ein Identitätstoken anzufügen. Das Token ermöglicht die In-Band-Autorisierungsüberprüfung. Dieses signierte Token kann dann vom Erfassungsdienst überprüft werden, wenn er die Erfassungsnachricht empfängt.

// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get kusto identity token\" }";
    var jsonPath = "Tables[0].Rows[*].[0]";
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
    return (string)identityToken;
}

Hochladen von Daten in den Azure Blob-Container

In diesem Schritt wird eine lokale Datei in ein Azure-Blob hochgeladen, das zur Erfassung übergeben wird. Dieser Code verwendet das Azure Storage SDK. Wenn abhängigkeiten nicht möglich sind, kann sie mit der Azure Blob Service-REST-API erreicht werden.

// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
    var blobUri = new Uri(blobContainerUri);
    var blobContainer = new BlobContainerClient(blobUri);
    var blob = blobContainer.GetBlobClient(blobName);
    using (var stream = File.OpenRead(filePath))
    {
        blob.UploadAsync(BinaryData.FromStream(stream));
        blobSize = blob.GetProperties().Value.ContentLength;
    }
    return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}

Verfassen der Erfassungsnachricht

Das NewtonSoft.JSON-Paket erstellt erneut eine gültige Erfassungsanforderung, um die Zieldatenbank und -tabelle zu identifizieren, die auf das Blob verweist. Die Nachricht wird an die Azure-Warteschlange gesendet, auf der der relevante Kusto-Datenverwaltung-Dienst lauscht.

Hier sind einige Punkte, die berücksichtigt werden sollten.

  • Diese Anforderung ist das absolute Minimum für die Erfassungsnachricht.

Hinweis

Das Identitätstoken ist obligatorisch und muss Teil des AdditionalProperties-JSON-Objekts sein.

internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
    var message = new JObject
    {
        { "Id", Guid.NewGuid().ToString() },
        { "BlobPath", dataUri },
        { "RawDataSize", blobSizeBytes },
        { "DatabaseName", db },
        { "TableName", table },
        { "RetainBlobOnSuccess", true }, // Do not delete the blob on success
        { "FlushImmediately", true }, // Do not aggregate
        { "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
        { "ReportMethod", 0 }, // Failures are reported to an Azure Queue
        {
            "AdditionalProperties", new JObject(
                new JProperty("authorizationContext", identityToken),
                new JProperty("mappingReference", mappingRef),
                // Data is in JSON format
                new JProperty("format", "multijson")
            )
        }
    };
    return message.ToString();
}

Posten der Erfassungsnachricht in der Erfassungswarteschlange

Posten Sie schließlich die von Ihnen erstellte Nachricht in der ausgewählten Erfassungswarteschlange, die Sie zuvor abgerufen haben.

Hinweis

.NET-Speicherclientversionen unter v12: Codieren Sie die Nachricht standardmäßig in base64. Weitere Informationen finden Sie in der Speicherdokumentation. Wenn Sie .NET-Speicherclientversionen über v12 verwenden, müssen Sie den Nachrichteninhalt ordnungsgemäß codieren.

internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    queue.SendMessage(message);
}

Überprüfen auf Fehlermeldungen aus der Azure-Warteschlange

Nach der Erfassung überprüfen wir auf Fehlermeldungen aus der relevanten Warteschlange, in die der Datenverwaltung schreibt. Weitere Informationen zur Fehlermeldungsstruktur finden Sie unter Struktur von Erfassungsfehlermeldungen.

internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
    var messages = messagesFromQueue.Select(m => m.MessageText);
    return messages;
}

Erfassungsmeldungen : JSON-Dokumentformate

Interne Struktur der Erfassungsnachricht

Die Meldung, dass der Kusto-Datenverwaltung-Dienst erwartet, dass er aus der Azure-Eingabewarteschlange liest, ist ein JSON-Dokument im folgenden Format.

{
    "Id" : "<ID>",
    "BlobPath" : "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
    "RawDataSize" : "<RawDataSizeInBytes>",
    "DatabaseName": "<DatabaseName>",
    "TableName" : "<TableName>",
    "RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
    "FlushImmediately": "<true|false>",
    "ReportLevel" : <0-Failures, 1-None, 2-All>,
    "ReportMethod" : <0-Queue, 1-Table>,
    "AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
Eigenschaft BESCHREIBUNG
Id Nachrichtenbezeichner (GUID)
BlobPath Pfad (URI) zum Blob, einschließlich des SAS-Schlüssels, der Berechtigungen zum Lesen/Schreiben/Löschen des Blobs erteilt. Berechtigungen sind erforderlich, damit der Erfassungsdienst das Blob löschen kann, nachdem er die Erfassung der Daten abgeschlossen hat.
RawDataSize Größe der unkomprimierten Daten in Bytes. Wenn Sie diesen Wert angeben, kann der Erfassungsdienst die Erfassung optimieren, indem er möglicherweise mehrere Blobs aggregiert. Diese Eigenschaft ist optional, aber wenn nicht angegeben, greift der Dienst nur auf das Blob zu, um die Größe abzurufen.
DatabaseName Name der Zieldatenbank
TableName Name der Zieltabelle
RetainBlobOnSuccess Wenn die Einstellung auf truefestgelegt ist, wird das Blob nicht gelöscht, sobald die Erfassung erfolgreich abgeschlossen wurde. Die Standardeinstellung ist false.
FlushImmediately Wenn auf truefestgelegt ist, wird jede Aggregation übersprungen. Die Standardeinstellung ist false.
ReportLevel Erfolgs-/Fehlerberichterstattungsebene: 0-Fehler, 1-None, 2-All
ReportMethod Berichterstellungsmechanismus: 0-Warteschlange, 1-Tabelle
AdditionalProperties Andere Eigenschaften wie format, tagsund creationTime. Weitere Informationen finden Sie unter Datenerfassungseigenschaften.

Meldungsstruktur für Erfassungsfehler

Die Meldung, dass die Datenverwaltung erwartet, dass sie aus der Azure-Eingabewarteschlange gelesen wird, ist ein JSON-Dokument im folgenden Format.

Eigenschaft BESCHREIBUNG
OperationId Vorgangsbezeichner (GUID), der zum Nachverfolgen des Vorgangs auf dienstseitiger Seite verwendet werden kann
Datenbank Name der Zieldatenbank
Tabelle Name der Zieltabelle
FailedOn Fehlerzeitstempel
IngestionSourceId GUID zur Identifizierung des Datenblocks, der nicht erfasst werden konnte
IngestionSourcePath Pfad (URI) zum Datenblock, der nicht erfasst werden konnte
Details Fehlermeldung
ErrorCode Der Fehlercode. Informationen zu allen Fehlercodes finden Sie unter Fehlercodes für die Erfassung.
FailureStatus Gibt an, ob der Fehler dauerhaft oder vorübergehend ist.
RootActivityId Der Korrelationsbezeichner (GUID), der zum Nachverfolgen des Vorgangs auf der Dienstseite verwendet werden kann
OriginatesFromUpdatePolicy Gibt an, ob der Fehler durch eine fehlerhafte Transaktionsaktualisierungsrichtlinie verursacht wurde.
ShouldRetry Gibt an, ob die Erfassung erfolgreich sein könnte, wenn ein Wiederholungsversuch durchgeführt wird.