Sdílet prostřednictvím


Ingestování dat pomocí sady Kusto .NET SDK

Existují dvě klientské knihovny pro .NET: knihovna ingestování a datová knihovna. Další informace o sadě .NET SDK najdete v tématu o sadě .NET SDK. Tyto knihovny umožňují snadno ingestovat (načíst) data do clusteru a dotazovat se na data z vašeho kódu. V tomto článku nejprve vytvoříte tabulku a mapování dat v testovacím clusteru. Pak zařadíte příjem dat do clusteru a ověříte výsledky.

Požadavky

  • Účet Microsoft nebo identita uživatele Microsoft Entra. Předplatné Azure není povinné.
  • Cluster a databáze. Vytvořte cluster a databázi.

Instalace knihovny ingestování

Install-Package Microsoft.Azure.Kusto.Ingest

Přidání ověřování a vytváření připojovací řetězec

Ověřování

K ověření aplikace používá sada SDK VAŠE ID tenanta Microsoft Entra. ID tenanta zjistíte pomocí následující adresy URL, ve které YourDomain nahradíte svou doménou.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Pokud je vaše doména například contoso.com, je adresa URL https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Kliknutím na tuto adresu URL zobrazte výsledky. První řádek vypadá jako v následujícím příkladu.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

V tomto případě je ID tenanta 6babcaad-604b-40ac-a9d7-9fd97c0b779f.

Tento příklad používá interaktivní ověřování uživatelů Microsoft Entra pro přístup ke clusteru. Můžete také použít ověřování aplikace Microsoft Entra s certifikátem nebo tajným kódem aplikace. Před spuštěním tohoto kódu nezapomeňte nastavit správné hodnoty.tenantId clusterUri

Sada SDK poskytuje pohodlný způsob, jak nastavit metodu ověřování jako součást připojovací řetězec. Kompletní dokumentaci k připojovací řetězec najdete v tématu připojovací řetězec s.

Poznámka:

Aktuální verze sady SDK nepodporuje interaktivní ověřování uživatelů v .NET Core. V případě potřeby místo toho použijte uživatelské jméno nebo heslo nebo ověřování aplikace Microsoft Entra.

Vytvoření připojovací řetězec

Teď můžete vytvořit připojovací řetězec. Cílovou tabulku a mapování vytvoříte v pozdějším kroku.

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Nastavení informací o zdrojovém souboru

Nastavte cestu ke zdrojovému souboru. Tento příklad používá ukázkový soubor hostovaný v Azure Blob Storage. Ukázková datová sada StormEvents obsahuje data související s počasím z národních center pro informace o životním prostředí.

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

Vytvoření tabulky v testovacím clusteru

Vytvořte tabulku s názvem StormEvents , která odpovídá schématu StormEvents.csv dat v souboru.

Tip

Následující fragmenty kódu vytvoří instanci klienta pro téměř každé volání. To se provádí, aby každý fragment kódu byl jednotlivě spustitelný. V produkčním prostředí se instance klienta znovu zadají a měly by se uchovávat tak dlouho, jak je to potřeba. Jedna instance klienta na identifikátor URI stačí, i když pracujete s více databázemi (databázi je možné zadat na úrovni příkazu).

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definování mapování ingestace

Namapujte příchozí data CSV na názvy sloupců použitých při vytváření tabulky. Zřiďte objekt mapování sloupců CSV v této tabulce.

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definování zásad dávkování pro tabulku

Dávkování příchozích dat optimalizuje velikost horizontálních oddílů dat, která se řídí zásadami dávkování příjmu dat. Upravte zásadu pomocí příkazu pro správu zásad dávkování příjmu dat. Pomocí této zásady můžete snížit latenci pomalých příchozích dat.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

Doporučujeme definovat Raw Data Size hodnotu přijatých dat a přírůstkově snížit velikost na 250 MB a zároveň zkontrolovat, jestli se výkon zlepšuje.

Tuto vlastnost můžete použít Flush Immediately k přeskočení dávkování, i když se nedoporučuje pro příjem dat ve velkém měřítku, protože může způsobit nízký výkon.

Přidání zprávy do fronty pro ingestaci

Zařadíte zprávu do fronty pro načtení dat z úložiště objektů blob a ingestování dat. Vytvoří se připojení ke clusteru pro příjem dat a vytvoří se jiný klient pro práci s tímto koncovým bodem.

Tip

Následující fragmenty kódu vytvoří instanci klienta pro téměř každé volání. To se provádí, aby každý fragment kódu byl jednotlivě spustitelný. V produkčním prostředí se instance klienta znovu zadají a měly by se uchovávat tak dlouho, jak je to potřeba. Jedna instance klienta na identifikátor URI stačí, i když pracujete s více databázemi (databázi je možné zadat na úrovni příkazu).

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

Ověření ingestování dat do tabulky

Počkejte pět až deset minut, než příjem dat ve frontě naplánuje příjem dat a načte je do clusteru. Pak spuštěním následujícího kódu získejte počet záznamů v tabulce StormEvents.

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

Spuštění dotazů pro řešení potíží

Přihlaste se k https://dataexplorer.azure.com a připojte se k vašemu clusteru. Spuštěním následujícího příkazu ve vaší databázi zjistíte, jestli za poslední čtyři hodiny došlo k chybám ingestování. Přes spuštěním nahraďte název databáze.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Spuštěním následujícího příkazu zobrazíte stav všech operací ingestace za poslední čtyři hodiny. Přes spuštěním nahraďte název databáze.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Vyčištění prostředků

Pokud máte v úmyslu postupovat podle našich dalších článků, ponechte prostředky, které jste vytvořili. Pokud ne, spuštěním následujícího příkazu v databázi tabulku StormEvents vyčistěte.

.drop table StormEvents