Megosztás a következőn keresztül:


Event Hub-adatkapcsolat létrehozása az Azure Synapse Data Explorerhez c# (előzetes verzió) használatával

Az Azure Synapse Data Explorer egy gyors és nagy mértékben méretezhető adatfeltárási szolgáltatás napló- és telemetriai adatokhoz. Az Azure Synapse Data Explorer az Event Hubsból, az IoT Hubsból és a blobtárolókba írt blobokból kínál betöltést (adatbetöltést).

Ebben a cikkben egy Event Hub-adatkapcsolatot hoz létre az Azure Synapse Data Explorerhez a C# használatával.

Előfeltételek

  • Azure-előfizetés. Hozzon létre egy ingyenes Azure-fiókot.

  • Data Explorer-készlet létrehozása a Synapse Studióval vagy az Azure Portallal

  • Data Explorer-adatbázis létrehozása.

    1. A Synapse Studióban a bal oldali panelen válassza az Adatok lehetőséget.

    2. Válassza az + (Új erőforrás hozzáadása) >Adatkezelő-készletet, és használja a következő információkat:

      Beállítás Ajánlott érték Leírás
      Készlet neve contosodataexplorer A használni kívánt Adatkezelő-készlet neve
      Név TestDatabase Az adatbázis nevének egyedinek kell lennie a fürtön belül.
      Alapértelmezett megőrzési idő 365 Az az időtartam (napokban), amelyre vonatkozóan garantáltan elérhetőek maradnak az adatok a lekérdezéshez. Az időtartam az adatok betöltésének időpontjával kezdődik.
      Alapértelmezett gyorsítótár-időszak 31 Az az időtartam (napokban), amely miatt a gyakran lekérdezett adatok nem hosszabb távú, hanem SSD-tárolóban vagy RAM-ban érhetők el.
    3. Az adatbázis létrehozásához válassza a Létrehozás lehetőséget . Az adatbázis létrehozása általában kevesebb mint egy percet vesz igénybe.

Feljegyzés

Ha a Synapse-munkaterület felügyelt virtuális hálózatot használ, és engedélyezve van az adatkiszivárgás elleni védelem, nem fog működni az adatok Eseményközpontból az Adatkezelő-készletekbe való betöltése.

Tábla létrehozása a tesztfürtön

Hozzon létre egy táblát StormEvents , amely megfelel a fájlban lévő adatok sémájának StormEvents.csv .

Tipp.

Az alábbi kódrészletek szinte minden híváshoz létrehoznak egy ügyfélpéldányt. Ez azért történik, hogy az egyes kódrészletek egyenként futtathatók legyenek. Éles környezetben az ügyfélpéldányok újraküldésre kerülnek, és a szükséges ideig meg kell tartani őket. URI-nként egyetlen ügyfélpéldány elegendő, még akkor is, ha több adatbázissal dolgozik (az adatbázis parancsszinten is megadható).

var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableCreateCommand(
            table,
            new[]
            {
                Tuple.Create("StartTime", "System.DateTime"),
                Tuple.Create("EndTime", "System.DateTime"),
                Tuple.Create("EpisodeId", "System.Int32"),
                Tuple.Create("EventId", "System.Int32"),
                Tuple.Create("State", "System.String"),
                Tuple.Create("EventType", "System.String"),
                Tuple.Create("InjuriesDirect", "System.Int32"),
                Tuple.Create("InjuriesIndirect", "System.Int32"),
                Tuple.Create("DeathsDirect", "System.Int32"),
                Tuple.Create("DeathsIndirect", "System.Int32"),
                Tuple.Create("DamageProperty", "System.Int32"),
                Tuple.Create("DamageCrops", "System.Int32"),
                Tuple.Create("Source", "System.String"),
                Tuple.Create("BeginLocation", "System.String"),
                Tuple.Create("EndLocation", "System.String"),
                Tuple.Create("BeginLat", "System.Double"),
                Tuple.Create("BeginLon", "System.Double"),
                Tuple.Create("EndLat", "System.Double"),
                Tuple.Create("EndLon", "System.Double"),
                Tuple.Create("EpisodeNarrative", "System.String"),
                Tuple.Create("EventNarrative", "System.String"),
                Tuple.Create("StormSummary", "System.Object"),
            });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

Adatbetöltési leképezés meghatározása

A bejövő CSV-adatok leképezése a tábla létrehozásakor használt oszlopnevekre. CSV-oszlopleképezési objektum kiépítése ezen a táblán.

