Verwenden von Event Hubs und .NET zum Senden und Empfangen von Atlas Kafka-Themennachrichten
Hinweis
Der Microsoft Purview Data Catalog ändert seinen Namen in Microsoft Purview Unified Catalog. Alle Features bleiben unverändert. Die Namensänderung wird angezeigt, wenn die neue Microsoft Purview Data Governance-Benutzeroberfläche in Ihrer Region allgemein verfügbar ist. Überprüfen Sie den Namen in Ihrer Region.
In dieser Schnellstartanleitung erfahren Sie, wie Sie Atlas Kafka-Themenereignisse senden und empfangen. Wir verwenden Azure Event Hubs und die .NET-Bibliothek Azure.Messaging.EventHubs.
Wenn Sie noch nicht mit Event Hubs arbeiten, lesen Sie die Übersicht über Event Hubs , bevor Sie diese Schnellstartanleitung durcharbeiten.
Für diese Schnellstartanleitung sind bestimmte Voraussetzungen erforderlich:
- Ein Microsoft Azure-Abonnement. Zur Verwendung von Azure-Diensten, einschließlich Event Hubs, benötigen Sie ein Azure-Abonnement. Wenn Sie nicht über ein Azure-Konto verfügen, können Sie sich für eine kostenlose Testversion registrieren oder Ihre MSDN-Abonnentenvorteile nutzen, wenn Sie ein Konto erstellen.
-
Microsoft Visual Studio 2022. Die Event Hubs-Clientbibliothek nutzt neue Features, die in C# 8.0 eingeführt wurden. Sie können die Bibliothek weiterhin mit früheren C#-Versionen verwenden, aber die neue Syntax ist nicht verfügbar. Um die vollständige Syntax zu verwenden, wird empfohlen, dass Sie mit dem .NET Core SDK 3.0 oder höher kompilieren und die Sprachversion auf festgelegt ist
latest
. Wenn Sie eine Visual Studio-Version vor Visual Studio 2019 verwenden, verfügt es nicht über die Tools, die zum Erstellen von C# 8.0-Projekten erforderlich sind. Visual Studio 2022, einschließlich der kostenlosen Community Edition, kann hier heruntergeladen werden. - Ein aktives Microsoft Purview-Konto.
-
Ein Event Hubs, der mit Ihrem Microsoft Purview-Konto konfiguriert ist, um Nachrichten zu senden und zu empfangen:
- Ihr Konto ist möglicherweise bereits konfiguriert. Sie können Ihr Microsoft Purview-Konto im Azure-Portal unter Einstellungen, Kafka-Konfiguration überprüfen. Wenn sie noch nicht konfiguriert ist, befolgen Sie diese Anleitung.
Im Folgenden erstellen wir eine .NET Core-Konsolenanwendung, die Ereignisse über das Event Hubs Kafka-Thema ATLAS_HOOK an Microsoft Purview sendet.
Um Nachrichten in Microsoft Purview zu veröffentlichen, benötigen Sie entweder einen verwalteten Event Hubs oder mindestens einen Event Hub mit einer Hookkonfiguration.
Erstellen Sie als Nächstes eine C#-.NET-Konsolenanwendung in Visual Studio:
- Starten Sie Visual Studio.
- Wählen Sie im Fenster Start die Option Neues Projekt>erstellen Konsolen-App (.NET Framework) aus. .NET-Version 4.5.2 oder höher ist erforderlich.
- Geben Sie unter Projektnameden Namen PurviewKafkaProducer ein.
- Wählen Sie Erstellen aus, um das Projekt zu erstellen.
- Starten Sie Visual Studio 2022.
- Wählen Sie Neues Projekt erstellen aus.
- Führen Sie im Dialogfeld Neues Projekt erstellen die folgenden Schritte aus: Wenn dieses Dialogfeld nicht angezeigt wird, wählen Sie Datei im Menü aus, wählen Sie Neu und dann Projekt aus.
- Wählen Sie C# als Programmiersprache aus.
- Wählen Sie Konsole für den Typ der Anwendung aus.
- Wählen Sie konsolen-App (.NET Core) aus der Ergebnisliste aus.
- Klicken Sie dann auf Weiter.
Wählen Sie im Menü Extras>NuGet-Paket-Manager-Paket-Manager-Konsole> aus.
Führen Sie den folgenden Befehl aus, um das NuGet-Paket Azure.Messaging.EventHubs und das NuGet-Paket Azure.Messaging.EventHubs.Producer zu installieren:
Install-Package Azure.Messaging.EventHubs
Install-Package Azure.Messaging.EventHubs.Producer
Fügen Sie die folgenden
using
-Anweisungen am Anfang der Program.cs-Datei hinzu:using System; using System.Text; using System.Threading.Tasks; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer;
Fügen Sie der
Program
-Klasse Konstanten für den Event Hubs-Verbindungszeichenfolge- und Event Hubs-Namen hinzu.private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"; private const string eventHubName = "<EVENT HUB NAME>";
Ersetzen Sie die
Main
-Methode durch die folgendeasync Main
Methode, und fügen Sie einasync ProduceMessage
hinzu, um Nachrichten in Microsoft Purview zu pushen. Weitere Informationen finden Sie in den Kommentaren im Code.static async Task Main() { // Read from the default consumer group: $Default string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; / Create an event producer client to add events in the event hub EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName); await ProduceMessage(producer); } static async Task ProduceMessage(EventHubProducerClient producer) { // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add events to the batch. An event is a represented by a collection of bytes and metadata. eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>"))); eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>"))); eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>"))); // Use the producer client to send the batch of events to the event hub await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 3 events has been published."); }
Erstellen Sie das Projekt. Vergewissern Sie sich, dass keine Fehler vorliegen.
Führen Sie das Programm aus, und warten Sie auf die Bestätigungsmeldung.
Hinweis
Den vollständigen Quellcode mit weiteren Informationskommentaren finden Sie in dieser Datei auf GitHub.
Beispielcode, der eine SQL-Tabelle mit zwei Spalten mithilfe einer JSON-Nachricht zum Erstellen einer Entität erstellt
{
"msgCreatedBy":"nayenama",
"message":{
"type":"ENTITY_CREATE_V2",
"user":"admin",
"entities":{
"entities":[
{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"temporary":false,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table added via Kafka"
},
"relationshipAttributes":{
"columns":[
{
"guid":"-1102395743156037",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
}
},
{
"guid":"-1102395743156038",
"typeName":"azure_sql_column",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
}
}
]
},
"guid":"-1102395743156036",
"version":0
}
],
"referredEntities":{
"-1102395743156037":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
"precision":23,
"length":8,
"description":"Sales Order ID",
"scale":3,
"name":"OrderID",
"data_type":"int"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156037",
"version":2
},
"-1102395743156038":{
"typeName":"azure_sql_column",
"attributes":{
"owner":null,
"userTypeId":61,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
"description":"Sales Order Date",
"scale":3,
"name":"OrderDate",
"data_type":"datetime"
},
"relationshipAttributes":{
"table":{
"guid":"-1102395743156036",
"typeName":"azure_sql_table",
"entityStatus":"ACTIVE",
"displayText":"SalesOrderTable",
"uniqueAttributes":{
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
}
}
},
"guid":"-1102395743156038",
"status":"ACTIVE",
"createdBy":"ServiceAdmin",
"version":0
}
}
}
},
"version":{
"version":"1.0.0"
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1
}
Als Nächstes erfahren Sie, wie Sie eine .NET Core-Konsolenanwendung schreiben, die Nachrichten von Event Hubs mithilfe eines Ereignisprozessors empfängt. Der Ereignisprozessor verwaltet persistente Prüfpunkte und parallele Empfange von Event Hubs. Dies vereinfacht den Prozess des Empfangens von Ereignissen. Sie müssen den ATLAS_ENTITIES Event Hub verwenden, um Nachrichten von Microsoft Purview zu empfangen.
Zum Empfangen von Nachrichten von Microsoft Purview benötigen Sie entweder eine verwaltete Event Hubs-Instanz oder eine Event Hubs-Benachrichtigungskonfiguration.
Warnung
Das Event Hubs SDK verwendet die neueste verfügbare Version der Speicher-API. Diese Version ist möglicherweise nicht unbedingt auf Ihrer Stack Hub-Plattform verfügbar. Wenn Sie diesen Code in Azure Stack Hub ausführen, treten Laufzeitfehler auf, es sei denn, Sie verwenden die spezifische Version, die Sie verwenden. Wenn Sie Azure Blob Storage als Prüfpunktspeicher verwenden, überprüfen Sie die unterstützte Azure Storage-API-Version für Ihren Azure Stack Hub-Build, und legen Sie diese Version in Ihrem Code als Ziel fest.
Die höchste verfügbare Version des Speicherdiensts ist Version 2019-02-02. Standardmäßig verwendet die Event Hubs SDK-Clientbibliothek die höchste verfügbare Version in Azure (2019-07-07 zum Zeitpunkt der Veröffentlichung des SDK). Wenn Sie Azure Stack Hub Version 2005 verwenden, müssen Sie zusätzlich zu den Schritten in diesem Abschnitt auch Code hinzufügen, der auf die Api-Version 2019-02-02 des Speicherdiensts ausgerichtet ist. Informationen zum Festlegen einer bestimmten Storage-API-Version finden Sie in diesem Beispiel auf GitHub.
Wir verwenden Azure Storage als Prüfpunktspeicher. Führen Sie die folgenden Schritte aus, um ein Azure Storage-Konto zu erstellen.
Abrufen der Verbindungszeichenfolge für das Speicherkonto
Notieren Sie sich die Verbindungszeichenfolge und den Containernamen. Sie verwenden sie im Empfangscode.
- Wählen Sie im Projektmappen-Explorer Fenster die Projektmappe EventHubQuickStart aus, und halten Sie sie gedrückt (oder klicken Sie mit der rechten Maustaste darauf), zeigen Sie auf Hinzufügen, und wählen Sie Neues Projekt aus.
- Wählen Sie Konsolen-App (.NET Core) und dann Weiter aus.
- Geben Sie PurviewKafkaConsumer als Projektnamen ein, und wählen Sie Erstellen aus.
Wählen Sie im Menü Extras>NuGet-Paket-Manager-Paket-Manager-Konsole> aus.
Führen Sie den folgenden Befehl aus, um das NuGet-Paket Azure.Messaging.EventHubs zu installieren:
Install-Package Azure.Messaging.EventHubs
Führen Sie den folgenden Befehl aus, um das NuGet-Paket Azure.Messaging.EventHubs.Processor zu installieren:
Install-Package Azure.Messaging.EventHubs.Processor
Fügen Sie die folgenden
using
-Anweisungen am Anfang der Program.cs-Datei hinzu.using System; using System.Text; using System.Threading.Tasks; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor;
Fügen Sie der Klasse für die
Program
Event Hubs-Verbindungszeichenfolge und den Event Hub-Namen Konstanten hinzu. Ersetzen Sie Platzhalter in Klammern durch die tatsächlichen Werte, die Sie beim Erstellen des Event Hubs und des Speicherkontos (Zugriffsschlüssel – primäre Verbindungszeichenfolge) erhalten haben. Stellen Sie sicher, dass die{Event Hubs namespace connection string}
die Verbindungszeichenfolge auf Namespaceebene und nicht die Event Hub-Zeichenfolge ist.private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>"; private const string eventHubName = "<EVENT HUB NAME>"; private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>"; private const string blobContainerName = "<BLOB CONTAINER NAME>";
Verwenden Sie ATLAS_ENTITIES als Event Hub-Name, wenn Sie Nachrichten an Microsoft Purview senden.
Ersetzen Sie die
Main
-Methode durch die folgendeasync Main
Methode. Weitere Informationen finden Sie in den Kommentaren im Code.static async Task Main() { // Read from the default consumer group: $Default string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 10 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(10)); // Stop the processing await processor.StopProcessingAsync(); }
Fügen Sie nun die folgenden Ereignis- und Fehlerhandlermethoden zur -Klasse hinzu.
static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Write the body of the event to the console window Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray())); // Update checkpoint in the blob storage so that the app receives only new events the next time it's run await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
Erstellen Sie das Projekt. Vergewissern Sie sich, dass keine Fehler vorliegen.
Hinweis
Den vollständigen Quellcode mit weiteren Informationskommentaren finden Sie in dieser Datei auf GitHub.
Führen Sie die Empfängeranwendung aus.
{
"version":
{"version":"1.0.0",
"versionParts":[1]
},
"msgCompressionKind":"NONE",
"msgSplitIdx":1,
"msgSplitCount":1,
"msgSourceIP":"10.244.155.5",
"msgCreatedBy":
"",
"msgCreationTime":1618588940869,
"message":{
"type":"ENTITY_NOTIFICATION_V2",
"entity":{
"typeName":"azure_sql_table",
"attributes":{
"owner":"admin",
"createTime":0,
"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
"name":"SalesOrderTable",
"description":"Sales Order Table"
},
"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
"status":"ACTIVE",
"displayText":"SalesOrderTable"
},
"operationType":"ENTITY_UPDATE",
"eventTime":1618588940567
}
}
Sehen Sie sich weitere Beispiele auf GitHub an.