Quickstart: Validate schema when sending and receiving events - AMQP and .NET

Azure Schema Registry is a feature of Event Hubs, which provides a central repository for schemas for event-driven and messaging-centric applications. It provides the flexibility for your producer and consumer applications to exchange data without having to manage and share the schema. It also provides a simple governance framework for reusable schemas and defines relationship between schemas through a grouping construct (schema groups). For more information, see Azure Schema Registry in Event Hubs.

This quickstart shows how to send events to and receive events from an event hub with schema validation using the Azure.Messaging.EventHubs .NET library.

Prerequisites

If you're new to Azure Event Hubs, see Event Hubs overview before you do this quickstart.

To complete this quickstart, you need the following prerequisites:

  • Follow instructions from the quickstart: Create an Event Hubs namespace and an event hub.
  • Follow instructions from Get the connection string to get a connection string to your Event Hubs namespace. Note down the following settings that you'll use in the current quickstart:
    • Connection string for the Event Hubs namespace
    • Name of the event hub
  • Complete the .NET quickstart to become familiar with sending events to and receiving events from event hubs using .NET. If you have already done the .NET quickstart before, you can skip this step.
  • Follow instructions from Create schemas using Schema Registry to create a schema group and a schema. When creating a schema, follow instructions from the Create a schema in the current quickstart article.
  • Microsoft Visual Studio 2019. The Azure Event Hubs client library makes use of new features that were introduced in C# 8.0. You can still use the library with previous C# language versions, but the new syntax won't be available. To make use of the full syntax, we recommended that you compile with the .NET Core SDK 3.0 or higher and language version set to latest. If you're using Visual Studio, versions before Visual Studio 2019 aren't compatible with the tools needed to build C# 8.0 projects. Visual Studio 2019, including the free Community edition, can be downloaded here.

Create a schema

  1. Create a schema group named contoso-sg using the Schema Registry portal. Use Avro as the serialization type and None for the compatibility mode.

  2. In that schema group, create a new Avro schema with schema name: Microsoft.Azure.Data.SchemaRegistry.example.Order using the following schema content.

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

Produce events to event hubs with schema validation

Create console application for event producer

  1. Start Visual Studio 2019.
  2. Select Create a new project.
  3. On the Create a new project dialog box, do the following steps: If you don't see this dialog box, select File on the menu, select New, and then select Project.
    1. Select C# for the programming language.

    2. Select Console for the type of the application.

    3. Select Console Application from the results list.

    4. Then, select Next.

      Image showing the New Project dialog box.

  4. Enter OrderProducer for the project name, SRQuickStart for the solution name, and then select OK to create the project.

Add the Event Hubs NuGet package

  1. Select Tools > NuGet Package Manager > Package Manager Console from the menu.

  2. Run the following command to install the Azure.Messaging.EventHubs NuGet package:

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro -Version 1.0.0-beta.2
    Install-Package Azure.ResourceManager.Compute -Version 1.0.0-beta.1
    
    
  3. Authenticate producer applications to connect to Azure via Visual Studio as shown here.

Code generation using the Avro schema

  1. You can use the same schema content and create the Avro schema file Order.avsc file inside the OrderProducer project.
  2. Then you can use this schema file to generate code for .NET. You can use any external code generation tool such as avrogen for code generation. (For example you can run avrogen -s .\Order.avsc to generate code).
  3. Once you generate code, you should have the corresponding C# types available inside your project. For the above Avro schema, it generates the C# types in Microsoft.Azure.Data.SchemaRegistry.example namespace.

Write code to serialize and send events to the event hub

  1. Add the following using statements to the top of the Program.cs file:

    using System;
    using System.IO;
    using System.Threading;
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    using System.Threading.Tasks;
    
  2. Also you can import the generated types related to Order schema as shown below.

    using Microsoft.Azure.Data.SchemaRegistry.example;   
    
  3. Add constants to the Program class for the Event Hubs connection string and the event hub name.

        // connection string to the Event Hubs namespace
        private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    
        // name of the event hub
        private const string eventHubName = "<EVENT HUB NAME>";
    
        // Schema Registry endpoint 
        private const string schemaRegistryEndpoint = "<EVENT HUBS NAMESPACE>.servicebus.windows.net>";
    
        // name of the consumer group   
        private const string schemaGroup = "<SCHEMA GROUP>";
    
    

    Note

    Replace placeholder values with the connection string to your namespace, the name of the event hub, and schema group.

  4. Add the following static property to the Program class. See the code comments.

        // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
        // of the application, which is best practice when events are being published or read regularly.
        static EventHubProducerClient producerClient;    
    
  5. Replace the Main method with the following async Main method. See the code comments for details.

        static async Task Main()
        {
            // Create a producer client that you can use to send events to an event hub
            producerClient = new EventHubProducerClient(connectionString, eventHubName); 
    
            // Create a schema registry client that you can use to serialize and validate data.  
            var schemaRegistryClient = new SchemaRegistryClient(endpoint: schemaRegistryEndpoint, credential: new DefaultAzureCredential());
    
            // Create a batch of events 
            using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
            // Create a new order object using the generated type/class 'Order'. 
            var sampleOrder = new Order { id = "12345", amount = 55.99, description = "This is a sample order." };
    
            using var memoryStream = new MemoryStream();
            // Create an Avro object serializer using the Schema Registry client object. 
            var producerSerializer = new SchemaRegistryAvroObjectSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroObjectSerializerOptions { AutoRegisterSchemas = true });
    
            // Serialize events data to the memory stream object. 
            producerSerializer.Serialize(memoryStream, sampleOrder, typeof(Order), CancellationToken.None);
    
            byte[] _memoryStreamBytes;
            _memoryStreamBytes = memoryStream.ToArray();
    
            // Create event data with serialized data and add it to an event batch. 
            eventBatch.TryAdd(new EventData(_memoryStreamBytes));
    
            // Send serilized event data to event hub. 
            await producerClient.SendAsync(eventBatch);
            Console.WriteLine("A batch of 1 order has been published.");
        }
    
  6. Build the project, and ensure that there are no errors.

  7. Run the program and wait for the confirmation message.

    A batch of 1 order has been published.
    
  8. In the Azure portal, you can verify that the event hub has received the events. Switch to Messages view in the Metrics section. Refresh the page to update the chart. It may take a few seconds for it to show that the messages have been received.

    Image of the Azure portal page to verify that the event hub received the events.