var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command =
        CslCommandGenerator.GenerateTableMappingCreateCommand(
            Data.Ingestion.IngestionMappingKind.Csv,
            table,
            tableMapping,
            new[] {
                new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
                new ColumnMapping() { ColumnName = "EndTime", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
                new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
                new ColumnMapping() { ColumnName = "EventId", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
                new ColumnMapping() { ColumnName = "State", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
                new ColumnMapping() { ColumnName = "EventType", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
                new ColumnMapping() { ColumnName = "InjuriesDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
                new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
                new ColumnMapping() { ColumnName = "DeathsDirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
                new ColumnMapping() { ColumnName = "DeathsIndirect", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
                new ColumnMapping() { ColumnName = "DamageProperty", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
                new ColumnMapping() { ColumnName = "DamageCrops", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
                new ColumnMapping() { ColumnName = "Source", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
                new ColumnMapping() { ColumnName = "BeginLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
                new ColumnMapping() { ColumnName = "EndLocation", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
                new ColumnMapping() { ColumnName = "BeginLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
                new ColumnMapping() { ColumnName = "BeginLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
                new ColumnMapping() { ColumnName = "EndLat", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
                new ColumnMapping() { ColumnName = "EndLon", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
                new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
                new ColumnMapping() { ColumnName = "EventNarrative", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
                new ColumnMapping() { ColumnName = "StormSummary", Properties =  new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
        });

    kustoClient.ExecuteControlCommand(databaseName, command);
}

A C# NuGet telepítése

Hitelesítés

Az alábbi példa futtatásához egy Olyan Microsoft Entra-alkalmazásra és szolgáltatásnévre van szüksége, amely hozzáfér az erőforrásokhoz. Ha ingyenes Microsoft Entra-alkalmazást szeretne létrehozni, és szerepkör-hozzárendelést szeretne hozzáadni az előfizetés szintjén, olvassa el a Microsoft Entra-alkalmazás létrehozása című témakört. Szüksége van a címtár (bérlő) azonosítójára, az alkalmazásazonosítóra és az ügyfél titkos kódjára is.

Event Hub-adatkapcsolat hozzáadása

Az alábbi példa bemutatja, hogyan vehet fel programozott módon egy Event Hub-adatkapcsolatot. Az Eseményközponthoz való csatlakozással kapcsolatos információkért tekintse meg az Event Hub-adatkapcsolat Azure Portalon való hozzáadását.

var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);

var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

var kustoManagementClient = new KustoManagementClient(credentials)
{
    SubscriptionId = subscriptionId
};

var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
    new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Beállítás Ajánlott érték Mező leírása
tenantId xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx Az Ön bérlőazonosítója. Más néven könyvtárazonosító.
subscriptionId xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx Az erőforrás-létrehozáshoz használt előfizetés-azonosító.
clientId xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx Annak az alkalmazásnak az ügyfélazonosítója, amely hozzáfér a bérlő erőforrásaihoz.
clientSecret xxxxxxxxxxxxxx Az alkalmazás ügyfélkulcsa, amely hozzáfér a bérlő erőforrásaihoz.
resourceGroupName testrg A fürtöt tartalmazó erőforráscsoport neve.
clusterName mykustocluster A fürt neve.
databaseName mykustodatabase A fürt céladatbázisának neve.
dataConnectionName myeventhubconnect Az adatkapcsolat kívánt neve.
tableName StormEvents A céltábla neve a céladatbázisban.
mappingRuleName StormEvents_CSV_Mapping A céltáblához kapcsolódó oszlopleképezés neve.
dataFormat csv Az üzenet adatformátuma.
eventHubResourceId Erőforrás-azonosító Az eseményközpont erőforrás-azonosítója, amely a betöltéshez szükséges adatokat tárolja.
consumerGroup $Default Az Event Hub fogyasztói csoportja.
hely USA középső régiója Az adatkapcsolati erőforrás helye.
tömörítés Gzip vagy None Az adattömörítés típusa.

Adatok generálása

Tekintse meg az adatokat létrehozó és az eseményközpontba küldő mintaalkalmazást .

Egy esemény egy vagy több rekordot tartalmazhat, a méretkorlátig. A következő mintában két eseményt küldünk, mindegyikhez öt rekord van hozzáfűzve:

var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;

for (var i = 0; i < 10; i++)
{
    // Create the data
    var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
    var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
    counter++;

    // Create the event
    if (counter == recordsPerEvent)
    {
        var eventData = new EventData(Encoding.UTF8.GetBytes(data));
        events.Add(eventData);

        counter = 0;
        data = string.Empty;
    }
}

// Send events
eventHubClient.SendAsync(events).Wait();

Az erőforrások eltávolítása

Az adatkapcsolat törléséhez használja a következő parancsot:

kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);

Következő lépések