Sdílet prostřednictvím


Vytvoření datového připojení centra událostí pro Azure Synapse Data Explorer pomocí jazyka C# (Preview)

Azure Synapse Data Explorer je rychlá a vysoce škálovatelná služba pro zkoumání dat protokolů a telemetrie. Azure Synapse Data Explorer nabízí příjem dat (načítání dat) ze služby Event Hubs, IoT Hubs a objektů blob zapsaných do kontejnerů objektů blob.

V tomto článku vytvoříte datové připojení centra událostí pro Azure Synapse Data Explorer pomocí jazyka C#.

Požadavky

  • Předplatné Azure. Vytvořte bezplatný účet Azure.

  • Vytvoření fondu Průzkumníka dat pomocí nástroje Synapse Studio nebo webu Azure Portal

  • Vytvořte databázi Průzkumníka dat.

    1. V nástroji Synapse Studio v levém podokně vyberte Data.

    2. Vyberte + fond Průzkumníka dat (přidat nový zdroj) >a použijte následující informace:

      Nastavení Navrhovaná hodnota Popis
      Název fondu contosodataexplorer Název fondu Průzkumníka dat, který se má použít
      Název TestDatabase Název databáze musí být v rámci clusteru jedinečný.
      Výchozí doba uchovávání 365 Časové období (ve dnech), pro které je zaručeno, že jsou data k dispozici pro dotazování. Časový rozsah se začíná měřit od okamžiku, kdy jsou data ingestována.
      Výchozí období mezipaměti 31 Časové období (ve dnech), pro které se mají uchovávat často dotazovaná data dostupná v úložišti SSD nebo paměti RAM, a ne v dlouhodobějším úložišti.
    3. Výběrem možnosti Vytvořit vytvořte databázi. Vytvoření obvykle trvá méně než minutu.

Poznámka:

Ingestování dat z centra událostí do fondů Průzkumníka dat nebude fungovat, pokud váš pracovní prostor Synapse používá spravovanou virtuální síť s povolenou ochranou před exfiltrací dat.

Vytvoření tabulky v testovacím clusteru

Vytvořte tabulku s názvem StormEvents , která odpovídá schématu StormEvents.csv dat v souboru.

Tip

Následující fragmenty kódu vytvoří instanci klienta pro téměř každé volání. To se provádí, aby každý fragment kódu byl jednotlivě spustitelný. V produkčním prostředí se instance klienta znovu zadají a měly by se uchovávat tak dlouho, jak je to potřeba. Jedna instance klienta na identifikátor URI stačí, i když pracujete s více databázemi (databázi je možné zadat na úrovni příkazu).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Definování mapování ingestace

Namapujte příchozí data CSV na názvy sloupců použitých při vytváření tabulky. Zřiďte objekt mapování sloupců CSV v této tabulce.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Instalace balíčku NuGet v jazyce C#

Ověřování

Ke spuštění následujícího příkladu potřebujete aplikaci Microsoft Entra a instanční objekt, který má přístup k prostředkům. Pokud chcete vytvořit bezplatnou aplikaci Microsoft Entra a přidat přiřazení role na úrovni předplatného, přečtěte si téma Vytvoření aplikace Microsoft Entra. Potřebujete také ID adresáře (tenanta), ID aplikace a tajný klíč klienta.

Přidání datového připojení centra událostí

Následující příklad ukazuje, jak přidat datové připojení centra událostí programově. Informace o přidání datového připojení centra událostí pomocí webu Azure Portal najdete v tématu připojení k centru událostí.

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Nastavení Navrhovaná hodnota Popis pole
tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx ID vašeho tenanta Označuje se také jako ID adresáře.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx ID předplatného, které používáte k vytvoření prostředku.
clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx ID klienta aplikace, která má přístup k prostředkům ve vašem tenantovi.
clientSecret xxxxxxxxxxxxxxxx Tajný klíč klienta aplikace, která má přístup k prostředkům ve vašem tenantovi.
resourceGroupName testrg Název skupiny prostředků obsahující váš cluster.
clusterName mykustocluster Název clusteru.
databaseName mykustodatabase Název cílové databáze v clusteru
dataConnectionName myeventhubconnect Požadovaný název datového připojení.
tableName StormEvents Název cílové tabulky v cílové databázi.
mappingRuleName StormEvents_CSV_Mapping Název mapování sloupců související s cílovou tabulkou.
dataFormat csv Formát dat zprávy.
eventHubResourceId ID prostředku ID prostředku vašeho centra událostí, které obsahuje data pro příjem dat.
consumerGroup $Default Skupina příjemců vašeho centra událostí.
location USA – střed Umístění prostředku datového připojení
komprese Gzip nebo None Typ komprese dat.

Generovat data

Podívejte se na ukázkovou aplikaci , která generuje data a odesílá je do centra událostí.

Událost může obsahovat jeden nebo více záznamů až do limitu velikosti. V následující ukázce odesíláme dvě události, každý má připojených pět záznamů:

var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;

for (var i = 0; i < 10; i++)
{
    // Create the data
    var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
    var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
    counter++;

    // Create the event
    if (counter == recordsPerEvent)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(data));
        events.Add(eventData);

        counter = 0;
        data = string.Empty;
    }
}

// Send events
eventHubClient.SendAsync(events).Wait();

Vyčištění prostředků

Pokud chcete datové připojení odstranit, použijte následující příkaz:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);

Další kroky