Share via


Erstellen einer Event-Hub-Datenverbindung für Azure Synapse Data Explorer mit C# (Vorschau)

Azure Synapse Data Explorer ist ein schneller und hochgradig skalierbarer Dienst zur Untersuchung von Protokoll- und Telemetriedaten. Azure Synapse Data Explorer bietet die Erfassung von Daten aus Event Hubs, IoT Hubs und Blobs, die in Blobcontainer geschrieben werden.

In diesem Artikel erstellen Sie eine Event-Hub-Datenverbindung für Azure Synapse Data Explorer mithilfe von C#.

Voraussetzungen

  • Ein Azure-Abonnement. Erstellen Sie ein kostenloses Azure-Konto.

  • Erstellen eines Data Explorer-Pools über Synapse Studio oder im Azure-Portal

  • Erstellen Sie eine Data Explorer-Datenbank.

    1. Wählen Sie in Synapse Studio im linken Bereich Daten aus.

    2. Wählen Sie + (Neue Ressource hinzufügen) >Data Explorer-Pool aus, und verwenden Sie die folgenden Informationen:

      Einstellung Vorgeschlagener Wert BESCHREIBUNG
      Poolname contosodataexplorer Name des zu verwendende Data Explorer-Pools
      Name TestDatabase Der Datenbankname muss innerhalb des Clusters eindeutig sein.
      Standardaufbewahrungszeitraum 365 Die Zeitspanne (in Tagen), für die garantiert wird, dass die Daten für Abfragen verfügbar bleiben. Die Zeitspanne wird ab dem Zeitpunkt gemessen, zu dem die Daten erfasst werden.
      Standardcachezeitraum 31 Die Zeitspanne (in Tagen), wie lange häufig abgefragte Daten im SSD-Speicher oder RAM (und nicht im längerfristigen Speicher) verfügbar bleiben.
    3. Wählen Sie Erstellen, um die Datenbank zu erstellen. Die Erstellung dauert in der Regel weniger als eine Minute.

Hinweis

Das Erfassen von Daten aus einem Event Hub in Data Explorer-Pools funktioniert nicht, wenn Ihr Synapse-Arbeitsbereich ein verwaltetes virtuelles Netzwerk mit aktiviertem Datenexfiltrationsschutz verwendet.

  • Visual Studio 2019. Sie können die kostenloseVisual Studio 2019 Community Edition herunterladen und verwenden. Aktivieren Sie beim Setup von Visual Studio die Option Azure-Entwicklung.

Erstellen einer Tabelle im Testcluster

Erstellen Sie eine Tabelle mit dem Namen StormEvents, die dem Schema der Daten in der Datei StormEvents.csv entspricht.

Tipp

Die folgenden Codeausschnitte erstellen eine Instanz eines Clients für fast jeden Aufruf. Dadurch kann jeder Ausschnitt einzeln ausgeführt werden. In der Produktionsumgebung sind die Clientinstanzen wiedereintrittsfähig und sollten so lange wie nötig aufbewahrt werden. Eine einzelne Clientinstanz pro URI ist ausreichend, auch wenn Sie mit mehreren Datenbanken arbeiten (Datenbanken können auf Befehlsebene angegeben werden).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definieren der Erfassungszuordnung

Ordnen Sie die eingehenden CSV-Daten den beim Erstellen der Tabelle verwendeten Spaltennamen zu. Stellen Sie ein Objekt für die CSV-Spaltenzuordnung in dieser Tabelle bereit.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Installieren eines C#-NuGet-Pakets

Authentifizierung

Um das folgende Beispiel ausführen zu können, benötigen Sie eine Microsoft Entra-Anwendung und einen Dienstprinzipal, der auf Ressourcen zugreifen kann. Informationen zum Erstellen einer kostenlosen Microsoft Entra-Anwendung und Hinzufügen einer Rollenzuweisung auf Abonnementebene finden Sie unter Erstellen einer Microsoft Entra-Anwendung. Außerdem benötigen Sie die Verzeichnis-ID (Mandanten-ID), die Anwendungs-ID und den geheimen Clientschlüssel.

Hinzufügen einer Event Hub-Datenverbindung

Im folgenden Beispiel wird gezeigt, wie eine Event Hub-Datenverbindung programmgesteuert hinzugefügt wird. Weitere Informationen zum Hinzufügen einer Event-Hub-Datenverbindung mithilfe des Azure-Portals finden Sie unter Herstellen einer Verbindung mit dem Event Hub.

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Einstellung Empfohlener Wert Feldbeschreibung
tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Ihre Mandanten-ID. Wird auch als Verzeichnis-ID bezeichnet.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Die Abonnement-ID, die Sie für die Ressourcenerstellung verwenden.
clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Die Client-ID der Anwendung, die auf Ressourcen in Ihrem Mandanten zugreifen kann.
clientSecret xxxxxxxxxxxxxx Das Clientgeheimnis der Anwendung, die auf Ressourcen in Ihrem Mandanten zugreifen kann.
resourceGroupName testrg Der Name der Ressourcengruppe, die Ihren Cluster enthält.
clusterName mykustocluster Der Name Ihres Clusters.
databaseName mykustodatabase Der Name der Zieldatenbank in Ihrem Cluster.
dataConnectionName myeventhubconnect Der gewünschte Name Ihrer Datenverbindung.
tableName StormEvents Der Name der Zieltabelle in der Zieldatenbank.
mappingRuleName StormEvents_CSV_Mapping Der Name der Spaltenzuordnung, die mit der Zieltabelle verknüpft ist.
dataFormat csv Das Datenformat der Nachricht.
eventHubResourceId Ressourcen-ID Die Ressourcen-ID Ihres Event Hubs mit den Daten für die Erfassung.
consumerGroup $Default Die Consumergruppe Ihres Event Hubs.
location USA, Mitte Der Speicherort der Datenverbindungsressource.
compression Gzip oder Keine Der Typ der Datenkomprimierung

Generieren von Daten

Sehen Sie sich die Beispiel-App an, die Daten generiert und an einen Event Hub sendet.

Ein Ereignis kann entsprechend dem Größenlimit einen oder mehrere Datensätze enthalten. Im folgenden Beispiel werden zwei Ereignisse mit jeweils fünf angefügten Datensätzen gesendet:

var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;

for (var i = 0; i < 10; i++)
{
    // Create the data
    var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
    var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
    counter++;

    // Create the event
    if (counter == recordsPerEvent)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(data));
        events.Add(eventData);

        counter = 0;
        data = string.Empty;
    }
}

// Send events
eventHubClient.SendAsync(events).Wait();

Bereinigen von Ressourcen

Um die Datenverbindung zu löschen, verwenden Sie den folgenden Befehl:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);

Nächste Schritte