Så här matar du in data med REST-API:et
Kusto.Ingest-biblioteket rekommenderas för inmatning av data till klustret. Du kan dock fortfarande uppnå nästan samma funktioner, utan att vara beroende av Kusto.Ingest-paketet. Den här artikeln visar hur du använder köad inmatning till klustret för pipelines i produktionsklass.
Anteckning
Koden nedan är skriven i C# och använder Azure Storage SDK, Microsoft Authentication Library (MSAL) och NewtonSoft.JSON-paketet för att förenkla exempelkoden. Vid behov kan motsvarande kod ersättas med lämpliga REST API-anrop för Azure Storage , non-.NET MSAL-paket och eventuella tillgängliga JSON-hanteringspaket.
Den här artikeln handlar om det rekommenderade inmatningsläget. För Kusto.Ingest-biblioteket är dess motsvarande entitet gränssnittet IKustoQueuedIngestClient . Här interagerar klientkoden med klustret genom att publicera meddelanden om inmatningsmeddelanden till en Azure-kö. Referenser till meddelandena hämtas från Kusto-Datahantering(kallas även inmatningstjänsten). Interaktion med tjänsten måste autentiseras med Microsoft Entra-ID.
Följande kod visar hur Kusto Datahantering-tjänsten hanterar köad datainmatning utan att använda Kusto.Ingest-biblioteket. Det här exemplet kan vara användbart om fullständig .NET är otillgänglig eller otillgänglig på grund av miljön eller andra begränsningar.
Koden innehåller stegen för att skapa en Azure Storage-klient och ladda upp data till en blob. Varje steg beskrivs mer detaljerat efter exempelkoden.
- Hämta en autentiseringstoken för åtkomst till inmatningstjänsten
- Fråga inmatningstjänsten för att hämta:
- Ladda upp data till en blob på en av blobcontainrarna som hämtats från Kusto i (2)
- Skapa ett inmatningsmeddelande som identifierar måldatabasen och tabellen och som pekar på bloben från (3)
- Publicera inmatningsmeddelandet som vi skrev i (4) till en inmatningskö som hämtats i (2)
- Hämta eventuella fel som hittades av tjänsten under inmatningen
// A container class for ingestion resources we are going to obtain
internal class IngestionResourcesSnapshot
{
public IList<string> IngestionQueues { get; set; } = new List<string>();
public IList<string> TempStorageContainers { get; set; } = new List<string>();
public string FailureNotificationsQueue { get; set; } = string.Empty;
public string SuccessNotificationsQueue { get; set; } = string.Empty;
}
public static void IngestSingleFile(string file, string db, string table, string ingestionMappingRef)
{
// Your Azure Data Explorer ingestion service URI, typically ingest-<your cluster name>.kusto.windows.net
var dmServiceBaseUri = @"https://ingest-{serviceNameAndRegion}.kusto.windows.net";
// 1. Authenticate the interactive user (or application) to access Kusto ingestion service
var bearerToken = AuthenticateInteractiveUser(dmServiceBaseUri);
// 2a. Retrieve ingestion resources
var ingestionResources = RetrieveIngestionResources(dmServiceBaseUri, bearerToken);
// 2b. Retrieve Kusto identity token
var identityToken = RetrieveKustoIdentityToken(dmServiceBaseUri, bearerToken);
// 3. Upload file to one of the blob containers we got from Azure Data Explorer.
// This example uses the first one, but when working with multiple blobs,
// one should round-robin the containers in order to prevent throttling
var blobName = $"TestData{DateTime.UtcNow:yyyy-MM-dd_HH-mm-ss.FFF}";
var blobUriWithSas = UploadFileToBlobContainer(
file, ingestionResources.TempStorageContainers.First(), blobName,
out var blobSizeBytes
);
// 4. Compose ingestion command
var ingestionMessage = PrepareIngestionMessage(db, table, blobUriWithSas, blobSizeBytes, ingestionMappingRef, identityToken);
// 5. Post ingestion command to one of the previously obtained ingestion queues.
// This example uses the first one, but when working with multiple blobs,
// one should round-robin the queues in order to prevent throttling
PostMessageToQueue(ingestionResources.IngestionQueues.First(), ingestionMessage);
Thread.Sleep(20000);
// 6a. Read success notifications
var successes = PopTopMessagesFromQueue(ingestionResources.SuccessNotificationsQueue, 32);
foreach (var sm in successes)
{
Console.WriteLine($"Ingestion completed: {sm}");
}
// 6b. Read failure notifications
var errors = PopTopMessagesFromQueue(ingestionResources.FailureNotificationsQueue, 32);
foreach (var em in errors)
{
Console.WriteLine($"Ingestion error: {em}");
}
}
Använda köad inmatning för pipelines i produktionsklass
Hämta autentiseringsbevis från Microsoft Entra-ID
Här använder vi Microsoft Authentication Library (MSAL) för att hämta en Microsoft Entra token för att komma åt Kusto Datahantering-tjänsten och be om dess indataköer. MSAL är tillgängligt på flera plattformar.
// Authenticates the interactive user and retrieves Azure AD Access token for specified resource
internal static string AuthenticateInteractiveUser(string resource)
{
// Create an authentication client for Azure AD:
var authClient = PublicClientApplicationBuilder.Create("<appId>")
.WithAuthority("https://login.microsoftonline.com/<appTenant>")
.WithRedirectUri("<appRedirectUri>")
.Build();
// Acquire user token for the interactive user for Azure Data Explorer:
var result = authClient.AcquireTokenInteractive(
new[] { $"{resource}/.default" } // Define scopes
).ExecuteAsync().Result;
return result.AccessToken;
}
Hämta inmatningsresurser
Konstruera en HTTP POST-begäran manuellt till Datahantering-tjänsten och begära att inmatningsresurserna returneras. Dessa resurser omfattar köer som DM-tjänsten lyssnar på och blobcontainrar för datauppladdning. Datahantering-tjänsten bearbetar alla meddelanden som innehåller inmatningsbegäranden som kommer till någon av dessa köer.
// Retrieve ingestion resources (queues and blob containers) with SAS from specified ingestion service using supplied access token
internal static IngestionResourcesSnapshot RetrieveIngestionResources(string ingestClusterBaseUri, string accessToken)
{
var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
var requestBody = "{ \"csl\": \".get ingestion resources\" }";
var ingestionResources = new IngestionResourcesSnapshot();
using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
using var sr = new StreamReader(response.GetResponseStream());
using var jtr = new JsonTextReader(sr);
var responseJson = JObject.Load(jtr);
// Input queues
var tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SecuredReadyForAggregationQueue')]");
foreach (var token in tokens)
{
ingestionResources.IngestionQueues.Add((string)token[1]);
}
// Temp storage containers
tokens = responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'TempStorage')]");
foreach (var token in tokens)
{
ingestionResources.TempStorageContainers.Add((string)token[1]);
}
// Failure notifications queue
var singleToken =
responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'FailedIngestionsQueue')].[1]").FirstOrDefault();
ingestionResources.FailureNotificationsQueue = (string)singleToken;
// Success notifications queue
singleToken =
responseJson.SelectTokens("Tables[0].Rows[?(@.[0] == 'SuccessfulIngestionsQueue')].[1]").FirstOrDefault();
ingestionResources.SuccessNotificationsQueue = (string)singleToken;
return ingestionResources;
}
// Executes a POST request on provided URI using supplied Access token and request body
internal static WebResponse SendPostRequest(string uriString, string authToken, string body)
{
var request = WebRequest.Create(uriString);
request.Method = "POST";
request.ContentType = "application/json";
request.ContentLength = body.Length;
request.Headers.Set(HttpRequestHeader.Authorization, $"Bearer {authToken}");
using var bodyStream = request.GetRequestStream();
using (var sw = new StreamWriter(bodyStream))
{
sw.Write(body);
sw.Flush();
}
bodyStream.Close();
return request.GetResponse();
}
Hämta en Kusto-identitetstoken
Inmatningsmeddelanden överlämnas till klustret via en icke-direkt kanal (Azure-kö), vilket gör det omöjligt att utföra inbandsauktoriseringsvalidering för åtkomst till inmatningstjänsten. Lösningen är att koppla en identitetstoken till varje inmatningsmeddelande. Token aktiverar in-band-auktoriseringsverifiering. Den här signerade token kan sedan verifieras av inmatningstjänsten när den tar emot inmatningsmeddelandet.
// Retrieves a Kusto identity token that will be added to every ingest message
internal static string RetrieveKustoIdentityToken(string ingestClusterBaseUri, string accessToken)
{
var ingestClusterUri = $"{ingestClusterBaseUri}/v1/rest/mgmt";
var requestBody = "{ \"csl\": \".get kusto identity token\" }";
var jsonPath = "Tables[0].Rows[*].[0]";
using var response = SendPostRequest(ingestClusterUri, accessToken, requestBody);
using var sr = new StreamReader(response.GetResponseStream());
using var jtr = new JsonTextReader(sr);
var responseJson = JObject.Load(jtr);
var identityToken = responseJson.SelectTokens(jsonPath).FirstOrDefault();
return (string)identityToken;
}
Ladda upp data till Azure Blob-containern
Det här steget handlar om att ladda upp en lokal fil till en Azure Blob som kommer att överlämnas för inmatning. Den här koden använder Azure Storage SDK. Om beroende inte är möjligt kan det uppnås med Rest-API:et för Azure Blob Service.
// Uploads a single local file to an Azure Blob container, returns blob URI and original data size
internal static string UploadFileToBlobContainer(string filePath, string blobContainerUri, string blobName, out long blobSize)
{
var blobUri = new Uri(blobContainerUri);
var blobContainer = new BlobContainerClient(blobUri);
var blob = blobContainer.GetBlobClient(blobName);
using (var stream = File.OpenRead(filePath))
{
blob.UploadAsync(BinaryData.FromStream(stream));
blobSize = blob.GetProperties().Value.ContentLength;
}
return $"{blob.Uri.AbsoluteUri}{blobUri.Query}";
}
Skapa inmatningsmeddelandet
NewtonSoft.JSON-paketet skapar återigen en giltig inmatningsbegäran för att identifiera måldatabasen och tabellen, och det pekar på bloben. Meddelandet publiceras i Azure Queue som relevant Kusto-Datahantering-tjänst lyssnar på.
Här är några saker att tänka på.
- Den här begäran är det lägsta för inmatningsmeddelandet.
Anteckning
Identitetstoken är obligatorisk och måste vara en del av JSON-objektet AdditionalProperties .
- Vid behov måste även csvMapping- eller JsonMapping-egenskaper tillhandahållas
- Mer information finns i artikeln om inmatningsmappning före skapande.
- Intern struktur för avsnittsinmatningsmeddelande ger en förklaring av inmatningsmeddelandestrukturen
internal static string PrepareIngestionMessage(string db, string table, string dataUri, long blobSizeBytes, string mappingRef, string identityToken)
{
var message = new JObject
{
{ "Id", Guid.NewGuid().ToString() },
{ "BlobPath", dataUri },
{ "RawDataSize", blobSizeBytes },
{ "DatabaseName", db },
{ "TableName", table },
{ "RetainBlobOnSuccess", true }, // Do not delete the blob on success
{ "FlushImmediately", true }, // Do not aggregate
{ "ReportLevel", 2 }, // Report failures and successes (might incur perf overhead)
{ "ReportMethod", 0 }, // Failures are reported to an Azure Queue
{
"AdditionalProperties", new JObject(
new JProperty("authorizationContext", identityToken),
new JProperty("mappingReference", mappingRef),
// Data is in JSON format
new JProperty("format", "multijson")
)
}
};
return message.ToString();
}
Publicera inmatningsmeddelandet i inmatningskön
Publicera slutligen meddelandet som du skapade i den valda inmatningskö som du tidigare fick.
Anteckning
.Net Storage-klientversioner under v12 kodar som standard meddelandet till base64 Mer information finns i lagringsdokument. Om du använder .Net Storage-klientversioner ovan v12 måste du koda meddelandeinnehållet korrekt.
internal static void PostMessageToQueue(string queueUriWithSas, string message)
{
var queue = new QueueClient(new Uri(queueUriWithSas));
queue.SendMessage(message);
}
Sök efter felmeddelanden från Azure-kön
Efter inmatningen söker vi efter felmeddelanden från den relevanta kön som Datahantering skriver till. Mer information om meddelandestrukturen för fel finns i Strukturen för inmatningsfelmeddelande.
internal static IEnumerable<string> PopTopMessagesFromQueue(string queueUriWithSas, int count)
{
var queue = new QueueClient(new Uri(queueUriWithSas));
var messagesFromQueue = queue.ReceiveMessages(maxMessages: count).Value;
var messages = messagesFromQueue.Select(m => m.MessageText);
return messages;
}
Inmatningsmeddelanden – JSON-dokumentformat
Intern struktur för inmatningsmeddelande
Meddelandet som Kusto Datahantering-tjänsten förväntar sig att läsa från indata i Azure Queue är ett JSON-dokument i följande format.
{
"Id" : "<ID>",
"BlobPath" : "https://<AccountName>.blob.core.windows.net/<ContainerName>/<PathToBlob>?<SasToken>",
"RawDataSize" : "<RawDataSizeInBytes>",
"DatabaseName": "<DatabaseName>",
"TableName" : "<TableName>",
"RetainBlobOnSuccess" : "<RetainBlobOnSuccess>",
"FlushImmediately": "<true|false>",
"ReportLevel" : <0-Failures, 1-None, 2-All>,
"ReportMethod" : <0-Queue, 1-Table>,
"AdditionalProperties" : { "<PropertyName>" : "<PropertyValue>" }
}
Egenskap | Beskrivning |
---|---|
Id | Meddelandeidentifierare (GUID) |
BlobPath | Sökväg (URI) till bloben, inklusive SAS-nyckeln som ger behörighet att läsa/skriva/ta bort den. Behörigheter krävs så att inmatningstjänsten kan ta bort bloben när den har slutfört inmatningen av data. |
RawDataSize | Storleken på okomprimerade data i byte. Om du anger det här värdet kan inmatningstjänsten optimera inmatningen genom att potentiellt aggregera flera blobar. Den här egenskapen är valfri, men om den inte anges kommer tjänsten åt bloben bara för att hämta storleken. |
DatabaseName | Måldatabasnamn |
TableName | Måltabellnamn |
RetainBlobOnSuccess | Om värdet true är tas bloben inte bort när inmatningen har slutförts. Standard är false |
FlushImmediately | Om inställningen är true inställd på hoppas alla aggregeringar över. Standard är false |
ReportLevel | Rapportnivå för lyckade/fel: 0-Fel, 1-Ingen, 2-Alla |
ReportMethod | Rapporteringsmekanism: 0-Queue, 1-Table |
Ytterligare egenskaper | Andra egenskaper som format , tags och creationTime . Mer information finns i datainmatningsegenskaper. |
Meddelandestruktur för inmatningsfel
Meddelandet som Datahantering förväntar sig att läsa från indata i Azure Queue är ett JSON-dokument i följande format.
Egenskap | Beskrivning |
---|---|
OperationId | Åtgärdsidentifierare (GUID) som kan användas för att spåra åtgärden på tjänstsidan |
Databas | Måldatabasnamn |
Tabell | Måltabellens namn |
FailedOn | Feltidsstämpel |
IngestionSourceId | GUID som identifierar datasegmentet som inte kunde matas in |
IngestionSourcePath | Sökväg (URI) till datasegmentet som inte kunde matas in |
Information | Felmeddelande |
Felkod | Felkoden. Alla felkoder finns i Felkoder för inmatning. |
FailureStatus | Anger om felet är permanent eller tillfälligt |
RootActivityId | Korrelationsidentifieraren (GUID) som kan användas för att spåra åtgärden på tjänstsidan |
OriginatesFromUpdatePolicy | Anger om felet orsakades av en felaktig transaktionsuppdateringsprincip |
ShouldRetry | Anger om inmatningen kan lyckas om ett nytt försök görs som det är |
Feedback
https://aka.ms/ContentUserFeedback.
Kommer snart: Under hela 2024 kommer vi att fasa ut GitHub-problem som feedbackmekanism för innehåll och ersätta det med ett nytt feedbacksystem. Mer information finns i:Skicka och visa feedback för