Valideren met behulp van een Avro-schema bij het streamen van gebeurtenissen met behulp van Event Hubs .NET SDK's (AMQP)

In deze quickstart leert u hoe u gebeurtenissen verzendt naar en ontvangt van een Event Hub met schemavalidatie met behulp van de Azure.Messaging.EventHubs .NET-bibliotheek.

Notitie

Azure Schema Registry is een functie van Event Hubs, die een centrale opslagplaats biedt voor schema's voor gebeurtenisgestuurde en berichtengerichte toepassingen. Het biedt de flexibiliteit voor uw producenten- en consumententoepassingen om gegevens uit te wisselen zonder dat u het schema hoeft te beheren en te delen. Het biedt ook een eenvoudig governanceframework voor herbruikbare schema's en definieert de relatie tussen schema's via een groeperingsconstructie (schemagroepen). Zie Azure Schema Registry in Event Hubs voor meer informatie.

Vereisten

Als u nog geen ervaring hebt met Azure Event Hubs, raadpleegt u het Event Hubs-overzicht voordat u deze quickstart uitvoert.

Voor het voltooien van deze snelstart moet aan de volgende vereisten worden voldaan:

  • Als u geen Azure-abonnement hebt, maakt u een gratis account voordat u begint.
  • Microsoft Visual Studio 2022. De Azure Event Hubs-clientbibliotheek maakt gebruik van nieuwe functies die in C# 8.0 zijn geïntroduceerd. U kunt de bibliotheek nog steeds gebruiken met eerdere C#-taalversies, maar de nieuwe syntaxis is niet beschikbaar. Als u gebruik wilt maken van de volledige syntaxis, wordt u aangeraden te compileren met de .NET Core SDK 3.0 of hoger en de taalversie in te stellen op latest. Als u Visual Studio gebruikt, weet dan dat de versies vóór Visual Studio 2019 niet compatibel met de hulpprogramma's die nodig zijn om C# 8.0 projecten te bouwen. Visual Studio 2019, met inbegrip van de gratis Community-editie, hier worden gedownload.

Een Event Hub maken

Volg de instructies in de quickstart: Een Event Hubs-naamruimte en een Event Hub maken om een Event Hubs-naamruimte en een Event Hub te maken. Volg vervolgens de instructies in De connection string ophalen om een connection string naar uw Event Hubs-naamruimte te krijgen.

Noteer de volgende instellingen die u in de huidige quickstart gaat gebruiken:

  • Verbindingsreeks voor de Event Hubs-naamruimte
  • Naam van de Event Hub

Een schema maken

Volg de instructies in Schemaregister maken om een schemagroep en een schema te maken.

  1. Maak een schemagroep met de naam contoso-sg met behulp van de portal schemaregister. Gebruik Avro als het serialisatietype en Geen voor de compatibiliteitsmodus.

  2. Maak in deze schemagroep een nieuw Avro-schema met schemanaam: Microsoft.Azure.Data.SchemaRegistry.example.Order met behulp van de volgende schema-inhoud.

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

Gebruiker toevoegen aan de rol SchemaRegisterlezer

Voeg uw gebruikersaccount toe aan de rol Schemaregisterlezer op het niveau van de naamruimte. U kunt ook de rol Inzender voor schemaregisters gebruiken, maar dat is niet nodig voor deze quickstart.

  1. Selecteer op de pagina Event Hubs-naamruimte de optie Toegangsbeheer (IAM) in het menu links.
  2. Selecteer op de pagina Toegangsbeheer (IAM)de optie + Toevoegen ->Roltoewijzing toevoegen in het menu.
  3. Selecteer op de pagina Toewijzingstypede optie Volgende.
  4. Selecteer op de pagina Rollende optie Schemaregisterlezer (preview) en selecteer vervolgens Volgende onderaan de pagina.
  5. Gebruik de koppeling + Leden selecteren om uw gebruikersaccount toe te voegen aan de rol en selecteer vervolgens Volgende.
  6. Selecteer op de pagina Beoordelen en toewijzende optie Beoordelen en toewijzen.

Gebeurtenissen produceren naar Event Hubs met schemavalidatie

