Erfassen von Daten mit dem Kusto .NET SDK

Es gibt zwei Clientbibliotheken für .NET: eine Erfassungsbibliothek und eine Datenbibliothek. Weitere Informationen zum .NET SDK finden Sie unter Über .NET SDK. Mit diesen Bibliotheken können Sie über Ihren Code Daten in einem Cluster erfassen (laden) und Daten abfragen. In diesem Artikel erstellen Sie zunächst eine Tabelle und eine Datenzuordnung in einem Testcluster. Anschließend stellen Sie die Erfassung im Cluster in eine Warteschlange und überprüfen die Ergebnisse.

Voraussetzungen

Installieren der Erfassungsbibliothek

Install-Package Microsoft.Azure.Kusto.Ingest

Hinzufügen von Authentifizierung und Erstellen einer Verbindungszeichenfolge

Authentifizierung

Zum Authentifizieren einer Anwendung verwendet das SDK Ihre Microsoft Entra Mandanten-ID. Um Ihre Mandanten-ID zu suchen, verwenden Sie die folgende URL, und ersetzen Sie dabei YourDomain durch Ihre Domäne.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Wenn Ihre Domäne beispielsweise contoso.com ist, lautet die URL https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Klicken Sie auf diese URL, um die Ergebnisse anzuzeigen. Die erste Zeile lautet wie folgt.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

Die Mandanten-ID ist in diesem Fall 6babcaad-604b-40ac-a9d7-9fd97c0b779f.

In diesem Beispiel wird eine interaktive Microsoft Entra Benutzerauthentifizierung für den Zugriff auf den Cluster verwendet. Sie können auch Microsoft Entra Anwendungsauthentifizierung mit Zertifikat oder Anwendungsgeheimnis verwenden. Achten Sie darauf, vor dem Ausführen dieses Codes die richtigen Werte für tenantId und clusterUri festzulegen.

Das SDK bietet eine bequeme Möglichkeit, die Authentifizierungsmethode als Teil der Verbindungszeichenfolge einzurichten. Eine vollständige Dokumentation zu Verbindungszeichenfolgen finden Sie unter Verbindungszeichenfolgen.

Hinweis

Die aktuelle Version des SDK unterstützt keine interaktive Benutzerauthentifizierung für .NET Core. Verwenden Sie bei Bedarf stattdessen Microsoft Entra Benutzernamen/Kennwort oder Anwendungsauthentifizierung.

Erstellen der Verbindungszeichenfolge

Jetzt können Sie die Verbindungszeichenfolge erstellen. Die Zieltabelle und die Zuordnung werden in einem späteren Schritt erstellt.

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

Festlegen der Informationen zur Quelldatei

Legen Sie den Pfad für die Quelldatei fest. In diesem Beispiel wird eine Beispieldatei verwendet, die in Azure Blob Storage gehostet wird. Das StormEvents-Beispieldataset enthält wetterbezogene Daten aus den Nationalen Zentren für Umweltinformationen.

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

Erstellen einer Tabelle im Testcluster

Erstellen Sie eine Tabelle mit dem Namen StormEvents, die dem Schema der Daten in der Datei StormEvents.csv entspricht.

Tipp

Die folgenden Codeausschnitte erstellen eine Instanz eines Clients für fast jeden Aufruf. Dadurch kann jeder Ausschnitt einzeln ausgeführt werden. In der Produktionsumgebung sind die Clientinstanzen wiedereintrittsfähig und sollten so lange wie nötig aufbewahrt werden. Eine einzelne Clientinstanz pro URI ist ausreichend, auch wenn Sie mit mehreren Datenbanken arbeiten (Datenbanken können auf Befehlsebene angegeben werden).

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definieren der Erfassungszuordnung

Ordnen Sie die eingehenden CSV-Daten den beim Erstellen der Tabelle verwendeten Spaltennamen zu. Stellen Sie ein Objekt für die CSV-Spaltenzuordnung in dieser Tabelle bereit.

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Definieren der Batchverarbeitungsrichtlinie für die Tabelle

Durch die Batchverarbeitung eingehender Daten wird die Größe der Datenshards optimiert. Dieser Prozess wird durch die IngestionBatching-Richtlinie gesteuert. Ändern Sie die Richtlinie mit dem Befehl für die Verwaltung von Batchverarbeitungsrichtlinien für die Erfassung. Verwenden Sie diese Richtlinie, um die Latenz von langsam eintreffenden Daten zu reduzieren.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

Es empfiehlt sich, den Wert Raw Data Size für die erfassten Daten zu definieren, die Größe schrittweise auf 250 MB zu verringern und gleichzeitig zu überprüfen, ob sich die Leistung verbessert.

Sie können die Eigenschaft Flush Immediately verwenden, um die Batchverarbeitung zu überspringen. Dies wird jedoch für die Erfassung im großen Umfang nicht empfohlen, da dies die Leistung beeinträchtigen kann.

Stellen einer Nachricht in eine Warteschlange für die Erfassung

Stellen Sie eine Nachricht zum Pullen von Daten aus Blob Storage in eine Warteschlange, und erfassen Sie die Daten. Eine Verbindung mit dem Erfassungscluster wird hergestellt, und ein anderer Client wird für die Verwendung mit diesem Endpunkt erstellt.

Tipp

Die folgenden Codeausschnitte erstellen eine Instanz eines Clients für fast jeden Aufruf. Dadurch kann jeder Ausschnitt einzeln ausgeführt werden. In der Produktionsumgebung sind die Clientinstanzen wiedereintrittsfähig und sollten so lange wie nötig aufbewahrt werden. Eine einzelne Clientinstanz pro URI ist ausreichend, auch wenn Sie mit mehreren Datenbanken arbeiten (Datenbanken können auf Befehlsebene angegeben werden).

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

Überprüfen, ob die Daten in der Tabelle erfasst wurden

Warten Sie fünf bis zehn Minuten, bis die Erfassung in die Warteschlange eingereiht wurde, um die Erfassung zu planen und die Daten in Ihren Cluster zu laden. Führen Sie dann den folgenden Code aus, um die Anzahl von Datensätzen in der Tabelle StormEvents zu erhalten.

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

Ausführen von Abfragen zur Problembehandlung

Melden Sie sich bei https://dataexplorer.azure.com an, und stellen Sie eine Verbindung mit Ihrem Cluster her. Führen Sie den folgenden Befehl in Ihrer Datenbank aus, um festzustellen, ob in den letzten vier Stunden Erfassungsfehler aufgetreten sind. Ersetzen Sie den Namen der Datenbank vor dem Ausführen.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Führen Sie den folgenden Befehl aus, um den Status aller Erfassungsvorgänge in den letzten vier Stunden anzuzeigen. Ersetzen Sie den Namen der Datenbank vor dem Ausführen.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Bereinigen von Ressourcen

Wenn Sie unsere anderen Artikel durcharbeiten möchten, behalten Sie die erstellten Ressourcen bei. Wenn dies nicht der Fall ist, führen Sie den folgenden Befehl in der Datenbank aus, um die Tabelle StormEvents zu bereinigen.

.drop table StormEvents