Bagikan melalui


Memvalidasi menggunakan skema Avro saat streaming peristiwa menggunakan Event Hubs .NET SDK (AMQP)

Dalam mulai cepat ini, Anda mempelajari cara mengirim peristiwa ke dan menerima peristiwa dari pusat aktivitas dengan validasi skema menggunakan pustaka .NET Azure.Messaging.EventHubs .

Catatan

Azure Schema Registry adalah fitur Azure Event Hubs, yang menyediakan repositori pusat untuk skema pada aplikasi yang digerakkan oleh peristiwa dan pesan-sentris. Hal ini memberikan fleksibilitas bagi aplikasi produsen dan konsumen Anda untuk bertukar data tanpa harus mengelola dan berbagi skema. Schema Registry juga menyediakan kerangka kerja tata kelola sederhana untuk skema yang dapat digunakan kembali dan mendefinisikan hubungan antara skema melalui konstruksi pengelompokan (kelompok skema). Untuk informasi selengkapnya, lihat Azure Schema Registry di Pusat Aktivitas.

Prasyarat

Jika Anda baru menggunakan Azure Event Hubs, lihat Ringkasan Event Hubs sebelum Anda melakukan mulai cepat ini.

Untuk menyelesaikan mulai cepat ini, Anda memerlukan prasyarat berikut:

  • Jika Anda tidak memiliki langganan Azure, buatlah akun gratis sebelum Anda memulai.
  • Microsoft Visual Studio 2022. Pustaka klien Azure Event Hubs memanfaatkan fitur baru yang diperkenalkan di C# 8.0. Anda masih dapat menggunakan pustaka dengan versi bahasa C# sebelumnya, tetapi sintaks baru tidak tersedia. Untuk menggunakan sintaks lengkap, kami menyarankan Anda untuk mengompilasi dengan .NET Core SDK 3.0 atau lebih tinggi dan versi bahasa diatur ke latest. Jika Anda menggunakan Visual Studio, versi sebelum Visual Studio 2019 tidak kompatibel dengan alat yang diperlukan untuk membuat proyek C# 8.0. Visual Studio 2019, termasuk edisi Komunitas gratis, dapat diunduh di sini.

Membuat pusat aktivitas

Ikuti instruksi dari mulai cepat: Membuat namespace layanan Azure Event Hubs dan pusat aktivitas untuk membuat namespace layanan Azure Event Hubs dan pusat aktivitas. Kemudian, ikuti instruksi dari Dapatkan string koneksi untuk mendapatkan string koneksi ke namespace layanan Azure Event Hubs Anda.

Catat pengaturan berikut yang akan Anda gunakan di mulai cepat saat ini:

  • String koneksi untuk namespace Azure Event Hubs
  • Nama hub peristiwa

Buat skema

Ikuti instruksi dari Membuat skema menggunakan Schema Registry untuk membuat grup skema dan skema.

  1. Buat grup skema bernama contoso-sg menggunakan portal Schema Registry. Gunakan Avro sebagai jenis serialisasi dan Tidak ada untuk mode kompatibilitas.

  2. Di grup skema tersebut, buat skema Avro baru dengan nama skema: Microsoft.Azure.Data.SchemaRegistry.example.Order menggunakan konten skema berikut.

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

Menambahkan pengguna ke peran Pembaca Schema Registry

Tambahkan akun pengguna Anda ke peran Pembaca Schema Registry di tingkat namespace. Anda juga dapat menggunakan peran Kontributor Registri Skema , tetapi itu tidak diperlukan untuk mulai cepat ini.

  1. Pada halaman Namespace Layanan Pusat Aktivitas , pilih Kontrol akses (IAM) di menu sebelah kiri.
  2. Pada halaman Kontrol akses (IAM) , pilih + Tambahkan ->Tambahkan penetapan peran pada menu.
  3. Pada halaman Jenis penugasan , pilih Berikutnya.
  4. Pada halaman Peran, pilih Pembaca Registri Skema (Pratinjau), lalu pilih Berikutnya di bagian bawah halaman.
  5. Gunakan tautan + Pilih anggota untuk menambahkan akun pengguna Anda ke peran, lalu pilih Berikutnya.
  6. Pada halaman Tinjau + tetapkan , pilih Tinjau + tetapkan.

