Event Hub-adatkapcsolat létrehozása az Azure Synapse Data Explorerhez c# (előzetes verzió) használatával
Az Azure Synapse Data Explorer egy gyors és nagy mértékben méretezhető adatfeltárási szolgáltatás napló- és telemetriai adatokhoz. Az Azure Synapse Data Explorer az Event Hubsból, az IoT Hubsból és a blobtárolókba írt blobokból kínál betöltést (adatbetöltést).
Ebben a cikkben egy Event Hub-adatkapcsolatot hoz létre az Azure Synapse Data Explorerhez a C# használatával.
Előfeltételek
Azure-előfizetés. Hozzon létre egy ingyenes Azure-fiókot.
Data Explorer-készlet létrehozása a Synapse Studióval vagy az Azure Portallal
Data Explorer-adatbázis létrehozása.
A Synapse Studióban a bal oldali panelen válassza az Adatok lehetőséget.
Válassza az + (Új erőforrás hozzáadása) >Adatkezelő-készletet, és használja a következő információkat:
Beállítás Ajánlott érték Leírás Készlet neve contosodataexplorer A használni kívánt Adatkezelő-készlet neve Név TestDatabase Az adatbázis nevének egyedinek kell lennie a fürtön belül. Alapértelmezett megőrzési idő 365 Az az időtartam (napokban), amelyre vonatkozóan garantáltan elérhetőek maradnak az adatok a lekérdezéshez. Az időtartam az adatok betöltésének időpontjával kezdődik. Alapértelmezett gyorsítótár-időszak 31 Az az időtartam (napokban), amely miatt a gyakran lekérdezett adatok nem hosszabb távú, hanem SSD-tárolóban vagy RAM-ban érhetők el. Az adatbázis létrehozásához válassza a Létrehozás lehetőséget . Az adatbázis létrehozása általában kevesebb mint egy percet vesz igénybe.
- Event Hub a betöltéshez szükséges adatokkal.
Feljegyzés
Ha a Synapse-munkaterület felügyelt virtuális hálózatot használ, és engedélyezve van az adatkiszivárgás elleni védelem, nem fog működni az adatok Eseményközpontból az Adatkezelő-készletekbe való betöltése.
- Visual Studio 2019, töltse le és használja az ingyenes Visual Studio 2019 Community Editiont. Engedélyezze az Azure-fejlesztést a Visual Studio beállítása során.
Tábla létrehozása a tesztfürtön
Hozzon létre egy táblát StormEvents
, amely megfelel a fájlban lévő adatok sémájának StormEvents.csv
.
Tipp.
Az alábbi kódrészletek szinte minden híváshoz létrehoznak egy ügyfélpéldányt. Ez azért történik, hogy az egyes kódrészletek egyenként futtathatók legyenek. Éles környezetben az ügyfélpéldányok újraküldésre kerülnek, és a szükséges ideig meg kell tartani őket. URI-nként egyetlen ügyfélpéldány elegendő, még akkor is, ha több adatbázissal dolgozik (az adatbázis parancsszinten is megadható).
var databaseName = "<DatabaseName>";
var table = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableCreateCommand(
table,
new[]
{
Tuple.Create("StartTime", "System.DateTime"),
Tuple.Create("EndTime", "System.DateTime"),
Tuple.Create("EpisodeId", "System.Int32"),
Tuple.Create("EventId", "System.Int32"),
Tuple.Create("State", "System.String"),
Tuple.Create("EventType", "System.String"),
Tuple.Create("InjuriesDirect", "System.Int32"),
Tuple.Create("InjuriesIndirect", "System.Int32"),
Tuple.Create("DeathsDirect", "System.Int32"),
Tuple.Create("DeathsIndirect", "System.Int32"),
Tuple.Create("DamageProperty", "System.Int32"),
Tuple.Create("DamageCrops", "System.Int32"),
Tuple.Create("Source", "System.String"),
Tuple.Create("BeginLocation", "System.String"),
Tuple.Create("EndLocation", "System.String"),
Tuple.Create("BeginLat", "System.Double"),
Tuple.Create("BeginLon", "System.Double"),
Tuple.Create("EndLat", "System.Double"),
Tuple.Create("EndLon", "System.Double"),
Tuple.Create("EpisodeNarrative", "System.String"),
Tuple.Create("EventNarrative", "System.String"),
Tuple.Create("StormSummary", "System.Object"),
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
Adatbetöltési leképezés meghatározása
A bejövő CSV-adatok leképezése a tábla létrehozásakor használt oszlopnevekre. CSV-oszlopleképezési objektum kiépítése ezen a táblán.
var tableMapping = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
var command =
CslCommandGenerator.GenerateTableMappingCreateCommand(
Data.Ingestion.IngestionMappingKind.Csv,
table,
tableMapping,
new[] {
new ColumnMapping() { ColumnName = "StartTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "0" } } },
new ColumnMapping() { ColumnName = "EndTime", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "1" } } },
new ColumnMapping() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "2" } } },
new ColumnMapping() { ColumnName = "EventId", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "3" } } },
new ColumnMapping() { ColumnName = "State", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "4" } } },
new ColumnMapping() { ColumnName = "EventType", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "5" } } },
new ColumnMapping() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "6" } } },
new ColumnMapping() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "7" } } },
new ColumnMapping() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "8" } } },
new ColumnMapping() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "9" } } },
new ColumnMapping() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "10" } } },
new ColumnMapping() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "11" } } },
new ColumnMapping() { ColumnName = "Source", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "12" } } },
new ColumnMapping() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "13" } } },
new ColumnMapping() { ColumnName = "EndLocation", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "14" } } },
new ColumnMapping() { ColumnName = "BeginLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "15" } } },
new ColumnMapping() { ColumnName = "BeginLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "16" } } },
new ColumnMapping() { ColumnName = "EndLat", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "17" } } },
new ColumnMapping() { ColumnName = "EndLon", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "18" } } },
new ColumnMapping() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "19" } } },
new ColumnMapping() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "20" } } },
new ColumnMapping() { ColumnName = "StormSummary", Properties = new Dictionary<string, string>() { { MappingConsts.Ordinal, "21" } } }
});
kustoClient.ExecuteControlCommand(databaseName, command);
}
A C# NuGet telepítése
- Telepítse a Microsoft.Azure.Management.Kusto NuGet csomagot.
Hitelesítés
Az alábbi példa futtatásához egy Olyan Microsoft Entra-alkalmazásra és szolgáltatásnévre van szüksége, amely hozzáfér az erőforrásokhoz. Ha ingyenes Microsoft Entra-alkalmazást szeretne létrehozni, és szerepkör-hozzárendelést szeretne hozzáadni az előfizetés szintjén, olvassa el a Microsoft Entra-alkalmazás létrehozása című témakört. Szüksége van a címtár (bérlő) azonosítójára, az alkalmazásazonosítóra és az ügyfél titkos kódjára is.
Event Hub-adatkapcsolat hozzáadása
Az alábbi példa bemutatja, hogyan vehet fel programozott módon egy Event Hub-adatkapcsolatot. Az Eseményközponthoz való csatlakozással kapcsolatos információkért tekintse meg az Event Hub-adatkapcsolat Azure Portalon való hozzáadását.
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
var credential = new ClientCredential(clientId, clientSecret);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var resourceGroupName = "testrg";
//The cluster and database that are created as part of the Prerequisites
var clusterName = "mykustocluster";
var databaseName = "mykustodatabase";
var dataConnectionName = "myeventhubconnect";
//The Event Hub that is created as part of the Prerequisites
var eventHubResourceId = "/subscriptions/xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx/resourceGroups/xxxxxx/providers/Microsoft.EventHub/namespaces/xxxxxx/eventhubs/xxxxxx";
var consumerGroup = "$Default";
var location = "Central US";
//The table and column mapping are created as part of the Prerequisites
var tableName = "StormEvents";
var mappingRuleName = "StormEvents_CSV_Mapping";
var dataFormat = DataFormat.CSV;
var compression = "None";
await kustoManagementClient.DataConnections.CreateOrUpdateAsync(resourceGroupName, clusterName, databaseName, dataConnectionName,
new EventHubDataConnection(eventHubResourceId, consumerGroup, location: location, tableName: tableName, mappingRuleName: mappingRuleName, dataFormat: dataFormat, compression: compression));
Beállítás | Ajánlott érték | Mező leírása |
---|---|---|
tenantId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx | Az Ön bérlőazonosítója. Más néven könyvtárazonosító. |
subscriptionId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx | Az erőforrás-létrehozáshoz használt előfizetés-azonosító. |
clientId | xxxxxxxx-xxxxx-xxxx-xxxx-xxxx-xxxxxxxxx | Annak az alkalmazásnak az ügyfélazonosítója, amely hozzáfér a bérlő erőforrásaihoz. |
clientSecret | xxxxxxxxxxxxxx | Az alkalmazás ügyfélkulcsa, amely hozzáfér a bérlő erőforrásaihoz. |
resourceGroupName | testrg | A fürtöt tartalmazó erőforráscsoport neve. |
clusterName | mykustocluster | A fürt neve. |
databaseName | mykustodatabase | A fürt céladatbázisának neve. |
dataConnectionName | myeventhubconnect | Az adatkapcsolat kívánt neve. |
tableName | StormEvents | A céltábla neve a céladatbázisban. |
mappingRuleName | StormEvents_CSV_Mapping | A céltáblához kapcsolódó oszlopleképezés neve. |
dataFormat | csv | Az üzenet adatformátuma. |
eventHubResourceId | Erőforrás-azonosító | Az eseményközpont erőforrás-azonosítója, amely a betöltéshez szükséges adatokat tárolja. |
consumerGroup | $Default | Az Event Hub fogyasztói csoportja. |
hely | USA középső régiója | Az adatkapcsolati erőforrás helye. |
tömörítés | Gzip vagy None | Az adattömörítés típusa. |
Adatok generálása
Tekintse meg az adatokat létrehozó és az eseményközpontba küldő mintaalkalmazást .
Egy esemény egy vagy több rekordot tartalmazhat, a méretkorlátig. A következő mintában két eseményt küldünk, mindegyikhez öt rekord van hozzáfűzve:
var events = new List<EventData>();
var data = string.Empty;
var recordsPerEvent = 5;
var rand = new Random();
var counter = 0;
for (var i = 0; i < 10; i++)
{
// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = rand.Next(-30, 50) };
var data += JsonConvert.SerializeObject(metric) + Environment.NewLine;
counter++;
// Create the event
if (counter == recordsPerEvent)
{
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
events.Add(eventData);
counter = 0;
data = string.Empty;
}
}
// Send events
eventHubClient.SendAsync(events).Wait();
Az erőforrások eltávolítása
Az adatkapcsolat törléséhez használja a következő parancsot:
kustoManagementClient.DataConnections.Delete(resourceGroupName, clusterName, databaseName, dataConnectionName);