你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

在使用事件中心 .NET SDK (AMQP) 流式处理事件时使用 Avro 架构验证

本快速入门介绍如何使用 Azure.Messaging.EventHubs .NET 库通过架构验证向/从事件中心发送/接收事件。

注意

Azure 架构注册表是事件中心的一项功能,它为事件驱动的应用程序和以消息为中心的应用程序的架构提供一个中心存储库。 它使生成者和使用者应用程序可以灵活地交换数据,而无需管理和共享架构。 它还为可重用架构提供了一个简单的治理框架,并通过分组构造(架构组)定义了架构之间的关系。 有关详细信息,请参阅事件中心中的 Azure 架构注册表

先决条件

如果不熟悉 Azure 事件中心,请在阅读本快速入门之前参阅事件中心概述

若要完成本快速入门,需要具备以下先决条件:

  • 如果还没有 Azure 订阅,可以在开始前创建一个免费帐户
  • Microsoft Visual Studio 2022。 Azure 事件中心客户端库利用 C# 8.0 中引入的新功能。 仍然可以使用之前的 C# 语言版本的库,但新语法不可用。 若要使用完整语法,建议使用 .NET Core SDK 3.0 或更高版本进行编译,并将语言版本设置为 latest。 如果使用 Visual Studio,Visual Studio 2019 以前的版本与生成 C# 8.0 项目时所需的工具将不兼容。 可在此处下载 Visual Studio 2019(包括免费的 Community Edition)。

创建事件中心

按照快速入门创建事件中心命名空间和事件中心中的说明创建事件中心命名空间和事件中心。 然后,按照获取连接字符串中的说明获取事件中心命名空间的连接字符串。

请记下以下设置,因为稍后要在本快速入门中使用:

  • 事件中心命名空间的连接字符串
  • 事件中心的名称

创建架构

按照使用架构注册表创建架构中的说明创建架构组和架构。

  1. 使用架构注册表门户创建名为 contoso-sg 的架构组。 使用“Avro”作为序列化类型,使用“无”作为兼容性模式。

  2. 在该架构组中,使用以下架构内容创建名为 Microsoft.Azure.Data.SchemaRegistry.example.Order 的新 Avro 架构。

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

将用户添加到架构注册表读取者角色

在命名空间级别,将用户帐户添加到“架构注册表读取者”角色。 也可以使用“架构注册表参与者”角色,但本快速入门不需要这样做。

  1. 在“事件中心命名空间”页面上,从左侧菜单中选择“访问控制(IAM)”。
  2. 在“访问控制(IAM)”页面上,依次选择“+ 添加”-> 菜单上的“添加角色分配”。
  3. 在“分配类型”页面上,选择“下一步”。
  4. 在“角色”页面上,选择“架构注册表读取者(预览)”,然后选择页面底部的“下一步”。
  5. 使用“+ 选择成员”链接将用户帐户添加到该角色,然后选择“下一步”。
  6. 在“查看 + 分配”页面上,选择“查看 + 分配”。

使用架构验证向事件中心生成事件

为事件生成者创建控制台应用程序

  1. 启动 Visual Studio 2019。
  2. 选择“创建新项目”。
  3. 在“创建新项目”对话框中执行以下步骤:如果看不到此对话框,请在菜单中选择“文件”,然后依次选择“新建”、“项目”。
    1. 选择“C#”作为编程语言。

    2. 选择“控制台”作为应用程序类型。

    3. 从结果列表中选择“控制台应用程序”。

    4. 然后,选择“下一步” 。

      显示“新建项目”对话框的插图。

  4. 输入“OrderProducer”作为项目名称,输入“SRQuickStart”作为解决方案名称,然后选择“确定”以创建项目 。

添加事件中心 NuGet 包

  1. 在菜单中选择“工具”>“NuGet 包管理器”>“包管理器控制台”

  2. 运行以下命令以安装 Azure.Messaging.EventHubs 和其他 NuGet 包。 按 ENTER 运行最新命令。

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. 验证生成者应用程序,以通过 Visual Studio 连接到 Azure,如此处所示。

  4. 使用在命名空间级别作为 Schema Registry Reader 角色成员的用户帐户登录到 Azure。 有关架构注册表角色的信息,请参阅事件中心中的 Azure 架构注册表

使用 Avro 架构生成代码

  1. 使用创建架构时所用的内容来创建名为 Order.avsc 的文件。 将文件保存在项目或解决方案文件夹中。
  2. 然后可以使用此架构文件生成适用于 .NET 的代码。 可以使用任何外部代码生成工具(例如 avrogen)生成代码。 例如,可以运行 avrogen -s .\Order.avsc . 来生成代码。
  3. 生成代码后,你将在 Order.cs 文件夹中看到名为 \Microsoft\Azure\Data\SchemaRegistry\example 的文件。 对于上述 Avro 架构,将在 Microsoft.Azure.Data.SchemaRegistry.example 命名空间中生成 C# 类型。
  4. Order.cs 文件添加到 OrderProducer 项目中。