Consoletoepassing maken voor gebeurtenisproducent

  1. Start Visual Studio 2019.
  2. Selecteer Een nieuw project maken.
  3. Voer in het dialoogvenster Een nieuw project maken de volgende stappen uit: Als dit dialoogvenster niet wordt weergegeven, selecteert u in het menu Bestand, Nieuw en vervolgens Project.
    1. Selecteer de programmeertaal C# .

    2. Selecteer Console als het type toepassing.

    3. Selecteer Consoletoepassing in de lijst met resultaten.

    4. Selecteer vervolgens Volgende.

      Afbeelding van het dialoogvenster Nieuw project.

  4. Voer OrderProducer in voor de projectnaam, SRQuickStart voor de naam van de oplossing en selecteer vervolgens OK om het project te maken.

Het Event Hubs NuGet-pakket toevoegen

  1. Selecteer Tools>NuGet-pakketbeheer>Package Manager Console in het menu.

  2. Voer de volgende opdrachten uit om Azure.Messaging.EventHubs en andere NuGet-pakketten te installeren. Druk op ENTER om de laatste opdracht uit te voeren.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. Verifieer producententoepassingen om verbinding te maken met Azure via Visual Studio, zoals hier wordt weergegeven.

  4. Meld u aan bij Azure met het gebruikersaccount dat lid is van de Schema Registry Reader rol op het niveau van de naamruimte. Zie Azure Schema Registry in Event Hubs voor informatie over schemaregisterrollen.

Code genereren met behulp van het Avro-schema

  1. Gebruik dezelfde inhoud die u hebt gebruikt om het schema te maken om een bestand met de naam Order.avscte maken. Sla het bestand op in de project- of oplossingsmap.
  2. Vervolgens kunt u dit schemabestand gebruiken om code voor .NET te genereren. U kunt elk hulpprogramma voor het genereren van externe code gebruiken, zoals avrogen voor het genereren van code. U kunt bijvoorbeeld uitvoeren avrogen -s .\Order.avsc . om code te genereren.
  3. Zodra u code hebt gegenereerd, ziet u het bestand met de naam Order.cs in de \Microsoft\Azure\Data\SchemaRegistry\example map. Voor het bovenstaande Avro-schema worden de C#-typen in Microsoft.Azure.Data.SchemaRegistry.example naamruimte gegenereerd.
  4. Voeg het Order.cs bestand toe aan het OrderProducer project.

Code schrijven om gebeurtenissen te serialiseren en naar de Event Hub te verzenden

  1. Voeg de volgende code toe aan het bestand Program.cs. Zie de opmerkingen bij de code voor meer informatie. Stappen op hoog niveau in de code zijn:

    1. Maak een producerclient die u kunt gebruiken om gebeurtenissen naar een Event Hub te verzenden.
    2. Maak een schemaregisterclient die u kunt gebruiken om gegevens in een Order object te serialiseren en te valideren.
    3. Maak een nieuw Order object met behulp van het gegenereerde Order type.
    4. Gebruik de schemaregisterclient om het Order object te serialiseren naar EventData.
    5. Maak een batch met gebeurtenissen.
    6. Voeg de gebeurtenisgegevens toe aan de gebeurtenisbatch.
    7. Gebruik de producerclient om de batch met gebeurtenissen naar de Event Hub te verzenden.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. Vervang de volgende waarden voor tijdelijke aanduidingen door de werkelijke waarden.

    • EVENTHUBSNAMESPACECONNECTIONSTRING- connection string voor de Event Hubs-naamruimte
    • EVENTHUBNAME - naam van de Event Hub
    • EVENTHUBSNAMESPACENAME - naam van de Event Hubs-naamruimte
    • SCHEMAGROUPNAME - naam van de schemagroep
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. Bouw het project en controleer of er geen fouten zijn.

  4. Voer het programma uit en wacht op het bevestigingsbericht.

    A batch of 1 order has been published.
    
  5. In de Azure Portal kunt u controleren of de Event Hub de gebeurtenissen heeft ontvangen. Schakel over naar de weergave Berichten in de sectie Metrische gegevens. U moet de pagina vernieuwen om de grafiek bij te werken. Het kan een paar seconden duren voordat wordt weergegeven dat de berichten zijn ontvangen.

    Afbeelding van de pagina Azure Portal om te controleren of de Event Hub de gebeurtenissen heeft ontvangen.

