Delen via


Een Event Hub-gegevensverbinding maken voor Azure Synapse Data Explorer met behulp van C# (preview)

Azure Synapse Data Explorer is een snelle en zeer schaalbare gegevensverkenningsservice voor logboek- en telemetriegegevens. Azure Synapse Data Explorer biedt opname (gegevens laden) van Event Hubs, IoT Hubs en blobs die zijn geschreven naar blobcontainers.

In dit artikel maakt u een Event Hub-gegevensverbinding voor Azure Synapse Data Explorer met behulp van C#.

Vereisten

  • Een Azure-abonnement. Maak een gratis Azure-account.

  • Een Data Explorer-pool maken met Synapse Studio of Azure Portal

  • Maak een Data Explorer-database.

    1. Selecteer Gegevens in Synapse Studio in het linkerdeelvenster.

    2. Selecteer + (Nieuwe resource toevoegen) >Data Explorer-pool en gebruik de volgende informatie:

      Instelling Voorgestelde waarde Beschrijving
      Poolnaam contosodataexplorer De naam van de Data Explorer-pool die moet worden gebruikt
      Naam TestDatabase De databasenaam moet uniek zijn binnen het cluster.
      Standaardretentieperiode 365 De periode (in dagen) dat de gegevens gegarandeerd beschikbaar blijven voor query's. De periode wordt gemeten vanaf het moment dat de gegevens zijn opgenomen.
      Standaardcacheperiode 31 De periode (in dagen) dat vaak opgevraagde gegevens beschikbaar blijven in de SSD-opslag of het RAM-geheugen in plaats van in de langetermijnopslag.
    3. Selecteer Maken om het profiel te maken. Het maakproces duurt meestal minder dan een minuut.

Notitie

Het opnemen van gegevens van een Event Hub in Data Explorer-pools werkt niet als uw Synapse-werkruimte gebruikmaakt van een beheerd virtueel netwerk met gegevensexfiltratiebeveiliging ingeschakeld.

Een tabel maken in het testcluster

Maak een tabel met de naam StormEvents die overeenkomt met het schema van de gegevens in het bestand StormEvents.csv.

Tip

Met de volgende codefragmenten maakt u voor bijna elke aanroep een exemplaar van een client. Dit wordt gedaan om elk fragment afzonderlijk uit te voeren. In productie worden de clientexemplaren opnieuw in gebruik genomen en moeten ze zo lang worden bewaard als nodig is. Eén clientexemplaren per URI zijn voldoende, zelfs wanneer u met meerdere databases werkt (database kan op opdrachtniveau worden opgegeven).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Toewijzing van opname definiëren

Wijs de binnenkomende CSV-gegevens toe aan de kolomnamen die zijn gebruikt bij het maken van de tabel. Richt een CSV-kolomtoewijzingsobject in voor die tabel.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

C# NuGet installeren

Verificatie

Als u het volgende voorbeeld wilt uitvoeren, hebt u een Microsoft Entra-toepassing en service-principal nodig die toegang heeft tot resources. Zie Een Microsoft Entra-toepassing maken om een gratis Microsoft Entra-toepassing te maken en roltoewijzing toe te voegen op abonnementsniveau. U hebt ook de map-id (tenant), de toepassings-id en het clientgeheim nodig.

Een Event Hub-gegevensverbinding toevoegen

In het volgende voorbeeld ziet u hoe u programmatisch een Event Hub-gegevensverbinding toevoegt. Zie verbinding maken met de Event Hub voor informatie over het toevoegen van een Event Hub-gegevensverbinding met behulp van Azure Portal.

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Instelling Voorgestelde waarde Veldomschrijving
tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx Uw tenant-id. Ook wel map-id genoemd.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx De abonnements-id die u gebruikt voor het maken van resources.
clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx De client-id van de toepassing die toegang heeft tot resources in uw tenant.
clientSecret xxxxxxxxxxxx Het clientgeheim van de toepassing die toegang heeft tot resources in uw tenant.
resourceGroupName testrg De naam van de resourcegroep die uw cluster bevat.
clusterName mykustocluster De naam van het cluster.
databaseName mykustodatabase De naam van de doeldatabase in uw cluster.
dataConnectionName myeventhubconnect De gewenste naam van uw gegevensverbinding.
tableName StormEvents De naam van de doeltabel in de doeldatabase.
mappingRuleName StormEvents_CSV_Mapping De naam van de kolomtoewijzing die is gerelateerd aan de doeltabel.
dataFormat CSV De gegevensindeling van het bericht.
eventHubResourceId Resource-id De resource-id van uw Event Hub die de gegevens bevat voor opname.
consumerGroup $Default De consumentengroep van uw Event Hub.
locatie US - centraal De locatie van de gegevensverbindingsresource.
compressie Gzip of Geen Het type gegevenscompressie.

Gegevens genereren

Bekijk de voorbeeld-app waarmee gegevens worden gegenereerd en naar een Event Hub worden verzonden.

Een gebeurtenis kan een of meer records bevatten, tot de groottelimiet. In het volgende voorbeeld verzenden we twee gebeurtenissen, waarbij elk vijf records is toegevoegd:

var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;

for (var i = 0; i < 10; i++)
{
    // Create the data
    var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
    var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
    counter++;

    // Create the event
    if (counter == recordsPerEvent)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(data));
        events.Add(eventData);

        counter = 0;
        data = string.Empty;
    }
}

// Send events
eventHubClient.SendAsync(events).Wait();

Resources opschonen

Gebruik de volgende opdracht om de gegevensverbinding te verwijderen:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);

Volgende stappen