Verwenden von Event Hubs und .NET zum Senden und Empfangen von Atlas Kafka-Themennachrichten

In dieser Schnellstartanleitung erfahren Sie, wie Sie Atlas Kafka-Themenereignisse senden und empfangen. Wir verwenden Azure Event Hubs und die .NET-Bibliothek Azure.Messaging.EventHubs.

Voraussetzungen

Wenn Sie noch nicht mit Event Hubs arbeiten, lesen Sie die Übersicht über Event Hubs , bevor Sie diese Schnellstartanleitung durcharbeiten.

Für diese Schnellstartanleitung sind bestimmte Voraussetzungen erforderlich:

  • Ein Microsoft Azure-Abonnement. Zur Verwendung von Azure-Diensten, einschließlich Event Hubs, benötigen Sie ein Azure-Abonnement. Wenn Sie nicht über ein Azure-Konto verfügen, können Sie sich für eine kostenlose Testversion registrieren oder Ihre MSDN-Abonnentenvorteile nutzen, wenn Sie ein Konto erstellen.
  • Microsoft Visual Studio 2022. Die Event Hubs-Clientbibliothek nutzt neue Features, die in C# 8.0 eingeführt wurden. Sie können die Bibliothek weiterhin mit früheren C#-Versionen verwenden, aber die neue Syntax ist nicht verfügbar. Um die vollständige Syntax zu verwenden, wird empfohlen, dass Sie mit dem .NET Core SDK 3.0 oder höher kompilieren und die Sprachversion auf festgelegt ist latest. Wenn Sie eine Visual Studio-Version vor Visual Studio 2019 verwenden, verfügt es nicht über die Tools, die zum Erstellen von C# 8.0-Projekten erforderlich sind. Visual Studio 2022, einschließlich der kostenlosen Community Edition, kann hier heruntergeladen werden.
  • Ein aktives Microsoft Purview-Konto.
  • Ein Event Hubs, der mit Ihrem Microsoft Purview-Konto konfiguriert ist, um Nachrichten zu senden und zu empfangen:
    • Ihr Konto ist möglicherweise bereits konfiguriert. Sie können Ihr Microsoft Purview-Konto im Azure-Portal unter Einstellungen, Kafka-Konfiguration überprüfen. Wenn sie noch nicht konfiguriert ist, befolgen Sie diese Anleitung.

Veröffentlichen von Nachrichten in Microsoft Purview

Im Folgenden erstellen wir eine .NET Core-Konsolenanwendung, die Ereignisse über das Event Hubs Kafka-Thema ATLAS_HOOK an Microsoft Purview sendet.

Um Nachrichten in Microsoft Purview zu veröffentlichen, benötigen Sie entweder einen verwalteten Event Hubs oder mindestens einen Event Hub mit einer Hookkonfiguration.

Erstellen eines Visual Studio-Projekts

Erstellen Sie als Nächstes eine C#-.NET-Konsolenanwendung in Visual Studio:

  1. Starten Sie Visual Studio.
  2. Wählen Sie im Fenster Start die Option Neues Projekt>erstellen Konsolen-App (.NET Framework) aus. .NET-Version 4.5.2 oder höher ist erforderlich.
  3. Geben Sie unter Projektnameden Namen PurviewKafkaProducer ein.
  4. Wählen Sie Erstellen aus, um das Projekt zu erstellen.

Erstellen einer Konsolenanwendung

  1. Starten Sie Visual Studio 2022.
  2. Wählen Sie Neues Projekt erstellen aus.
  3. Führen Sie im Dialogfeld Neues Projekt erstellen die folgenden Schritte aus: Wenn dieses Dialogfeld nicht angezeigt wird, wählen Sie Datei im Menü aus, wählen Sie Neu und dann Projekt aus.
    1. Wählen Sie C# als Programmiersprache aus.
    2. Wählen Sie Konsole für den Typ der Anwendung aus.
    3. Wählen Sie konsolen-App (.NET Core) aus der Ergebnisliste aus.
    4. Klicken Sie dann auf Weiter.