Consume events from event hubs with schema validation

This section shows how to write a .NET Core console application that receives events from an event hub and use schema registry to de-serialize event data.

Create consumer application

  1. In the Solution Explorer window, right-click the SRQuickStart solution, point to Add, and select New Project.
  2. Select Console application, and select Next.
  3. Enter OrderConsumer for the Project name, and select Create.
  4. In the Solution Explorer window, right-click OrderConsumer, and select Set as a Startup Project.

Add the Event Hubs NuGet package

  1. Select Tools > NuGet Package Manager > Package Manager Console from the menu.

  2. In the Package Manager Console window, confirm that OrderConsumer is selected for the Default project. If not, use the drop-down list to select OrderConsumer.

  3. Run the following command to install the required NuGet package:

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro -Version 1.0.0-beta.2
    Install-Package Azure.ResourceManager.Compute -Version 1.0.0-beta.1
    
    
  4. Authenticate producer applications to connect to Azure via Visual Studio as shown here.

Code generation using the Avro schema

  1. You can use the same schema content and create the Avro schema file Order.avsc file inside the OrderProducer project.
  2. Then you can use this schema file to generate code for .NET. For this, you can use any external code generation tool such as avrogen. (For example you can run avrogen -s .\Order.avsc to generate code).
  3. Once you generate code, you should have the corresponding C# types available inside your project. For the above Avro schema, it generates the C# types in Microsoft.Azure.Data.SchemaRegistry.example namespace.

Write code to receive events and deserialize them using Schema Registry

  1. Add the following using statements to the top of the Program.cs file:

    using System;
    using System.IO;
    using System.Text;
    using System.Threading.Tasks;
    using System.Threading;
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
  2. Also you can import the generated types related to Order schema as shown below.

    using Microsoft.Azure.Data.SchemaRegistry.example;   
    
  3. Add constants to the Program class for the Event Hubs connection string and the event hub name.

        // connection string to the Event Hubs namespace
        private const string connectionString = "<EVENT HUBS NAMESPACE - CONNECTION STRING>";
    
        // name of the event hub
        private const string eventHubName = "<EVENT HUB NAME>";
    
        private const string blobStorageConnectionString = "<AZURE STORAGE CONNECTION STRING>";
    
        private const string blobContainerName = "<BLOB CONTAINER NAME>";
    
        // Schema Registry endpoint 
        private const string schemaRegistryEndpoint = "<EVENT HUBS NAMESPACE>.servicebus.windows.net>";
    
        // name of the consumer group   
        private const string schemaGroup = "<SCHEMA GROUP>";
    
    
  4. Add the following static properties to the Program class.

        static BlobContainerClient storageClient;
    
        // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
        // of the application, which is best practice when events are being published or read regularly.        
        static EventProcessorClient processor;    
    
  5. Replace the Main method with the following async Main method. See the code comments for details.

        static async Task Main()
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
    
            // Create a blob container client that the event processor will use 
            storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
            // Create an event processor client to process events in the event hub
            processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
    
            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;
    
            // Start the processing
            await processor.StartProcessingAsync();
    
            // Wait for 30 seconds for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(30));
    
            // Stop the processing
            await processor.StopProcessingAsync();
        }
    
  6. Now, add the following event handler method that includes event de-serialization logic with the schema registry

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            // Create a schema registry client that you can use to deserialize and validate data.  
            var schemaRegistryClient = new SchemaRegistryClient(endpoint: schemaRegistryEndpoint, credential: new DefaultAzureCredential());
    
            // Retrieve event data and convert it to a byte array. 
            byte[] _memoryStreamBytes = eventArgs.Data.Body.ToArray();
            using var consumerMemoryStream = new MemoryStream(_memoryStreamBytes);
    
            var consumerSerializer = new SchemaRegistryAvroObjectSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroObjectSerializerOptions { AutoRegisterSchemas = false });
            consumerMemoryStream.Position = 0;
    
            // Deserialize event data and create order object using schema registry. 
            Order sampleOrder = (Order)consumerSerializer.Deserialize(consumerMemoryStream, typeof(Order), CancellationToken.None);
            Console.WriteLine("Received - Order ID: " + sampleOrder.id);                 
    
            // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);   
        }
    
    
  7. Now, add the following error handler methods to the class.

        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            // Write details about the error to the console window
            Console.WriteLine($"\tPartition '{ eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }
    
    
  8. Build the project, and ensure that there are no errors.

  9. Run the receiver application.

  10. You should see a message that the events have been received.

    Received - Order ID: 12345
    

    These events are the three events you sent to the event hub earlier by running the sender program.

Next steps