Memvalidasi menggunakan skema Avro saat streaming peristiwa menggunakan Event Hubs .NET SDK (AMQP)
Dalam mulai cepat ini, Anda mempelajari cara mengirim peristiwa ke dan menerima peristiwa dari pusat aktivitas dengan validasi skema menggunakan pustaka .NET Azure.Messaging.EventHubs .
Catatan
Azure Schema Registry adalah fitur Azure Event Hubs, yang menyediakan repositori pusat untuk skema pada aplikasi yang digerakkan oleh peristiwa dan pesan-sentris. Hal ini memberikan fleksibilitas bagi aplikasi produsen dan konsumen Anda untuk bertukar data tanpa harus mengelola dan berbagi skema. Schema Registry juga menyediakan kerangka kerja tata kelola sederhana untuk skema yang dapat digunakan kembali dan mendefinisikan hubungan antara skema melalui konstruksi pengelompokan (kelompok skema). Untuk informasi selengkapnya, lihat Azure Schema Registry di Pusat Aktivitas.
Prasyarat
Jika Anda baru menggunakan Azure Event Hubs, lihat Ringkasan Event Hubs sebelum Anda melakukan mulai cepat ini.
Untuk menyelesaikan mulai cepat ini, Anda memerlukan prasyarat berikut:
- Jika Anda tidak memiliki langganan Azure, buatlah akun gratis sebelum Anda memulai.
-
Microsoft Visual Studio 2022.
Pustaka klien Azure Event Hubs memanfaatkan fitur baru yang diperkenalkan di C# 8.0. Anda masih dapat menggunakan pustaka dengan versi bahasa C# sebelumnya, tetapi sintaks baru tidak tersedia. Untuk menggunakan sintaks lengkap, kami menyarankan Anda untuk mengompilasi dengan .NET Core SDK 3.0 atau lebih tinggi dan versi bahasa diatur ke
latest
. Jika Anda menggunakan Visual Studio, versi sebelum Visual Studio 2019 tidak kompatibel dengan alat yang diperlukan untuk membuat proyek C# 8.0. Visual Studio 2019, termasuk edisi Komunitas gratis, dapat diunduh di sini.
Membuat pusat aktivitas
Ikuti instruksi dari mulai cepat: Membuat namespace layanan Azure Event Hubs dan pusat aktivitas untuk membuat namespace layanan Azure Event Hubs dan pusat aktivitas. Kemudian, ikuti instruksi dari Dapatkan string koneksi untuk mendapatkan string koneksi ke namespace layanan Azure Event Hubs Anda.
Catat pengaturan berikut yang akan Anda gunakan di mulai cepat saat ini:
- String koneksi untuk namespace Azure Event Hubs
- Nama hub peristiwa
Buat skema
Ikuti instruksi dari Membuat skema menggunakan Schema Registry untuk membuat grup skema dan skema.
Buat grup skema bernama contoso-sg menggunakan portal Schema Registry. Gunakan Avro sebagai jenis serialisasi dan Tidak ada untuk mode kompatibilitas.
Di grup skema tersebut, buat skema Avro baru dengan nama skema:
Microsoft.Azure.Data.SchemaRegistry.example.Order
menggunakan konten skema berikut.{ "namespace": "Microsoft.Azure.Data.SchemaRegistry.example", "type": "record", "name": "Order", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" }, { "name": "description", "type": "string" } ] }
Menambahkan pengguna ke peran Pembaca Schema Registry
Tambahkan akun pengguna Anda ke peran Pembaca Schema Registry di tingkat namespace. Anda juga dapat menggunakan peran Kontributor Registri Skema , tetapi itu tidak diperlukan untuk mulai cepat ini.
- Pada halaman Namespace Layanan Pusat Aktivitas , pilih Kontrol akses (IAM) di menu sebelah kiri.
- Pada halaman Kontrol akses (IAM) , pilih + Tambahkan ->Tambahkan penetapan peran pada menu.
- Pada halaman Jenis penugasan , pilih Berikutnya.
- Pada halaman Peran, pilih Pembaca Registri Skema (Pratinjau), lalu pilih Berikutnya di bagian bawah halaman.
- Gunakan tautan + Pilih anggota untuk menambahkan akun pengguna Anda ke peran, lalu pilih Berikutnya.
- Pada halaman Tinjau + tetapkan , pilih Tinjau + tetapkan.
Hasilkan peristiwa ke hub peristiwa dengan validasi skema
Buat aplikasi konsol untuk produsen peristiwa
- Mulai Visual Studio 2019.
- Pilih Buat proyek baru.
- Pada kotak dialog Buat proyek baru, lakukan langkah-langkah berikut ini: Jika Anda tidak melihat kotak dialog ini, pilih File pada menu, pilih Baru, lalu pilih Proyek.
Untuk C# untuk bahasa pemrograman.
Pilih Konsol untuk jenis aplikasi.
Pilih Aplikasi Konsol dari daftar hasil.
Kemudian, pilih Berikutnya.
- Masukkan OrderProducer untuk nama proyek, SRQuickStart untuk nama solusi, lalu pilih OK untuk membuat proyek.
Menambahkan paket Azure Event Hubs NuGet
Pilih Alat>NuGet Package Manager>Package Manager Console dari menu.
Jalankan perintah berikut untuk menginstal Azure.Messaging.EventHubs dan paket NuGet lainnya. Tekan ENTER untuk menjalankan perintah terakhir.
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
Autentikasikan aplikasi produsen agar terhubung ke Azure melalui Visual Studio seperti yang ditunjukkan di sini.
Masuk ke Azure menggunakan akun pengguna yang merupakan anggota
Schema Registry Reader
peran di tingkat namespace. Untuk informasi tentang peran registri skema, lihat Azure Schema Registry di Azure Event Hubs.
Pembuatan kode menggunakan skema Avro
- Gunakan konten yang sama dengan yang Anda gunakan untuk membuat skema untuk membuat file bernama
Order.avsc
. Simpan file dalam folder proyek atau solusi. - Kemudian Anda dapat menggunakan file skema ini untuk menghasilkan kode untuk .NET. Anda dapat menggunakan alat pembuat kode eksternal seperti avrogen untuk pembuatan kode. Misalnya, Anda dapat menjalankan
avrogen -s .\Order.avsc .
untuk menghasilkan kode. - Setelah membuat kode, Anda akan melihat file bernama
Order.cs
di\Microsoft\Azure\Data\SchemaRegistry\example
folder . Untuk skema Avro di atas, skema membuat jenis C# diMicrosoft.Azure.Data.SchemaRegistry.example
namespace. -
Order.cs
Tambahkan file keOrderProducer
proyek.
Tulis kode untuk melakukan serialisasi dan mengirim peristiwa ke hub peristiwa
Tambahkan kode berikut ke file
Program.cs
. Lihat komentar kode untuk mengetahui detailnya. Langkah-langkah tingkat tinggi dalam kode adalah:- Buat klien produsen yang dapat Anda gunakan untuk mengirim peristiwa ke pusat aktivitas.
- Buat klien registri skema yang dapat Anda gunakan untuk membuat serialisasi dan memvalidasi data dalam
Order
objek. - Buat objek baru
Order
menggunakan jenis yang dihasilkanOrder
. - Gunakan klien registri skema untuk membuat serialisasi
Order
objek keEventData
. - Buat batch peristiwa.
- Tambahkan data peristiwa ke batch peristiwa.
- Gunakan klien produser untuk mengirim batch peristiwa ke pusat aktivitas.
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // The Event Hubs client types are safe to cache and use as a singleton for the lifetime // of the application, which is best practice when events are being published or read regularly. EventHubProducerClient producerClient; // Create a producer client that you can use to send events to an event hub producerClient = new EventHubProducerClient(connectionString, eventHubName); // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Create a new order object using the generated type/class 'Order'. var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." }; EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData)); // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add the event data to the event batch. eventBatch.TryAdd(eventData); // Send the batch of events to the event hub. await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 1 order has been published.");
Ganti nilai tempat penampung berikut dengan nilai riil.
-
EVENTHUBSNAMESPACECONNECTIONSTRING
- string koneksi untuk namespace layanan Azure Event Hubs -
EVENTHUBNAME
- nama pusat aktivitas -
EVENTHUBSNAMESPACENAME
- nama namespace layanan Azure Event Hubs -
SCHEMAGROUPNAME
- nama grup skema
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME";
-
Bangun program, dan pastikan tidak ada kesalahan.
Jalankan program dan tunggu pesan konfirmasi.
A batch of 1 order has been published.
Di portal Microsoft Azure, Anda dapat memverifikasi bahwa pusat aktivitas menerima acara. Beralih ke tampilan Pesan di bagian Metrik. Refresh halaman untuk memperbarui bagan. Mungkin perlu beberapa detik untuk menunjukkan bahwa pesan telah diterima.
Gunakan peristiwa dari hub peristiwa dengan validasi skema
Bagian ini menunjukkan cara menulis aplikasi konsol .NET Core yang menerima peristiwa dari pusat aktivitas dan menggunakan registri skema untuk mendeserialisasi data peristiwa.
Prasyarat tambahan
- Buat akun penyimpanan untuk digunakan prosesor peristiwa.
Buat aplikasi konsumen
- Di jendela Penjelajah Solusi, klik kanan solusi SRQuickStart, arahkan ke Tambahkan, dan pilih Proyek Baru.
- Pilih Aplikasi konsol, lalu pilih Berikutnya.
- Masukkan OrderConsumer untuk Nama proyek, dan pilih Buat.
- Di jendela Penjelajah Solusi, klik kanan OrderConsumer, dan pilih Tetapkan sebagai Proyek Startup.
Menambahkan paket Azure Event Hubs NuGet
Pilih Alat>NuGet Package Manager>Package Manager Console dari menu.
Di jendela Konsol Manajer Paket, konfirmasikan bahwa OrderConsumer dipilih untuk Proyek default. Jika tidak, gunakan daftar drop-down untuk memilih OrderConsumer.
Jalankan perintah berikut untuk menginstal paket NuGet yang diperlukan. Tekan ENTER untuk menjalankan perintah terakhir.
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Messaging.EventHubs.Processor Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
Autentikasikan aplikasi produsen agar terhubung ke Azure melalui Visual Studio seperti yang ditunjukkan di sini.
Masuk ke Azure menggunakan akun pengguna yang merupakan anggota
Schema Registry Reader
peran di tingkat namespace. Untuk informasi tentang peran registri skema, lihat Azure Schema Registry di Azure Event Hubs.Tambahkan file yang
Order.cs
Anda buat sebagai bagian dari pembuatan aplikasi produsen ke proyek OrderConsumer .Klik kanan proyek OrderConsumer , dan pilih Tetapkan sebagai proyek Startup.
Tulis kode untuk menerima peristiwa dan mendeserialisasi menggunakan Schema Registry
Tambahkan kode berikut ke file
Program.cs
. Lihat komentar kode untuk mengetahui detailnya. Langkah-langkah tingkat tinggi dalam kode adalah:- Buat klien konsumen yang dapat Anda gunakan untuk mengirim peristiwa ke pusat aktivitas.
- Buat klien kontainer blob untuk kontainer blob di penyimpanan blob Azure.
- Buat klien prosesor peristiwa dan daftarkan penanganan aktivitas dan kesalahan.
- Di penanganan aktivitas, buat klien registri skema yang dapat Anda gunakan untuk mendeserialisasi data peristiwa ke dalam
Order
objek. - Deserialisasi data peristiwa ke dalam
Order
objek menggunakan serializer. - Cetak informasi tentang pesanan yang diterima.
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // connection string for the Azure Storage account const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // name of the blob container that will be userd as a checkpoint store const string blobContainerName = "BLOBCONTAINERNAME"; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 30 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(30)); // Stop the processing await processor.StopProcessingAsync(); static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Deserialized data in the received event using the schema Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order)); // Print the received event Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}"); await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
Ganti nilai tempat penampung berikut dengan nilai riil.
-
EVENTHUBSNAMESPACE-CONNECTIONSTRING
- string koneksi untuk namespace layanan Azure Event Hubs -
EVENTHUBNAME
- nama pusat aktivitas -
EVENTHUBSNAMESPACENAME
- nama namespace layanan Azure Event Hubs -
SCHEMAGROUPNAME
- nama grup skema -
AZURESTORAGECONNECTIONSTRING
- string koneksi untuk akun penyimpanan Azure -
BLOBCONTAINERNAME
- Nama kontainer blob
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // Azure storage connection string const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // Azure blob container name const string blobContainerName = "BLOBCONTAINERNAME";
-
Bangun program, dan pastikan tidak ada kesalahan.
Jalankan aplikasi penerima.
Anda akan melihat pesan bahwa acara telah diterima.
Received order with ID: 1234, amount: 45.29, description: First sample order.
Peristiwa ini adalah tiga acara yang Anda kirim ke pusat acara sebelumnya dengan menjalankan program pengirim.
Sampel
Lihat artikel Readme di repositori GitHub kami.
Membersihkan sumber daya
Hapus namespace layanan Azure Event Hubs atau hapus grup sumber daya yang berisi namespace layanan.