Valideren met behulp van een Avro-schema bij het streamen van gebeurtenissen met behulp van Event Hubs .NET SDK's (AMQP)
In deze quickstart leert u hoe u gebeurtenissen verzendt naar en ontvangt van een Event Hub met schemavalidatie met behulp van de Azure.Messaging.EventHubs .NET-bibliotheek.
Notitie
Azure Schema Registry is een functie van Event Hubs, die een centrale opslagplaats biedt voor schema's voor gebeurtenisgestuurde en berichtengerichte toepassingen. Het biedt de flexibiliteit voor uw producenten- en consumententoepassingen om gegevens uit te wisselen zonder dat u het schema hoeft te beheren en te delen. Het biedt ook een eenvoudig governanceframework voor herbruikbare schema's en definieert de relatie tussen schema's via een groeperingsconstructie (schemagroepen). Zie Azure Schema Registry in Event Hubs voor meer informatie.
Vereisten
Als u nog geen ervaring hebt met Azure Event Hubs, raadpleegt u het Event Hubs-overzicht voordat u deze quickstart uitvoert.
Voor het voltooien van deze snelstart moet aan de volgende vereisten worden voldaan:
- Als u geen Azure-abonnement hebt, maakt u een gratis account voordat u begint.
-
Microsoft Visual Studio 2022.
De Azure Event Hubs-clientbibliotheek maakt gebruik van nieuwe functies die in C# 8.0 zijn geïntroduceerd. U kunt de bibliotheek nog steeds gebruiken met eerdere C#-taalversies, maar de nieuwe syntaxis is niet beschikbaar. Als u gebruik wilt maken van de volledige syntaxis, wordt u aangeraden te compileren met de .NET Core SDK 3.0 of hoger en de taalversie in te stellen op
latest
. Als u Visual Studio gebruikt, weet dan dat de versies vóór Visual Studio 2019 niet compatibel met de hulpprogramma's die nodig zijn om C# 8.0 projecten te bouwen. Visual Studio 2019, met inbegrip van de gratis Community-editie, hier worden gedownload.
Een Event Hub maken
Volg de instructies in de quickstart: Een Event Hubs-naamruimte en een Event Hub maken om een Event Hubs-naamruimte en een Event Hub te maken. Volg vervolgens de instructies in De connection string ophalen om een connection string naar uw Event Hubs-naamruimte te krijgen.
Noteer de volgende instellingen die u in de huidige quickstart gaat gebruiken:
- Verbindingsreeks voor de Event Hubs-naamruimte
- Naam van de Event Hub
Een schema maken
Volg de instructies in Schemaregister maken om een schemagroep en een schema te maken.
Maak een schemagroep met de naam contoso-sg met behulp van de portal schemaregister. Gebruik Avro als het serialisatietype en Geen voor de compatibiliteitsmodus.
Maak in deze schemagroep een nieuw Avro-schema met schemanaam:
Microsoft.Azure.Data.SchemaRegistry.example.Order
met behulp van de volgende schema-inhoud.{ "namespace": "Microsoft.Azure.Data.SchemaRegistry.example", "type": "record", "name": "Order", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" }, { "name": "description", "type": "string" } ] }
Gebruiker toevoegen aan de rol SchemaRegisterlezer
Voeg uw gebruikersaccount toe aan de rol Schemaregisterlezer op het niveau van de naamruimte. U kunt ook de rol Inzender voor schemaregisters gebruiken, maar dat is niet nodig voor deze quickstart.
- Selecteer op de pagina Event Hubs-naamruimte de optie Toegangsbeheer (IAM) in het menu links.
- Selecteer op de pagina Toegangsbeheer (IAM)de optie + Toevoegen ->Roltoewijzing toevoegen in het menu.
- Selecteer op de pagina Toewijzingstypede optie Volgende.
- Selecteer op de pagina Rollende optie Schemaregisterlezer (preview) en selecteer vervolgens Volgende onderaan de pagina.
- Gebruik de koppeling + Leden selecteren om uw gebruikersaccount toe te voegen aan de rol en selecteer vervolgens Volgende.
- Selecteer op de pagina Beoordelen en toewijzende optie Beoordelen en toewijzen.
Gebeurtenissen produceren naar Event Hubs met schemavalidatie
Consoletoepassing maken voor gebeurtenisproducent
- Start Visual Studio 2019.
- Selecteer Een nieuw project maken.
- Voer in het dialoogvenster Een nieuw project maken de volgende stappen uit: Als dit dialoogvenster niet wordt weergegeven, selecteert u in het menu Bestand, Nieuw en vervolgens Project.
Selecteer de programmeertaal C# .
Selecteer Console als het type toepassing.
Selecteer Consoletoepassing in de lijst met resultaten.
Selecteer vervolgens Volgende.
- Voer OrderProducer in voor de projectnaam, SRQuickStart voor de naam van de oplossing en selecteer vervolgens OK om het project te maken.
Het Event Hubs NuGet-pakket toevoegen
Selecteer Tools>NuGet-pakketbeheer>Package Manager Console in het menu.
Voer de volgende opdrachten uit om Azure.Messaging.EventHubs en andere NuGet-pakketten te installeren. Druk op ENTER om de laatste opdracht uit te voeren.
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
Verifieer producententoepassingen om verbinding te maken met Azure via Visual Studio, zoals hier wordt weergegeven.
Meld u aan bij Azure met het gebruikersaccount dat lid is van de
Schema Registry Reader
rol op het niveau van de naamruimte. Zie Azure Schema Registry in Event Hubs voor informatie over schemaregisterrollen.
Code genereren met behulp van het Avro-schema
- Gebruik dezelfde inhoud die u hebt gebruikt om het schema te maken om een bestand met de naam
Order.avsc
te maken. Sla het bestand op in de project- of oplossingsmap. - Vervolgens kunt u dit schemabestand gebruiken om code voor .NET te genereren. U kunt elk hulpprogramma voor het genereren van externe code gebruiken, zoals avrogen voor het genereren van code. U kunt bijvoorbeeld uitvoeren
avrogen -s .\Order.avsc .
om code te genereren. - Zodra u code hebt gegenereerd, ziet u het bestand met de naam
Order.cs
in de\Microsoft\Azure\Data\SchemaRegistry\example
map. Voor het bovenstaande Avro-schema worden de C#-typen inMicrosoft.Azure.Data.SchemaRegistry.example
naamruimte gegenereerd. - Voeg het
Order.cs
bestand toe aan hetOrderProducer
project.
Code schrijven om gebeurtenissen te serialiseren en naar de Event Hub te verzenden
Voeg de volgende code toe aan het bestand
Program.cs
. Zie de opmerkingen bij de code voor meer informatie. Stappen op hoog niveau in de code zijn:- Maak een producerclient die u kunt gebruiken om gebeurtenissen naar een Event Hub te verzenden.
- Maak een schemaregisterclient die u kunt gebruiken om gegevens in een
Order
object te serialiseren en te valideren. - Maak een nieuw
Order
object met behulp van het gegenereerdeOrder
type. - Gebruik de schemaregisterclient om het
Order
object te serialiseren naarEventData
. - Maak een batch met gebeurtenissen.
- Voeg de gebeurtenisgegevens toe aan de gebeurtenisbatch.
- Gebruik de producerclient om de batch met gebeurtenissen naar de Event Hub te verzenden.
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // The Event Hubs client types are safe to cache and use as a singleton for the lifetime // of the application, which is best practice when events are being published or read regularly. EventHubProducerClient producerClient; // Create a producer client that you can use to send events to an event hub producerClient = new EventHubProducerClient(connectionString, eventHubName); // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Create a new order object using the generated type/class 'Order'. var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." }; EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData)); // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add the event data to the event batch. eventBatch.TryAdd(eventData); // Send the batch of events to the event hub. await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 1 order has been published.");
Vervang de volgende waarden voor tijdelijke aanduidingen door de werkelijke waarden.
-
EVENTHUBSNAMESPACECONNECTIONSTRING
- connection string voor de Event Hubs-naamruimte -
EVENTHUBNAME
- naam van de Event Hub -
EVENTHUBSNAMESPACENAME
- naam van de Event Hubs-naamruimte -
SCHEMAGROUPNAME
- naam van de schemagroep
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME";
-
Bouw het project en controleer of er geen fouten zijn.
Voer het programma uit en wacht op het bevestigingsbericht.
A batch of 1 order has been published.
In de Azure Portal kunt u controleren of de Event Hub de gebeurtenissen heeft ontvangen. Schakel over naar de weergave Berichten in de sectie Metrische gegevens. U moet de pagina vernieuwen om de grafiek bij te werken. Het kan een paar seconden duren voordat wordt weergegeven dat de berichten zijn ontvangen.
Gebeurtenissen van Event Hubs gebruiken met schemavalidatie
In deze sectie wordt beschreven hoe u een .NET Core-consoletoepassing schrijft die gebeurtenissen van een Event Hub ontvangt en hoe u het schemaregister gebruikt om gebeurtenisgegevens te deserialiseren.
Aanvullende vereisten
- Maak het opslagaccount dat moet worden gebruikt voor de gebeurtenisverwerker.
Een consumententoepassing maken
- Klik in het venster Solution Explorer met de rechtermuisknop op de oplossing SRQuickStart, wijs Toevoegen aan en selecteer Nieuw project.
- Selecteer Consoletoepassing en selecteer Volgende.
- Voer OrderConsumer in als projectnaam en selecteer Maken.
- Klik in het venster Solution Explorer met de rechtermuisknop op OrderConsumer en selecteer Instellen als opstartproject.
Het Event Hubs NuGet-pakket toevoegen
Selecteer Tools>NuGet-pakketbeheer>Package Manager Console in het menu.
Controleer in het venster Package Manager Console of OrderConsumer is geselecteerd voor het standaardproject. Zo niet, gebruik dan de vervolgkeuzelijst om OrderConsumer te selecteren.
Voer de volgende opdracht uit om de vereiste NuGet-pakketten te installeren. Druk op Enter om de laatste opdracht uit te voeren.
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Messaging.EventHubs.Processor Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
Verifieer producententoepassingen om verbinding te maken met Azure via Visual Studio, zoals hier wordt weergegeven.
Meld u aan bij Azure met het gebruikersaccount dat lid is van de
Schema Registry Reader
rol op naamruimteniveau. Zie Azure Schema Registry in Event Hubs voor meer informatie over schemaregisterrollen.Voeg het
Order.cs
bestand dat u hebt gegenereerd als onderdeel van het maken van de producer-app toe aan het project OrderConsumer .Klik met de rechtermuisknop op OrderConsumer-project en selecteer Instellen als opstartproject.
Code schrijven om gebeurtenissen te ontvangen en deze te deserialiseren met behulp van schemaregister
Voeg de volgende code toe aan het bestand
Program.cs
. Zie de opmerkingen bij de code voor meer informatie. Stappen op hoog niveau in de code zijn:- Maak een consumentenclient die u kunt gebruiken om gebeurtenissen naar een Event Hub te verzenden.
- Maak een blobcontainerclient voor de blobcontainer in de Azure Blob Storage.
- Maak een gebeurtenisprocessorclient en registreer gebeurtenis- en fout-handlers.
- Maak in de gebeurtenis-handler een schemaregisterclient die u kunt gebruiken om gebeurtenisgegevens in een
Order
object te deserialiseren. - Deserialiseer de gebeurtenisgegevens in een
Order
object met behulp van de serializer. - Druk de informatie over de ontvangen bestelling af.
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // connection string for the Azure Storage account const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // name of the blob container that will be userd as a checkpoint store const string blobContainerName = "BLOBCONTAINERNAME"; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 30 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(30)); // Stop the processing await processor.StopProcessingAsync(); static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Deserialized data in the received event using the schema Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order)); // Print the received event Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}"); await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
Vervang de volgende waarden voor tijdelijke aanduidingen door de werkelijke waarden.
-
EVENTHUBSNAMESPACE-CONNECTIONSTRING
- connection string voor de Event Hubs-naamruimte -
EVENTHUBNAME
- naam van de Event Hub -
EVENTHUBSNAMESPACENAME
- naam van de Event Hubs-naamruimte -
SCHEMAGROUPNAME
- naam van de schemagroep -
AZURESTORAGECONNECTIONSTRING
- connection string voor het Azure-opslagaccount -
BLOBCONTAINERNAME
- Naam van de blobcontainer
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // Azure storage connection string const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // Azure blob container name const string blobContainerName = "BLOBCONTAINERNAME";
-
Bouw het project en controleer of er geen fouten zijn.
Voer de ontvangende toepassing uit.
U ziet nu een bericht dat de gebeurtenissen zijn ontvangen.
Received order with ID: 1234, amount: 45.29, description: First sample order.
Deze gebeurtenissen zijn de drie gebeurtenissen die u eerder naar de Event Hub hebt verzonden door het verzendprogramma uit te voeren.
Voorbeelden
Zie het Leesmij-artikel in onze GitHub-opslagplaats.
Resources opschonen
Verwijder de Event Hubs-naamruimte of verwijder de resourcegroep die de naamruimte bevat.