Hinzufügen des Event Hubs-NuGet-Pakets

  1. Wählen Sie im Menü Extras>NuGet-Paket-Manager-Paket-Manager-Konsole> aus.

  2. Führen Sie den folgenden Befehl aus, um das NuGet-Paket Azure.Messaging.EventHubs und das NuGet-Paket Azure.Messaging.EventHubs.Producer zu installieren:

    Install-Package Azure.Messaging.EventHubs
    
    Install-Package Azure.Messaging.EventHubs.Producer
    

Schreiben von Code, der Nachrichten an den Event Hub sendet

  1. Fügen Sie die folgenden using -Anweisungen am Anfang der Datei Program.cs hinzu:

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
  2. Fügen Sie der Program -Klasse Konstanten für die Event Hubs-Verbindungszeichenfolge und den Event Hubs-Namen hinzu.

    private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    private const string eventHubName = "<EVENT HUB NAME>";
    
  3. Ersetzen Sie die Main -Methode durch die folgende async Main Methode, und fügen Sie ein async ProduceMessage hinzu, um Nachrichten in Microsoft Purview zu pushen. Weitere Informationen finden Sie in den Kommentaren im Code.

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
     		/ Create an event producer client to add events in the event hub
            EventHubProducerClient producer = new EventHubProducerClient(ehubNamespaceConnectionString, eventHubName);
    
     		await ProduceMessage(producer);
        }
    
     	static async Task ProduceMessage(EventHubProducerClient producer)
    
        {
     		// Create a batch of events 
     		using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
     		// Add events to the batch. An event is a represented by a collection of bytes and metadata. 
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<First event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Second event>")));
     		eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes("<Third event>")));
    
     		// Use the producer client to send the batch of events to the event hub
     		await producerClient.SendAsync(eventBatch);
     		Console.WriteLine("A batch of 3 events has been published.");
    
     	}
    
  4. Erstellen Sie das Projekt. Vergewissern Sie sich, dass keine Fehler vorliegen.

  5. Führen Sie das Programm aus, und warten Sie auf die Bestätigungsmeldung.

    Hinweis

    Den vollständigen Quellcode mit weiteren Informationskommentaren finden Sie in dieser Datei auf GitHub.

Beispielcode, der eine SQL-Tabelle mit zwei Spalten mithilfe einer JSON-Nachricht zum Erstellen einer Entität erstellt

	
	{
    "msgCreatedBy":"nayenama",
    "message":{
        "type":"ENTITY_CREATE_V2",
        "user":"admin",
        "entities":{
            "entities":[
                {
                    "typeName":"azure_sql_table",
                    "attributes":{
                        "owner":"admin",
                        "temporary":false,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
                        "name":"SalesOrderTable",
                        "description":"Sales Order Table added via Kafka"
                    },
                    "relationshipAttributes":{
                        "columns":[
                            {
                                "guid":"-1102395743156037",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID"
                                }
                            },
                            {
                                "guid":"-1102395743156038",
                                "typeName":"azure_sql_column",
                                "uniqueAttributes":{
                                    "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate"
                                }
                            }
                        ]
                    },
                    "guid":"-1102395743156036",
                    "version":0
                }
            ],
            "referredEntities":{
                "-1102395743156037":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderID",
                        "precision":23,
                        "length":8,
                        "description":"Sales Order ID",
                        "scale":3,
                        "name":"OrderID",
                        "data_type":"int"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156037",
                    "version":2
                },
                "-1102395743156038":{
                    "typeName":"azure_sql_column",
                    "attributes":{
                        "owner":null,
                        "userTypeId":61,
                        "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable#OrderDate",
                        "description":"Sales Order Date",
                        "scale":3,
                        "name":"OrderDate",
                        "data_type":"datetime"
                    },
                    "relationshipAttributes":{
                        "table":{
                            "guid":"-1102395743156036",
                            "typeName":"azure_sql_table",
                            "entityStatus":"ACTIVE",
                            "displayText":"SalesOrderTable",
                            "uniqueAttributes":{
                                "qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable"
                            }
                        }
                    },
                    "guid":"-1102395743156038",
                    "status":"ACTIVE",
                    "createdBy":"ServiceAdmin",
                    "version":0
                }
            }
        }
    },
    "version":{
        "version":"1.0.0"
    },
    "msgCompressionKind":"NONE",
    "msgSplitIdx":1,
    "msgSplitCount":1
}


