Dela via


Så här matar du in data med REST-API:et

Kusto.Ingest-biblioteket rekommenderas för inmatning av data till klustret. Du kan dock fortfarande uppnå nästan samma funktioner, utan att vara beroende av Kusto.Ingest-paketet. Den här artikeln visar hur du använder köad inmatning till klustret för pipelines i produktionsklass.

Anteckning

Koden nedan är skriven i C# och använder Azure Storage SDK, Microsoft Authentication Library (MSAL) och NewtonSoft.JSON-paketet för att förenkla exempelkoden. Vid behov kan motsvarande kod ersättas med lämpliga REST API-anrop för Azure Storage , non-.NET MSAL-paket och eventuella tillgängliga JSON-hanteringspaket.

Den här artikeln handlar om det rekommenderade inmatningsläget. För Kusto.Ingest-biblioteket är dess motsvarande entitet gränssnittet IKustoQueuedIngestClient . Här interagerar klientkoden med klustret genom att publicera meddelanden om inmatningsmeddelanden till en Azure-kö. Referenser till meddelandena hämtas från Kusto-Datahantering(kallas även inmatningstjänsten). Interaktion med tjänsten måste autentiseras med Microsoft Entra-ID.

Följande kod visar hur Kusto Datahantering-tjänsten hanterar köad datainmatning utan att använda Kusto.Ingest-biblioteket. Det här exemplet kan vara användbart om fullständig .NET är otillgänglig eller otillgänglig på grund av miljön eller andra begränsningar.

Koden innehåller stegen för att skapa en Azure Storage-klient och ladda upp data till en blob. Varje steg beskrivs mer detaljerat efter exempelkoden.

  1. Hämta en autentiseringstoken för åtkomst till inmatningstjänsten
  2. Fråga inmatningstjänsten för att hämta:
  3. Ladda upp data till en blob på en av blobcontainrarna som hämtats från Kusto i (2)
  4. Skapa ett inmatningsmeddelande som identifierar måldatabasen och tabellen och som pekar på bloben från (3)
  5. Publicera inmatningsmeddelandet som vi skrev i (4) till en inmatningskö som hämtats i (2)
  6. Hämta eventuella fel som hittades av tjänsten under inmatningen
// A container class for ingestion resources we are going to obtain
internal class IngestionResourcesSnapshot
{
    public IList<string> IngestionQueues { get; set; } = new List<string>();
    public IList<string> TempStorageContainers { get; set; } = new List<string>();

    public string FailureNotificationsQueue { get; set; } = string.Empty;
    public string SuccessNotificationsQueue { get; set; } = string.Empty;
}

public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
    // Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.windows.net
    var dmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.windows.net";
    // 1. Authenticate the interactive user (or application) to access Kusto ingestion service
    var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
    // 2a. Retrieve ingestion resources
    var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
    // 2b. Retrieve Kusto identity token
    var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
    // 3. Upload file to one of the blob containers we got from Azure Data Explorer.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the containers in order to prevent throttling
    var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
    var blobUriWithSas = UploadFileToBlobContainer(
        file, ingestionResources.TempStorageContainers.First(), blobName,
        out var blobSizeBytes
    );
    // 4. Compose ingestion command
    var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
    // 5. Post ingestion command to one of the previously obtained ingestion queues.
    // This example uses the first one, but when working with multiple blobs,
    // one should round-robin the queues in order to prevent throttling
    PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);

    Thread.Sleep(20000);

    // 6a. Read success notifications
    var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
    foreach (var sm in successes)
    {
        Console.WriteLine($"Ingestion completed: {sm}");
    }

    // 6b. Read failure notifications
    var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
    foreach (var em in errors)
    {
        Console.WriteLine($"Ingestion error: {em}");
    }
}

Använda köad inmatning för pipelines i produktionsklass

Hämta autentiseringsbevis från Microsoft Entra-ID

Här använder vi Microsoft Authentication Library (MSAL) för att hämta en Microsoft Entra token för att komma åt Kusto Datahantering-tjänsten och be om dess indataköer. MSAL är tillgängligt på flera plattformar.

// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
    // Create an authentication client for Azure AD:
    var authClient = PublicClientApplicationBuilder.Create("<appId>")
        .WithAuthority("https://login.microsoftonline.com/<appTenant>")
        .WithRedirectUri("<appRedirectUri>")
        .Build();
    // Acquire user token for the interactive user for Azure Data Explorer:
    var result = authClient.AcquireTokenInteractive(
        new[] { $"{resource}/.default" } // Define scopes
    ).ExecuteAsync().Result;
    return result.AccessToken;
}

Hämta inmatningsresurser

Konstruera en HTTP POST-begäran manuellt till Datahantering-tjänsten och begära att inmatningsresurserna returneras. Dessa resurser omfattar köer som DM-tjänsten lyssnar på och blobcontainrar för datauppladdning. Datahantering-tjänsten bearbetar alla meddelanden som innehåller inmatningsbegäranden som kommer till någon av dessa köer.

// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get ingestion resources\" }";
    var ingestionResources = new IngestionResourcesSnapshot();
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    // Input queues
    var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
    foreach (var token in tokens)
    {
        ingestionResources.IngestionQueues.Add((string)token[1]);
    }
    // Temp storage containers
    tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
    foreach (var token in tokens)
    {
        ingestionResources.TempStorageContainers.Add((string)token[1]);
    }
    // Failure notifications queue
    var singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.FailureNotificationsQueue = (string)singleToken;
    // Success notifications queue
    singleToken =
        responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
    ingestionResources.SuccessNotificationsQueue = (string)singleToken;
    return ingestionResources;
}

// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
    var request = WebRequest.Create(uriString);
    request.Method = "POST";
    request.ContentType = "application/json";
    request.ContentLength = body.Length;
    request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
    using var bodyStream = request.GetRequestStream();
    using (var sw = new StreamWriter(bodyStream))
    {
        sw.Write(body);
        sw.Flush();
    }
    bodyStream.Close();
    return request.GetResponse();
}

Hämta en Kusto-identitetstoken

Inmatningsmeddelanden överlämnas till klustret via en icke-direkt kanal (Azure-kö), vilket gör det omöjligt att utföra inbandsauktoriseringsvalidering för åtkomst till inmatningstjänsten. Lösningen är att koppla en identitetstoken till varje inmatningsmeddelande. Token aktiverar in-band-auktoriseringsverifiering. Den här signerade token kan sedan verifieras av inmatningstjänsten när den tar emot inmatningsmeddelandet.

// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
    var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
    var requestBody = "{ \"csl\": \".get kusto identity token\" }";
    var jsonPath = "Tables[0].Rows[*].[0]";
    using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
    using var sr = new StreamReader(response.GetResponseStream());
    using var jtr = new JsonTextReader(sr);
    var responseJson = JObject.Load(jtr);
    var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
    return (string)identityToken;
}

Ladda upp data till Azure Blob-containern

Det här steget handlar om att ladda upp en lokal fil till en Azure Blob som kommer att överlämnas för inmatning. Den här koden använder Azure Storage SDK. Om beroende inte är möjligt kan det uppnås med Rest-API:et för Azure Blob Service.

// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
    var blobUri = new Uri(blobContainerUri);
    var blobContainer = new BlobContainerClient(blobUri);
    var blob = blobContainer.GetBlobClient(blobName);
    using (var stream = File.OpenRead(filePath))
    {
        blob.UploadAsync(BinaryData.FromStream(stream));
        blobSize = blob.GetProperties().Value.ContentLength;
    }
    return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}

Skapa inmatningsmeddelandet

NewtonSoft.JSON-paketet skapar återigen en giltig inmatningsbegäran för att identifiera måldatabasen och tabellen, och det pekar på bloben. Meddelandet publiceras i Azure Queue som relevant Kusto-Datahantering-tjänst lyssnar på.

Här är några saker att tänka på.

  • Den här begäran är det lägsta för inmatningsmeddelandet.

Anteckning

Identitetstoken är obligatorisk och måste vara en del av JSON-objektet AdditionalProperties .

internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
    var message = new JObject
    {
        { "Id", Guid.NewGuid().ToString() },
        { "BlobPath", dataUri },
        { "RawDataSize", blobSizeBytes },
        { "DatabaseName", db },
        { "TableName", table },
        { "RetainBlobOnSuccess", true }, // Do not delete the blob on success
        { "FlushImmediately", true }, // Do not aggregate
        { "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
        { "ReportMethod", 0 }, // Failures are reported to an Azure Queue
        {
            "AdditionalProperties", new JObject(
                new JProperty("authorizationContext", identityToken),
                new JProperty("mappingReference", mappingRef),
                // Data is in JSON format
                new JProperty("format", "multijson")
            )
        }
    };
    return message.ToString();
}

Publicera inmatningsmeddelandet i inmatningskön

Publicera slutligen meddelandet som du skapade i den valda inmatningskö som du tidigare fick.

Anteckning

.Net Storage-klientversioner under v12 kodar som standard meddelandet till base64 Mer information finns i lagringsdokument. Om du använder .Net Storage-klientversioner ovan v12 måste du koda meddelandeinnehållet korrekt.

internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    queue.SendMessage(message);
}

Sök efter felmeddelanden från Azure-kön

Efter inmatningen söker vi efter felmeddelanden från den relevanta kön som Datahantering skriver till. Mer information om meddelandestrukturen för fel finns i Strukturen för inmatningsfelmeddelande.

internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
    var queue = new QueueClient(new Uri(queueUriWithSas));
    var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
    var messages = messagesFromQueue.Select(m => m.MessageText);
    return messages;
}

Inmatningsmeddelanden – JSON-dokumentformat

Intern struktur för inmatningsmeddelande

Meddelandet som Kusto Datahantering-tjänsten förväntar sig att läsa från indata i Azure Queue är ett JSON-dokument i följande format.

{
    "Id" : "<ID>",
    "BlobPath" : "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
    "RawDataSize" : "<RawDataSizeInBytes>",
    "DatabaseName": "<DatabaseName>",
    "TableName" : "<TableName>",
    "RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
    "FlushImmediately": "<true|false>",
    "ReportLevel" : <0-Failures, 1-None, 2-All>,
    "ReportMethod" : <0-Queue, 1-Table>,
    "AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
Egenskap Beskrivning
Id Meddelandeidentifierare (GUID)
BlobPath Sökväg (URI) till bloben, inklusive SAS-nyckeln som ger behörighet att läsa/skriva/ta bort den. Behörigheter krävs så att inmatningstjänsten kan ta bort bloben när den har slutfört inmatningen av data.
RawDataSize Storleken på okomprimerade data i byte. Om du anger det här värdet kan inmatningstjänsten optimera inmatningen genom att potentiellt aggregera flera blobar. Den här egenskapen är valfri, men om den inte anges kommer tjänsten åt bloben bara för att hämta storleken.
DatabaseName Måldatabasnamn
TableName Måltabellnamn
RetainBlobOnSuccess Om värdet trueär tas bloben inte bort när inmatningen har slutförts. Standard är false
FlushImmediately Om inställningen är trueinställd på hoppas alla aggregeringar över. Standard är false
ReportLevel Rapportnivå för lyckade/fel: 0-Fel, 1-Ingen, 2-Alla
ReportMethod Rapporteringsmekanism: 0-Queue, 1-Table
Ytterligare egenskaper Andra egenskaper som format, tagsoch creationTime. Mer information finns i datainmatningsegenskaper.

Meddelandestruktur för inmatningsfel

Meddelandet som Datahantering förväntar sig att läsa från indata i Azure Queue är ett JSON-dokument i följande format.

Egenskap Beskrivning
OperationId Åtgärdsidentifierare (GUID) som kan användas för att spåra åtgärden på tjänstsidan
Databas Måldatabasnamn
Tabell Måltabellens namn
FailedOn Feltidsstämpel
IngestionSourceId GUID som identifierar datasegmentet som inte kunde matas in
IngestionSourcePath Sökväg (URI) till datasegmentet som inte kunde matas in
Information Felmeddelande
Felkod Felkoden. Alla felkoder finns i Felkoder för inmatning.
FailureStatus Anger om felet är permanent eller tillfälligt
RootActivityId Korrelationsidentifieraren (GUID) som kan användas för att spåra åtgärden på tjänstsidan
OriginatesFromUpdatePolicy Anger om felet orsakades av en felaktig transaktionsuppdateringsprincip
ShouldRetry Anger om inmatningen kan lyckas om ett nytt försök görs som det är