分享方式:


使用事件中樞 .NET SDK (AMQP) 串流事件時,使用 Avro 結構描述進行驗證

這個快速入門說明如何使用 Azure.Messaging.EventHubs .NET 程式庫,將事件傳送至具有結構描述驗證的事件中樞,以及從中接收事件。

注意

Azure 結構描述登錄是事件中樞的一項功能,可為事件驅動和以訊息為中心的應用程式,提供結構描述的中央存放庫。 其可為您的生產者和取用者應用程式提供彈性以交換資料而不需要管理和共用結構描述。 其也為可重複使用的結構描述提供簡單的治理架構,並透過群組結構 (結構描述群組) 定義結構描述之間的關聯性。 如需詳細資訊,請參閱事件中樞中的 Azure 結構描述登錄

必要條件

如果您對 Azure 事件中樞並不熟悉,在進行此快速入門之前,請先參閱事件中樞概述

若要完成本快速入門,您必須符合下列必要條件:

  • 如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶
  • Microsoft Visual Studio 2022。 Azure 事件中樞用戶端程式庫會使用 C# 8.0 中引進的新功能。 您仍然可以使用之前 C# 程式設計語言版本,但無法使用新的語法。 若要使用完整的語法,建議您使用 .NET Core SDK 3.0 或更新版本,並將語言版本設為 latest 以進行編譯。 如果您使用的是 Visual Studio,Visual Studio 2019 之前的版本與建立 C# 8.0 專案所需的工具不相容。 Visual Studio 2019 (包括免費的 Community 版) 可以在這裡下載。

建立事件中樞

遵循快速入門: [建立事件中樞命名空間和事件中樞] 中的指示,建立事件中樞命名空間和事件中樞。 遵循 [取得連接字串] 中的指示,以取得事件中樞命名空間的連接字串。

記下您將在目前快速入門中使用的下列設定:

  • 事件中樞命名空間的連接字串
  • 事件中樞的名稱。

建立結構描述

遵循 [使用結構描述登錄建立結構描述] 的指示,來建立結構描述群組和結構描述。

  1. 使用結構描述登錄入口網站來建立名為 contoso-sg 的結構描述群組。 使用 Avro 作為序列化類型,以及 None 作為相容性模式。

  2. 在該結構描述群組中,使用下列結構描述內容建立新的 Avro 結構描述,並具有結構描述名稱:Microsoft.Azure.Data.SchemaRegistry.example.Order

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

將使用者新增至結構描述登錄讀者角色

將用戶帳戶新增至命名空間層級的 結構描述登錄讀者 角色。 您也可以使用 結構描述登錄參與者 角色,但這並非本快速入門的必要專案。

  1. [事件中樞命名空間] 頁面上,選取左側功能表上的 [存取控制 (IAM)]
  2. [存取控制 (IAM)] 頁面上,在功能表上選取 [+ 新增] ->[新增角色指派]
  3. [指派類型] 頁面上,選取 [下一步]
  4. [角色] 頁面上,選取 [結構描述登錄讀者 (預覽)],然後選取頁面底部的 [下一步]
  5. 使用 [+ 選取成員] 連結將使用者帳戶新增至角色,然後選取 [下一步]
  6. [檢閱 + 指派] 索引標籤上,選取 [檢閱 + 指派]

使用結構描述驗證向事件中樞產生事件