Empfangen von Microsoft Purview-Nachrichten

Als Nächstes erfahren Sie, wie Sie eine .NET Core-Konsolenanwendung schreiben, die Nachrichten von Event Hubs mithilfe eines Ereignisprozessors empfängt. Der Ereignisprozessor verwaltet persistente Prüfpunkte und parallele Empfange von Event Hubs. Dies vereinfacht den Prozess des Empfangens von Ereignissen. Sie müssen den ATLAS_ENTITIES Event Hub verwenden, um Nachrichten von Microsoft Purview zu empfangen.

Zum Empfangen von Nachrichten von Microsoft Purview benötigen Sie entweder eine verwaltete Event Hubs-Instanz oder eine Event Hubs-Benachrichtigungskonfiguration.

Warnung

Das Event Hubs SDK verwendet die neueste verfügbare Version der Speicher-API. Diese Version ist möglicherweise nicht unbedingt auf Ihrer Stack Hub-Plattform verfügbar. Wenn Sie diesen Code in Azure Stack Hub ausführen, treten Laufzeitfehler auf, es sei denn, Sie verwenden die spezifische Version, die Sie verwenden. Wenn Sie Azure Blob Storage als Prüfpunktspeicher verwenden, überprüfen Sie die unterstützte Azure Storage-API-Version für Ihren Azure Stack Hub-Build, und legen Sie diese Version in Ihrem Code als Ziel fest.

Die höchste verfügbare Version des Speicherdiensts ist Version 2019-02-02. Standardmäßig verwendet die Event Hubs SDK-Clientbibliothek die höchste verfügbare Version in Azure (2019-07-07 zum Zeitpunkt der Veröffentlichung des SDK). Wenn Sie Azure Stack Hub Version 2005 verwenden, müssen Sie zusätzlich zu den Schritten in diesem Abschnitt auch Code hinzufügen, der auf die Api-Version 2019-02-02 des Speicherdiensts ausgerichtet ist. Informationen zum Festlegen einer bestimmten Storage-API-Version finden Sie in diesem Beispiel auf GitHub.

Erstellen eines Azure Storage-Containers und eines Blobcontainers

Wir verwenden Azure Storage als Prüfpunktspeicher. Führen Sie die folgenden Schritte aus, um ein Azure Storage-Konto zu erstellen.

  1. Erstellen eines Azure-Speicherkontos

  2. Erstellen eines Blobcontainers

  3. Abrufen der Verbindungszeichenfolge für das Speicherkonto

    Notieren Sie sich die Verbindungszeichenfolge und den Containernamen. Sie verwenden sie im Empfangscode.

Erstellen eines Visual Studio-Projekts für den Empfänger

  1. Wählen Sie im Projektmappen-Explorer Fenster die Projektmappe EventHubQuickStart aus, und halten Sie sie gedrückt (oder klicken Sie mit der rechten Maustaste darauf), zeigen Sie auf Hinzufügen, und wählen Sie Neues Projekt aus.
  2. Wählen Sie Konsolen-App (.NET Core) und dann Weiter aus.
  3. Geben Sie PurviewKafkaConsumer als Projektnamen ein, und wählen Sie Erstellen aus.

