Dela via


Mata in data med Kusto .NET SDK

Det finns två klientbibliotek för .NET: ett inmatningsbibliotek och ett databibliotek. Mer information om .NET SDK finns i om .NET SDK. I biblioteken kan du mata in (läsa in) data i ett kluster och fråga data från din kod. I den här artikeln skapar du först en tabell och datamappning i ett testkluster. Sedan köar du en inmatning till klustret och verifierar resultatet.

Förutsättningar

  • Ett Microsoft-konto eller en Microsoft Entra användaridentitet. En Azure-prenumeration krävs inte.
  • Ett kluster och en databas. Skapa ett kluster och en databas.

Installera biblioteket för inmatning

Install-Package Microsoft.Azure.Kusto.Ingest

Lägga till autentisering och skapa anslutningssträng

Autentisering

För att autentisera ett program använder SDK:t ditt Microsoft Entra klientorganisations-ID. Du hittar ditt klient-ID genom att använda följande URL. Byt ut YourDomain mot din domän.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Om din domän till exempel är contoso.com blir URL:en: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Klicka på den här URL:en för att se resultatet. Den första raden ser ut så här.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Klient-ID är i det här fallet 6babcaad-604b-40ac-a9d7-9fd97c0b779f.

I det här exemplet används en interaktiv Microsoft Entra användarautentisering för att komma åt klustret. Du kan också använda Microsoft Entra programautentisering med certifikat eller programhemlighet. Se till att ange rätt värden för tenantId och clusterUri innan du kör den här koden.

SDK:t är ett bekvämt sätt att konfigurera autentiseringsmetoden som en del av anslutningssträng. Fullständig dokumentation om anslutningssträngar finns i anslutningssträngar.

Anteckning

Den aktuella versionen av SDK stöder inte interaktiv användarautentisering på .NET Core. Om det behövs använder du Microsoft Entra användarnamn/lösenord eller programautentisering i stället.

Skapa anslutningssträngen

Nu kan du skapa anslutningssträng. Du skapar måltabellen och mappningen i ett senare steg.

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Ange information om källfilen

Ange sökvägen till källfilen. Det här exemplet används en exempelfil som finns på Azure Blob Storage. StormEvents-exempeldatauppsättningen innehåller väderrelaterade data från National Centers for Environmental Information.

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

Skapa en tabell i ditt testkluster

Skapa en tabell med namnet StormEvents som matchar schemat för data i filen StormEvents.csv.

Tips

Följande kodfragment skapar en instans av en klient för nästan varje anrop. Detta görs för att göra varje kodfragment individuellt körbart. I produktion är klientinstanserna reentrant och bör behållas så länge som det behövs. Det räcker med en enskild klientinstans per URI, även när du arbetar med flera databaser (databasen kan anges på kommandonivå).

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definiera mappning av inmatning

Mappa inkommande CSV-data till de kolumnnamn som används när du skapade tabellen. Etablera ett CSV-kolumnmappningsobjekt i den tabellen.

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definiera batchprincip för tabellen

Batchbearbetning av inkommande data optimerar datashardstorleken, som styrs av batchbearbetningsprincipen för inmatning. Ändra principen med kommandot inmatningsbatchhanteringsprincip. Använd den här principen för att minska svarstiden för långsamt ankommande data.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

Vi rekommenderar att du definierar ett Raw Data Size värde för inmatade data och inkrementellt minskar storleken till 250 MB, samtidigt som du kontrollerar om prestandan förbättras.

Du kan använda Flush Immediately egenskapen för att hoppa över batchbearbetning, även om detta inte rekommenderas för storskalig inmatning eftersom det kan orsaka dåliga prestanda.

Köa ett meddelande för inmatning

Köa ett meddelande för att hämta data från Blob Storage och mata in data. En anslutning upprättas till inmatningsklustret och en annan klient skapas för att fungera med slutpunkten.

Tips

Följande kodfragment skapar en instans av en klient för nästan varje anrop. Detta görs för att göra varje kodfragment individuellt körbart. I produktion är klientinstanserna reentrant och bör behållas så länge som det behövs. Det räcker med en enskild klientinstans per URI, även när du arbetar med flera databaser (databasen kan anges på kommandonivå).

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

Verifiera att data har matats in i tabellen

Vänta fem till tio minuter tills den köade inmatningen schemalägger inmatningen och läser in data i klustret. Kör sedan följande kod för att hämta posterna i tabellen StormEvents.

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

Köra frågor för felsökning

Logga in på https://dataexplorer.azure.com och anslut till klustret. Kör följande kommando i din databas för att se om det fanns inmatningsfel under de senaste fyra timmarna. Ersätt namnet på databasen innan du kör.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Kör följande kommando för att visa status för alla åtgärder för inmatning under de sista fyra timmarna. Ersätt namnet på databasen innan du kör.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Rensa resurser

Om du planerar att följa våra andra artiklar behåller du de resurser som du har skapat. Om inte kör du följande kommando i din databas för att rensa tabellen StormEvents.

.drop table StormEvents