Hasilkan peristiwa ke hub peristiwa dengan validasi skema

Buat aplikasi konsol untuk produsen peristiwa

  1. Mulai Visual Studio 2019.
  2. Pilih Buat proyek baru.
  3. Pada kotak dialog Buat proyek baru, lakukan langkah-langkah berikut ini: Jika Anda tidak melihat kotak dialog ini, pilih File pada menu, pilih Baru, lalu pilih Proyek.
    1. Untuk C# untuk bahasa pemrograman.

    2. Pilih Konsol untuk jenis aplikasi.

    3. Pilih Aplikasi Konsol dari daftar hasil.

    4. Kemudian, pilih Berikutnya.

      Gambar yang menampilkan kotak dialog Proyek Baru.

  4. Masukkan OrderProducer untuk nama proyek, SRQuickStart untuk nama solusi, lalu pilih OK untuk membuat proyek.

Menambahkan paket Azure Event Hubs NuGet

  1. Pilih Alat>NuGet Package Manager>Package Manager Console dari menu.

  2. Jalankan perintah berikut untuk menginstal Azure.Messaging.EventHubs dan paket NuGet lainnya. Tekan ENTER untuk menjalankan perintah terakhir.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. Autentikasikan aplikasi produsen agar terhubung ke Azure melalui Visual Studio seperti yang ditunjukkan di sini.

  4. Masuk ke Azure menggunakan akun pengguna yang merupakan anggota Schema Registry Reader peran di tingkat namespace. Untuk informasi tentang peran registri skema, lihat Azure Schema Registry di Azure Event Hubs.

Pembuatan kode menggunakan skema Avro

  1. Gunakan konten yang sama dengan yang Anda gunakan untuk membuat skema untuk membuat file bernama Order.avsc. Simpan file dalam folder proyek atau solusi.
  2. Kemudian Anda dapat menggunakan file skema ini untuk menghasilkan kode untuk .NET. Anda dapat menggunakan alat pembuat kode eksternal seperti avrogen untuk pembuatan kode. Misalnya, Anda dapat menjalankan avrogen -s .\Order.avsc . untuk menghasilkan kode.
  3. Setelah membuat kode, Anda akan melihat file bernama Order.cs di \Microsoft\Azure\Data\SchemaRegistry\example folder . Untuk skema Avro di atas, skema membuat jenis C# di Microsoft.Azure.Data.SchemaRegistry.example namespace.
  4. Order.cs Tambahkan file ke OrderProducer proyek.

Tulis kode untuk melakukan serialisasi dan mengirim peristiwa ke hub peristiwa

  1. Tambahkan kode berikut ke file Program.cs. Lihat komentar kode untuk mengetahui detailnya. Langkah-langkah tingkat tinggi dalam kode adalah:

    1. Buat klien produsen yang dapat Anda gunakan untuk mengirim peristiwa ke pusat aktivitas.
    2. Buat klien registri skema yang dapat Anda gunakan untuk membuat serialisasi dan memvalidasi data dalam Order objek.
    3. Buat objek baru Order menggunakan jenis yang dihasilkan Order .
    4. Gunakan klien registri skema untuk membuat serialisasi Order objek ke EventData.
    5. Buat batch peristiwa.
    6. Tambahkan data peristiwa ke batch peristiwa.
    7. Gunakan klien produser untuk mengirim batch peristiwa ke pusat aktivitas.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. Ganti nilai tempat penampung berikut dengan nilai riil.

    • EVENTHUBSNAMESPACECONNECTIONSTRING - string koneksi untuk namespace layanan Azure Event Hubs
    • EVENTHUBNAME - nama pusat aktivitas
    • EVENTHUBSNAMESPACENAME - nama namespace layanan Azure Event Hubs
    • SCHEMAGROUPNAME - nama grup skema
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. Bangun program, dan pastikan tidak ada kesalahan.

  4. Jalankan program dan tunggu pesan konfirmasi.

    A batch of 1 order has been published.
    
  5. Di portal Microsoft Azure, Anda dapat memverifikasi bahwa pusat aktivitas menerima acara. Beralih ke tampilan Pesan di bagian Metrik. Refresh halaman untuk memperbarui bagan. Mungkin perlu beberapa detik untuk menunjukkan bahwa pesan telah diterima.

    Gambar halaman portal Microsoft Azure untuk memverifikasi bahwa hub peristiwa menerima acara.