為事件生產者建立主控台應用程式

  1. 啟動 Visual Studio 2019。
  2. 選取 [建立新專案]
  3. [建立新專案] 對話框中,執行下列步驟: 如果您沒有看到此對話框,請在功能表上選取 [檔案],選取 [新增],然後選取 [專案]
    1. 選取 [C#] 作為程式設計語言。

    2. 選取 [主控台] 作為應用程式的類型。

    3. 從結果清單選取 [主控台應用程式]

    4. 然後選取下一步

      Image showing the New Project dialog box.

  4. 輸入 OrderProducer 作為專案名稱、輸入 SRQuickStart 作為解決方案名稱,然後選取 [確定] 以建立專案。

新增事件中樞 NuGet 封裝

  1. 從功能表選取 [工具]>[NuGet 套件管理員]>[套件管理員主控台]

  2. 執行以下命令來安裝 Azure.Messaging.EventHubs 和其他 NuGet 套件。 按下 [ENTER] 鍵執行此命令。

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. 驗證生產者應用程式,以透過 Visual Studio 連接至 Azure,如這裡所示。

  4. 使用屬於命名空間層級 Schema Registry Reader 角色成員的使用者帳戶登入 Azure。 如需結構描述登錄角色的相關資訊,請參閱 事件中樞中的 Azure 結構描述登錄

使用 Avro 結構描述產生程式碼

  1. 使用您用來建立結構描述的相同內容來建立名為 Order.avsc 的檔案。 將檔案儲存在專案或解決方案資料夾中。
  2. 然後,您可以使用此結構描述檔案來產生 .NET 的程式碼。 您可以使用任何外部程式碼產生工具 (例如 avrogen) 來產生程式碼。 例如,您可以執行 avrogen -s .\Order.avsc . 以產生程式碼。
  3. 產生程式代碼之後,您會在 \Microsoft\Azure\Data\SchemaRegistry\example 資料夾中看到名為 Order.cs 的檔案。 針對上述 Avro 結構描述,它會在 Microsoft.Azure.Data.SchemaRegistry.example 命名空間中產生 C# 類型。
  4. Order.cs 檔案新增至 OrderProducer 專案。

撰寫程式碼以將事件序列化並傳送至事件中樞

  1. 將下列程式碼新增至 Program.cs 檔案。 如需詳細資訊,請參閱程式碼註解。 程式碼中的高階步驟如下:

    1. 建立生產者用戶端,以用來將事件傳送至事件中樞。
    2. 建立您可用來串行化和驗證 Order 物件中的資料的結構描述登錄用戶端。
    3. 使用產生的 Order 類型,建立新的 Order 物件。
    4. 使用結構描述登錄用戶端將 Order 物件序列化至 EventData
    5. 準備事件批次。
    6. 將事件資料新增至事件批次。
    7. 使用產生者用戶端將這批事件傳送到事件中樞。
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. 將下列預留位置值取代為實際值。

    • EVENTHUBSNAMESPACECONNECTIONSTRING - 事件中樞命名空間的連接字串
    • EVENTHUBNAME - 事件中樞的名稱
    • EVENTHUBSNAMESPACENAME - 事件中樞命名空間的名稱
    • SCHEMAGROUPNAME - 結構描述群組的名稱
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. 建置專案,並確定沒有任何錯誤。

  4. 執行程式,並等待確認訊息。

    A batch of 1 order has been published.
    
  5. 在 Azure 入口網站中,您可以驗證事件中樞已收到事件。 切換至 [計量] 區段中的 [訊息] 檢視。 請重新整理頁面來更新圖表。 可能需要幾秒鐘的時間,頁面才會顯示已收到訊息。

    Image of the Azure portal page to verify that the event hub received the events.

使用結構描述驗證從事件中樞取用事件

本章節會說明如何撰寫 .NET Core 主控台應用程式,以接收來自事件中樞的事件,並使用結構描述登錄來還原序列化事件資料。

其他必要條件

  • 建立記憶體帳戶以使用事件處理器。

建立取用者應用程式

  1. 在 [方案總管] 視窗中,以滑鼠右鍵按一下 SRQuickStart 解決方案,並指向 [新增],然後選取 [新增專案]
  2. 選取 [主控台應用程式],然後選取 [下一步]
  3. 針對 [專案名稱] 輸入 OrderConsumer,然後選取 [建立]
  4. 在 [方案總管] 視窗中,以滑鼠右鍵按一下 OrderConsumer,然後選取 [設定為啟動專案]

新增事件中樞 NuGet 封裝

  1. 從功能表選取 [工具]>[NuGet 套件管理員]>[套件管理員主控台]

  2. 在 [套件管理員主控台] 視窗中,確認已針對 [預設專案] 選取 OrderConsumer。 如果沒有,請使用下拉式清單選取 OrderConsumer

  3. 執行下列命令以安裝必要的 NuGet 套件。 按下 [ENTER] 鍵執行此命令。

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. 驗證生產者應用程式,以透過 Visual Studio 連接至 Azure,如這裡所示。

  5. 使用屬於命名空間層級 Schema Registry Reader 角色成員的使用者帳戶登入 Azure。 如需結構描述登錄角色的相關資訊,請參閱 事件中樞中的 Azure 結構描述登錄

  6. 將您在建立產生者應用程式時產生的 Order.cs 檔案新增至 OrderConsumer 專案。

  7. 以滑鼠右鍵按一下 [OrderConsumer] 專案,並選取 [設定為啟始專案]

撰寫程式碼以接收事件,並使用結構描述登錄將其還原序列化

  1. 將下列程式碼新增至 Program.cs 檔案。 如需詳細資訊,請參閱程式碼註解。 程式碼中的高階步驟如下:

    1. 建立生產者用戶端,以用來將事件傳送至事件中樞。
    2. 為 Azure Blob 儲存體中的 Blob 容器建立 Blob 容器用戶端。
    3. 建立事件處理器用戶端,並註冊事件和錯誤處理程式。
    4. 在事件處理程式中,建立結構描述登錄用戶端,讓您可用來將事件資料還原序列化為 Order 物件。
    5. 使用序列化程式將事件資料還原序列化至 Order 物件。
    6. 列印所接收訂單的相關資訊。
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be userd as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. 將下列預留位置值取代為實際值。

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING - 事件中樞命名空間的連接字串
    • EVENTHUBNAME - 事件中樞的名稱
    • EVENTHUBSNAMESPACENAME - 事件中樞命名空間的名稱
    • SCHEMAGROUPNAME - 結構描述群組的名稱
    • AZURESTORAGECONNECTIONSTRING - Azure 儲存體帳戶的連接字串
    • BLOBCONTAINERNAME - Blob 容器的名稱
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. 建置專案,並確定沒有任何錯誤。

  4. 執行接收者應用程式。

  5. 您應該會看到訊息,指出已收到事件。

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    這些事件是您稍早透過執行寄件者程式傳送至事件中樞的三個事件。

範例

請參閱 GitHub 存放庫中的 Readme 文章。

清除資源

刪除事件中樞命名空間,或刪除包含命名空間的資源群組。

下一步