编写代码以序列化事件并将其发送到事件中心

  1. 将以下代码添加到 Program.cs 文件。 参阅代码注释了解详细信息。 代码中的概要步骤如下:

    1. 创建可用于将事件发送到事件中心的生成者客户端。
    2. 创建可用于序列化和验证 Order 对象中的数据的架构注册表客户端。
    3. 使用生成的 Order 类型创建新的 Order 对象。
    4. 使用架构注册表客户端将 Order 对象序列化为 EventData
    5. 准备一批事件。
    6. 将事件数据添加到事件批。
    7. 使用生成者客户端将该批事件发送到事件中心。
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. 将以下占位符值替换为实际值。

    • EVENTHUBSNAMESPACECONNECTIONSTRING - 事件中心命名空间的连接字符串
    • EVENTHUBNAME - 事件中心的名称
    • EVENTHUBSNAMESPACENAME - 事件中心命名空间的名称
    • SCHEMAGROUPNAME - 架构组的名称
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. 生成项目并确保没有错误。

  4. 运行程序并等待出现确认消息。

    A batch of 1 order has been published.
    
  5. 在 Azure 门户中,可以验证事件中心是否已收到事件。 在“指标”部分切换到“消息”视图。 刷新页面以更新图表。 可能需要在几秒钟后才会显示已收到消息。

    用于验证事件中心是否已收到事件的 Azure 门户页的插图。

通过架构验证使用来自事件中心的事件

本部分介绍如何编写一个 .NET Core 控制台应用程序,用于从事件中心接收事件,并使用架构注册表来反序列化事件数据。

其他先决条件

  • 创建要用于事件处理器的存储帐户。

创建使用者应用程序

  1. 在“解决方案资源管理器”窗口中,右键单击“SRQuickStart”解决方案,指向“添加”,然后选择“新建项目” 。
  2. 选择“控制台应用程序”,然后选择“下一步”。
  3. 输入“OrderConsumer”作为项目名称,然后选择“创建” 。
  4. 在“解决方案资源管理器”窗口中,右键单击“OrderConsumer”并选择“设为启动项目” 。

添加事件中心 NuGet 包

  1. 在菜单中选择“工具”>“NuGet 包管理器”>“包管理器控制台”

  2. 在“包管理器控制台”窗口中,确认是否为“默认项目”选择了“OrderConsumer” 。 如果不是,请使用下拉列表选择“OrderConsumer”。

  3. 运行以下命令以安装所需的 NuGet 包。 按 ENTER 运行最新命令。

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. 验证生成者应用程序,以通过 Visual Studio 连接到 Azure,如此处所示。

  5. 使用在命名空间级别作为 Schema Registry Reader 角色成员的用户帐户登录到 Azure。 有关架构注册表角色的信息,请参阅事件中心中的 Azure 架构注册表

  6. 将你在创建生成者应用时生成的 Order.cs 文件添加到 OrderConsumer 项目。

  7. 右键单击 OrderConsumer 项目,然后选择“设为启动项目”。

编写代码以接收事件并使用架构注册表反序列化这些事件

  1. 将以下代码添加到 Program.cs 文件。 参阅代码注释了解详细信息。 代码中的概要步骤如下:

    1. 创建可用于将事件发送到事件中心的使用者客户端。
    2. 为 Azure Blob 存储中的 Blob 容器创建 Blob 容器客户端。
    3. 创建事件处理器客户端,然后注册事件和错误处理程序。
    4. 在事件处理程序中,创建可用于将事件数据反序列化为 Order 对象的架构注册表客户端。
    5. 使用序列化程序将事件数据反序列化为 Order 对象。
    6. 打印有关收到的订单的信息。
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be userd as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. 将以下占位符值替换为实际值。

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING - 事件中心命名空间的连接字符串
    • EVENTHUBNAME - 事件中心的名称
    • EVENTHUBSNAMESPACENAME - 事件中心命名空间的名称
    • SCHEMAGROUPNAME - 架构组的名称
    • AZURESTORAGECONNECTIONSTRING - Azure 存储帐户的连接字符串
    • BLOBCONTAINERNAME - Blob 容器的名称
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. 生成项目并确保没有错误。

  4. 运行接收器应用程序。

  5. 应会看到一条消息,指出已收到事件。

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    这些事件是前面通过运行发送器程序发送到事件中心的三个事件。

示例

请参阅 GitHub 存储库中的自述文章。

清理资源

删除事件中心命名空间或删除包含该命名空间的资源组。

后续步骤