Gunakan peristiwa dari hub peristiwa dengan validasi skema

Bagian ini menunjukkan cara menulis aplikasi konsol .NET Core yang menerima peristiwa dari pusat aktivitas dan menggunakan registri skema untuk mendeserialisasi data peristiwa.

Prasyarat tambahan

  • Buat akun penyimpanan untuk digunakan prosesor peristiwa.

Buat aplikasi konsumen

  1. Di jendela Penjelajah Solusi, klik kanan solusi SRQuickStart, arahkan ke Tambahkan, dan pilih Proyek Baru.
  2. Pilih Aplikasi konsol, lalu pilih Berikutnya.
  3. Masukkan OrderConsumer untuk Nama proyek, dan pilih Buat.
  4. Di jendela Penjelajah Solusi, klik kanan OrderConsumer, dan pilih Tetapkan sebagai Proyek Startup.

Menambahkan paket Azure Event Hubs NuGet

  1. Pilih Alat>NuGet Package Manager>Package Manager Console dari menu.

  2. Di jendela Konsol Manajer Paket, konfirmasikan bahwa OrderConsumer dipilih untuk Proyek default. Jika tidak, gunakan daftar drop-down untuk memilih OrderConsumer.

  3. Jalankan perintah berikut untuk menginstal paket NuGet yang diperlukan. Tekan ENTER untuk menjalankan perintah terakhir.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. Autentikasikan aplikasi produsen agar terhubung ke Azure melalui Visual Studio seperti yang ditunjukkan di sini.

  5. Masuk ke Azure menggunakan akun pengguna yang merupakan anggota Schema Registry Reader peran di tingkat namespace. Untuk informasi tentang peran registri skema, lihat Azure Schema Registry di Azure Event Hubs.

  6. Tambahkan file yang Order.cs Anda buat sebagai bagian dari pembuatan aplikasi produsen ke proyek OrderConsumer .

  7. Klik kanan proyek OrderConsumer , dan pilih Tetapkan sebagai proyek Startup.

Tulis kode untuk menerima peristiwa dan mendeserialisasi menggunakan Schema Registry

  1. Tambahkan kode berikut ke file Program.cs. Lihat komentar kode untuk mengetahui detailnya. Langkah-langkah tingkat tinggi dalam kode adalah:

    1. Buat klien konsumen yang dapat Anda gunakan untuk mengirim peristiwa ke pusat aktivitas.
    2. Buat klien kontainer blob untuk kontainer blob di penyimpanan blob Azure.
    3. Buat klien prosesor peristiwa dan daftarkan penanganan aktivitas dan kesalahan.
    4. Di penanganan aktivitas, buat klien registri skema yang dapat Anda gunakan untuk mendeserialisasi data peristiwa ke dalam Order objek.
    5. Deserialisasi data peristiwa ke dalam Order objek menggunakan serializer.
    6. Cetak informasi tentang pesanan yang diterima.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be userd as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. Ganti nilai tempat penampung berikut dengan nilai riil.

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING - string koneksi untuk namespace layanan Azure Event Hubs
    • EVENTHUBNAME - nama pusat aktivitas
    • EVENTHUBSNAMESPACENAME - nama namespace layanan Azure Event Hubs
    • SCHEMAGROUPNAME - nama grup skema
    • AZURESTORAGECONNECTIONSTRING - string koneksi untuk akun penyimpanan Azure
    • BLOBCONTAINERNAME - Nama kontainer blob
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. Bangun program, dan pastikan tidak ada kesalahan.

  4. Jalankan aplikasi penerima.

  5. Anda akan melihat pesan bahwa acara telah diterima.

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    Peristiwa ini adalah tiga acara yang Anda kirim ke pusat acara sebelumnya dengan menjalankan program pengirim.

Sampel

Lihat artikel Readme di repositori GitHub kami.

Membersihkan sumber daya

Hapus namespace layanan Azure Event Hubs atau hapus grup sumber daya yang berisi namespace layanan.

Langkah berikutnya