Gebeurtenissen van Event Hubs gebruiken met schemavalidatie

In deze sectie wordt beschreven hoe u een .NET Core-consoletoepassing schrijft die gebeurtenissen van een Event Hub ontvangt en hoe u het schemaregister gebruikt om gebeurtenisgegevens te deserialiseren.

Aanvullende vereisten

  • Maak het opslagaccount dat moet worden gebruikt voor de gebeurtenisverwerker.

Een consumententoepassing maken

  1. Klik in het venster Solution Explorer met de rechtermuisknop op de oplossing SRQuickStart, wijs Toevoegen aan en selecteer Nieuw project.
  2. Selecteer Consoletoepassing en selecteer Volgende.
  3. Voer OrderConsumer in als projectnaam en selecteer Maken.
  4. Klik in het venster Solution Explorer met de rechtermuisknop op OrderConsumer en selecteer Instellen als opstartproject.

Het Event Hubs NuGet-pakket toevoegen

  1. Selecteer Tools>NuGet-pakketbeheer>Package Manager Console in het menu.

  2. Controleer in het venster Package Manager Console of OrderConsumer is geselecteerd voor het standaardproject. Zo niet, gebruik dan de vervolgkeuzelijst om OrderConsumer te selecteren.

  3. Voer de volgende opdracht uit om de vereiste NuGet-pakketten te installeren. Druk op Enter om de laatste opdracht uit te voeren.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. Verifieer producententoepassingen om verbinding te maken met Azure via Visual Studio, zoals hier wordt weergegeven.

  5. Meld u aan bij Azure met het gebruikersaccount dat lid is van de Schema Registry Reader rol op naamruimteniveau. Zie Azure Schema Registry in Event Hubs voor meer informatie over schemaregisterrollen.

  6. Voeg het Order.cs bestand dat u hebt gegenereerd als onderdeel van het maken van de producer-app toe aan het project OrderConsumer .

  7. Klik met de rechtermuisknop op OrderConsumer-project en selecteer Instellen als opstartproject.

Code schrijven om gebeurtenissen te ontvangen en deze te deserialiseren met behulp van schemaregister

  1. Voeg de volgende code toe aan het bestand Program.cs. Zie de opmerkingen bij de code voor meer informatie. Stappen op hoog niveau in de code zijn:

    1. Maak een consumentenclient die u kunt gebruiken om gebeurtenissen naar een Event Hub te verzenden.
    2. Maak een blobcontainerclient voor de blobcontainer in de Azure Blob Storage.
    3. Maak een gebeurtenisprocessorclient en registreer gebeurtenis- en fout-handlers.
    4. Maak in de gebeurtenis-handler een schemaregisterclient die u kunt gebruiken om gebeurtenisgegevens in een Order object te deserialiseren.
    5. Deserialiseer de gebeurtenisgegevens in een Order object met behulp van de serializer.
    6. Druk de informatie over de ontvangen bestelling af.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be userd as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. Vervang de volgende waarden voor tijdelijke aanduidingen door de werkelijke waarden.

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING- connection string voor de Event Hubs-naamruimte
    • EVENTHUBNAME - naam van de Event Hub
    • EVENTHUBSNAMESPACENAME - naam van de Event Hubs-naamruimte
    • SCHEMAGROUPNAME - naam van de schemagroep
    • AZURESTORAGECONNECTIONSTRING- connection string voor het Azure-opslagaccount
    • BLOBCONTAINERNAME - Naam van de blobcontainer
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. Bouw het project en controleer of er geen fouten zijn.

  4. Voer de ontvangende toepassing uit.

  5. U ziet nu een bericht dat de gebeurtenissen zijn ontvangen.

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    Deze gebeurtenissen zijn de drie gebeurtenissen die u eerder naar de Event Hub hebt verzonden door het verzendprogramma uit te voeren.

Voorbeelden

Zie het Leesmij-artikel in onze GitHub-opslagplaats.

Resources opschonen

Verwijder de Event Hubs-naamruimte of verwijder de resourcegroep die de naamruimte bevat.

Volgende stappen