Hinzufügen des Event Hubs-NuGet-Pakets

  1. Wählen Sie im Menü Extras>NuGet-Paket-Manager-Paket-Manager-Konsole> aus.

  2. Führen Sie den folgenden Befehl aus, um das NuGet-Paket Azure.Messaging.EventHubs zu installieren:

    Install-Package Azure.Messaging.EventHubs
    
  3. Führen Sie den folgenden Befehl aus, um das NuGet-Paket Azure.Messaging.EventHubs.Processor zu installieren:

    Install-Package Azure.Messaging.EventHubs.Processor
    

Aktualisieren der Main-Methode

  1. Fügen Sie die folgenden using Anweisungen am Anfang der Datei Program.cs hinzu.

    using System;
    using System.Text;
    using System.Threading.Tasks;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
  2. Fügen Sie der Program -Klasse Konstanten für die Event Hubs-Verbindungszeichenfolge und den Event Hub-Namen hinzu. Ersetzen Sie Platzhalter in Klammern durch die tatsächlichen Werte, die Sie beim Erstellen des Event Hubs und des Speicherkontos (Zugriffsschlüssel – primäre Verbindungszeichenfolge) erhalten haben. Stellen Sie sicher, dass die {Event Hubs namespace connection string} Verbindungszeichenfolge auf Namespaceebene und nicht die Event Hub-Zeichenfolge ist.

        private const string ehubNamespaceConnectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
        private const string eventHubName = "<EVENT HUB NAME>";
        private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
        private const string blobContainerName = "<BLOB CONTAINER NAME>";
    

    Verwenden Sie ATLAS_ENTITIES als Event Hub-Name, wenn Sie Nachrichten an Microsoft Purview senden.

  3. Ersetzen Sie die Main -Methode durch die folgende async Main Methode. Weitere Informationen finden Sie in den Kommentaren im Code.

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
            // Create a blob container client that the event processor will use 
            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
            // Create an event processor client to process events in the event hub
            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
    
            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;
    
            // Start the processing
            await processor.StartProcessingAsync();
    
            // Wait for 10 seconds for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(10));
    
            // Stop the processing
            await processor.StopProcessingAsync();
        }    
    
  4. Fügen Sie nun die folgenden Ereignis- und Fehlerhandlermethoden zur -Klasse hinzu.

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            // Write the body of the event to the console window
            Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));
    
            // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            // Write details about the error to the console window
            Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }    
    
  5. Erstellen Sie das Projekt. Vergewissern Sie sich, dass keine Fehler vorliegen.

    Hinweis

    Den vollständigen Quellcode mit weiteren Informationskommentaren finden Sie in dieser Datei auf GitHub.

  6. Führen Sie die Empfängeranwendung aus.

Beispiel für eine Nachricht, die von Microsoft Purview empfangen wurde

{
	"version":
		{"version":"1.0.0",
		 "versionParts":[1]
		},
		 "msgCompressionKind":"NONE",
		 "msgSplitIdx":1,
		 "msgSplitCount":1,
		 "msgSourceIP":"10.244.155.5",
		 "msgCreatedBy":
		 "",
		 "msgCreationTime":1618588940869,
		 "message":{
			"type":"ENTITY_NOTIFICATION_V2",
			"entity":{
				"typeName":"azure_sql_table",
					"attributes":{
						"owner":"admin",
						"createTime":0,
						"qualifiedName":"mssql://nayenamakafka.eventhub.sql.net/salespool/dbo/SalesOrderTable",
						"name":"SalesOrderTable",
						"description":"Sales Order Table"
						},
						"guid":"ead5abc7-00a4-4d81-8432-d5f6f6f60000",
						"status":"ACTIVE",
						"displayText":"SalesOrderTable"
					},
					"operationType":"ENTITY_UPDATE",
					"eventTime":1618588940567
				}
}

Nächste Schritte

Sehen Sie sich weitere Beispiele auf GitHub an.