Dela via


Skapa en Händelsehubb-dataanslutning för Azure Synapse Data Explorer med hjälp av C# (förhandsversion)

Azure Synapse Data Explorer är en snabb och mycket skalbar datautforskningstjänst för logg- och telemetridata. Azure Synapse Data Explorer erbjuder inmatning (datainläsning) från Event Hubs, IoT Hubs och blobar som skrivits till blobcontainrar.

I den här artikeln skapar du en Händelsehubb-dataanslutning för Azure Synapse Data Explorer med hjälp av C#.

Förutsättningar

  • En Azure-prenumeration. Skapa ett kostnadsfritt Azure-konto.

  • Skapa en Data Explorer-pool med Synapse Studio eller Azure-portalen

  • Skapa en Data Explorer-databas.

    1. Välj Data i fönstret till vänster i Synapse Studio.

    2. Välj + (Lägg till ny resurs) >Data Explorer-pool och använd följande information:

      Inställning Föreslaget värde beskrivning
      Poolnamn contosodataexplorer Namnet på datautforskarens pool som ska användas
      Name TestDatabase Databasnamnet måste vara unikt inom klustret.
      Standardkvarhållningsperiod 365 Det tidsintervall (i dagar) då det är garanterat att data förblir tillgängliga för frågor. Tidsintervallet mäts från det att data matas in.
      Standardcacheperiod 31 Det tidsintervall (i dagar) då data som frågor körs mot ofta ska vara tillgängliga i SSD-lagring eller RAM i stället för i långsiktig lagring.
    3. Välj Skapa för att skapa databasen. Det brukar ta mindre än en minut att skapa en databas.

Kommentar

Inmatning av data från en händelsehubb till Data Explorer-pooler fungerar inte om din Synapse-arbetsyta använder ett hanterat virtuellt nätverk med dataexfiltreringsskydd aktiverat.

Skapa en tabell i ditt testkluster

Skapa en tabell med namnet StormEvents som matchar schemat för data i filen StormEvents.csv.

Dricks

Följande kodfragment skapar en instans av en klient för nästan varje anrop. Detta görs för att göra varje kodfragment individuellt runnable. I produktion är klientinstanserna reentrant och bör behållas så länge som det behövs. Det räcker med en enskild klientinstans per URI, även när du arbetar med flera databaser (databasen kan anges på kommandonivå).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definiera mappning av inmatning

Mappa inkommande CSV-data till de kolumnnamn som används när du skapade tabellen. Etablera ett CSV-kolumnmappningsobjekt i tabellen.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Installera C# NuGet

Autentisering

Om du vill köra följande exempel behöver du ett Microsoft Entra-program och tjänstens huvudnamn som kan komma åt resurser. Information om hur du skapar ett kostnadsfritt Microsoft Entra-program och lägger till rolltilldelning på prenumerationsnivå finns i Skapa ett Microsoft Entra-program. Du behöver även katalog-ID(klient)-ID, program-ID och klienthemlighet.

Lägga till en Händelsehubb-dataanslutning

I följande exempel visas hur du lägger till en Händelsehubb-dataanslutning programmatiskt. Mer information om hur du lägger till en Händelsehubb-dataanslutning med hjälp av Azure-portalen finns i Ansluta till händelsehubben .

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Inställning Föreslaget värde Fältbeskrivning
tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Ditt klientorganisations-ID. Kallas även katalog-ID.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Det prenumerations-ID som du använder för att skapa resurser.
clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx Klient-ID för programmet som kan komma åt resurser i din klientorganisation.
clientSecret xxxxxxxxxxxxxxxx Klienthemligheten för programmet som kan komma åt resurser i din klientorganisation.
resourceGroupName testrg Namnet på resursgruppen som innehåller klustret.
clusterName mykustocluster Namnet på klustret.
databaseName mykustodatabase Namnet på måldatabasen i klustret.
dataConnectionName myeventhubconnect Önskat namn på dataanslutningen.
tableName StormEvents Namnet på måltabellen i måldatabasen.
mappingRuleName StormEvents_CSV_Mapping Namnet på din kolumnmappning som är relaterad till måltabellen.
dataFormat csv Meddelandets dataformat.
eventHubResourceId Resurs-ID Resurs-ID för din händelsehubb som innehåller data för inmatning.
consumerGroup $Default Konsumentgruppen för din händelsehubb.
plats USA, centrala Platsen för dataanslutningsresursen.
komprimering Gzip eller Ingen Typen av datakomprimering.

Generera data

Se exempelappen som genererar data och skickar dem till en händelsehubb.

En händelse kan innehålla en eller flera poster, upp till dess storleksgräns. I följande exempel skickar vi två händelser, där var och en har fem poster tillagt:

var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;

for (var i = 0; i < 10; i++)
{
    // Create the data
    var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
    var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
    counter++;

    // Create the event
    if (counter == recordsPerEvent)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(data));
        events.Add(eventData);

        counter = 0;
        data = string.Empty;
    }
}

// Send events
eventHubClient.SendAsync(events).Wait();

Rensa resurser

Om du vill ta bort dataanslutningen använder du följande kommando:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);

Nästa steg