Az események Event Hubs .NET SDK-kkal (AMQP) történő streamelésének ellenőrzése Avro-sémával
Ebben a rövid útmutatóban megtudhatja, hogyan küldhet eseményeket egy eseményközpontba, és hogyan fogadhat eseményeket sémaérvényesítéssel az Azure.Messaging.EventHubs .NET-kódtár használatával.
Megjegyzés
Az Azure Schema Registry az Event Hubs szolgáltatása, amely központi adattárat biztosít az eseményvezérelt és üzenetkezelés-központú alkalmazások sémáihoz. Rugalmasan biztosítja a gyártói és fogyasztói alkalmazások számára az adatok cseréjét anélkül, hogy kezelnie és meg kellene osztania a sémát. Emellett egy egyszerű szabályozási keretrendszert is biztosít az újrafelhasználható sémákhoz, és egy csoportosítási szerkezeten (sémacsoportokon) keresztül definiálja a sémák közötti kapcsolatot. További információ: Azure Schema Registry az Event Hubsban.
Előfeltételek
Ha még nem ismerkedik a Azure Event Hubs, a rövid útmutató előtt tekintse meg az Event Hubs áttekintését.
A rövid útmutató elvégzéséhez a következő előfeltételekre van szüksége:
- Ha nem rendelkezik Azure-előfizetéssel, a kezdés előtt hozzon létre egy ingyenes fiókot .
- Microsoft Visual Studio 2022.
A Azure Event Hubs ügyfélkódtár a C# 8.0-s verziójában bevezetett új funkciókat használja. A kódtárat továbbra is használhatja a korábbi C#-nyelvi verziókkal, de az új szintaxis nem érhető el. A teljes szintaxis használatához azt javasoljuk, hogy a .NET Core SDK 3.0-s vagy újabb verziójával, a nyelvi verzió pedig a következőre
latest
legyen állítva: . A Visual Studio használata esetén a Visual Studio 2019 előtti verziók nem kompatibilisek a C# 8.0-projektek létrehozásához szükséges eszközökkel. A Visual Studio 2019, beleértve az ingyenes közösségi kiadást is, itt tölthető le.
Eseményközpont létrehozása
Kövesse a rövid útmutató utasításait: Event Hubs-névtér és eseményközpont létrehozása az Event Hubs-névtér és egy eseményközpont létrehozásához. Ezután kövesse a Get the kapcsolati sztring (Kapcsolati sztring) című cikk utasításait, hogy kapcsolati sztring kapjon az Event Hubs-névtérhez.
Jegyezze fel az alábbi beállításokat, amelyeket az aktuális rövid útmutatóban fog használni:
- Az Event Hubs-névtér kapcsolati sztringje
- Az eseményközpont neve
Séma létrehozása
Sémacsoport és séma létrehozásához kövesse a Séma létrehozása a Sémaregisztrációs adatbázissal című cikk utasításait.
Hozzon létre egy contoso-sg nevű sémacsoportot a Sémaregisztrációs portál használatával. A kompatibilitási módhoz használja az Avro típust szerializálási típusként, a Nincs típust.
Ebben a sémacsoportban hozzon létre egy új Avro-sémát sémanévvel:
Microsoft.Azure.Data.SchemaRegistry.example.Order
a következő sématartalommal.{ "namespace": "Microsoft.Azure.Data.SchemaRegistry.example", "type": "record", "name": "Order", "fields": [ { "name": "id", "type": "string" }, { "name": "amount", "type": "double" }, { "name": "description", "type": "string" } ] }
Felhasználó hozzáadása sémaregisztrációs adatbázis-olvasó szerepkörhöz
Adja hozzá a felhasználói fiókját a sémaregisztrációs adatbázis-olvasó szerepkörhöz a névtér szintjén. A sémaregisztrációs adatbázis közreműködője szerepkört is használhatja, de ehhez a rövid útmutatóhoz nem szükséges.
- Az Event Hubs-névtér lapon válassza a hozzáférés-vezérlés (IAM) lehetőséget a bal oldali menüben.
- A Hozzáférés-vezérlés (IAM) lapon válassza a + Szerepkör-hozzárendelés>hozzáadása lehetőséget a menüben.
- A Hozzárendelés típusa lapon válassza a Tovább gombot.
- A Szerepkörök lapon válassza a Sémaregisztrációs adatbázis olvasója (előzetes verzió) lehetőséget, majd válassza a Tovább gombot a lap alján.
- A + Tagok kiválasztása hivatkozás használatával adja hozzá a felhasználói fiókját a szerepkörhöz, majd válassza a Tovább gombot.
- A Véleményezés + hozzárendelés lapon válassza a Véleményezés + hozzárendelés lehetőséget.
Események létrehozása az eseményközpontokba sémaérvényesítéssel
Konzolalkalmazás létrehozása eseménykészítőhöz
- Indítsa el a Visual Studio 2019-et.
- Válassza az Új projekt létrehozása lehetőséget.
- Az Új projekt létrehozása párbeszédpanelen hajtsa végre a következő lépéseket: Ha nem látja ezt a párbeszédpanelt, válassza a Menü Fájl elemét, válassza az Új, majd a Projekt lehetőséget.
A programozási nyelvhez válassza a C# lehetőséget.
Az alkalmazás típusához válassza a Konzol lehetőséget.
Válassza a Konzolalkalmazás lehetőséget az eredmények listájából.
Ezután válassza a Tovább gombot.
- Adja meg az OrderProducer nevet a projektnévhez, a megoldásnévhez pedig az SRQuickStart értéket, majd kattintson az OK gombra a projekt létrehozásához.
Az Event Hubs NuGet-csomag hozzáadása
A menüben válassza az Eszközök>NuGet Package Manager>Csomagkezelő konzol lehetőséget.
Futtassa az alábbi parancsokat az Azure.Messaging.EventHubs és más NuGet-csomagok telepítéséhez. Nyomja le az ENTER billentyűt az utolsó parancs futtatásához.
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
Az itt látható módon hitelesítse az előállító alkalmazásokat az Azure-hoz való csatlakozáshoz a Visual Studióval.
Jelentkezzen be az Azure-ba azzal a felhasználói fiókkal, amely a szerepkör tagja a
Schema Registry Reader
névtér szintjén. További információ a sémaregisztrációs adatbázis szerepköreiről: Azure Schema Registry az Event Hubsban.
Kódlétrehozás az Avro-sémával
- Használja ugyanazt a tartalmat, amelyet a séma létrehozásához használt egy nevű
Order.avsc
fájl létrehozásához. Mentse a fájlt a projekt vagy a megoldás mappájába. - Ezután ezzel a sémafájllal létrehozhat kódot a .NET-hez. A kódgeneráláshoz bármilyen külső kódgenerálási eszközt használhat, például avrogent . Futtathatja
avrogen -s .\Order.avsc .
például a kódot. - A kód létrehozása után megjelenik a nevű
Order.cs
fájl a\Microsoft\Azure\Data\SchemaRegistry\example
mappában. A fenti Avro-séma esetében létrehozza a C#-típusokat a névtérbenMicrosoft.Azure.Data.SchemaRegistry.example
. - Adja hozzá a
Order.cs
fájlt aOrderProducer
projekthez.
Kód írása események szerializálásához és az eseményközpontba való küldéséhez
Adja hozzá a következő kódot a
Program.cs
fájlhoz. A részletekért tekintse meg a kód megjegyzéseit. A kód magas szintű lépései a következők:- Hozzon létre egy gyártói ügyfelet, amellyel eseményeket küldhet egy eseményközpontba.
- Hozzon létre egy sémaregisztrációs ügyfélprogramot, amellyel szerializálhatja és ellenőrizheti az
Order
objektumban lévő adatokat. - Hozzon létre egy új
Order
objektumot a létrehozottOrder
típussal. - A sémaregisztrációs ügyfélalkalmazással szerializálja az objektumot a
Order
következőre:EventData
. - Hozzon létre egy eseményköteget.
- Adja hozzá az eseményadatokat az esemény kötegéhez.
- Az előállító ügyfél használatával küldje el az események kötegét az eseményközpontba.
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Producer; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // The Event Hubs client types are safe to cache and use as a singleton for the lifetime // of the application, which is best practice when events are being published or read regularly. EventHubProducerClient producerClient; // Create a producer client that you can use to send events to an event hub producerClient = new EventHubProducerClient(connectionString, eventHubName); // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Create a new order object using the generated type/class 'Order'. var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." }; EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData)); // Create a batch of events using EventDataBatch eventBatch = await producerClient.CreateBatchAsync(); // Add the event data to the event batch. eventBatch.TryAdd(eventData); // Send the batch of events to the event hub. await producerClient.SendAsync(eventBatch); Console.WriteLine("A batch of 1 order has been published.");
Cserélje le a következő helyőrző értékeket a valós értékekre.
EVENTHUBSNAMESPACECONNECTIONSTRING
- kapcsolati sztring az Event Hubs-névtérhezEVENTHUBNAME
- az eseményközpont neveEVENTHUBSNAMESPACENAME
- az Event Hubs-névtér neveSCHEMAGROUPNAME
- a sémacsoport neve
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME";
Hozza létre a projektet, és győződjön meg arról, hogy nincsenek hibák.
Futtassa a programot, és várja meg a megerősítési üzenetet.
A batch of 1 order has been published.
A Azure Portal ellenőrizheti, hogy az eseményközpont megkapta-e az eseményeket. Váltson Üzenetek nézetre a Metrikák szakaszban. Frissítse a lapot a diagram frissítéséhez. Az üzenetek fogadásának megjelenítése eltarthat néhány másodpercig.
Eseményközpontokból származó események felhasználása sémaérvényesítéssel
Ez a szakasz bemutatja, hogyan írhat egy .NET Core-konzolalkalmazást, amely eseményeket fogad egy eseményközpontból, és hogyan használhatja a sémaregisztrációs adatbázist az eseményadatok deszerializálására.
További előfeltételek
- Hozza létre az eseményfeldolgozóhoz használni kívánt tárfiókot.
Fogyasztói alkalmazás létrehozása
- Az Megoldáskezelő ablakban kattintson a jobb gombbal az SRQuickStart megoldásra, mutasson a Hozzáadás pontra, és válassza az Új projekt lehetőséget.
- Válassza a Konzolalkalmazás, majd a Tovább gombot.
- Adja meg az OrderConsumer nevet a Projekt neve mezőben, majd válassza a Létrehozás lehetőséget.
- Az Megoldáskezelő ablakban kattintson a jobb gombbal az OrderConsumer elemre, és válassza a Beállítás indítási projektként lehetőséget.
Az Event Hubs NuGet-csomag hozzáadása
A menüben válassza az Eszközök>NuGet Package Manager>Csomagkezelő konzol lehetőséget.
A Csomagkezelő konzol ablakában győződjön meg arról, hogy az OrderConsumer be van jelölve az Alapértelmezett projekthez. Ha nem, a legördülő listában válassza az OrderConsumer lehetőséget.
Futtassa a következő parancsot a szükséges NuGet-csomagok telepítéséhez. Nyomja le az ENTER billentyűt az utolsó parancs futtatásához.
Install-Package Azure.Messaging.EventHubs Install-Package Azure.Messaging.EventHubs.Processor Install-Package Azure.Identity Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro Install-Package Azure.ResourceManager.Compute
Az itt látható módon hitelesítse az előállító alkalmazásokat az Azure-hoz való csatlakozáshoz a Visual Studióval.
Jelentkezzen be az Azure-ba azzal a felhasználói fiókkal, amely a szerepkör tagja a
Schema Registry Reader
névtér szintjén. További információ a sémaregisztrációs adatbázis szerepköreiről: Azure Schema Registry az Event Hubsban.Adja hozzá az
Order.cs
előállító alkalmazás létrehozása során létrehozott fájlt az OrderConsumer projekthez.Kattintson a jobb gombbal az OrderConsumer projekt elemre, és válassza a Beállítás indítási projektként lehetőséget.
Kód írása események fogadásához és deszerializálásához a Sémaregisztrációs adatbázis használatával
Adja hozzá a következő kódot a
Program.cs
fájlhoz. A részletekért tekintse meg a kód megjegyzéseit. A kód magas szintű lépései a következők:- Hozzon létre egy fogyasztói ügyfelet, amellyel eseményeket küldhet egy eseményközpontba.
- Hozzon létre egy blobtároló-ügyfelet a blobtárolóhoz az Azure Blob Storage-ban.
- Hozzon létre egy eseményfeldolgozó ügyfelet, és regisztrálja az esemény- és hibakezelőket.
- Az eseménykezelőben hozzon létre egy sémaregisztrációs ügyfélprogramot, amellyel deszerializálhatja az eseményadatokat egy
Order
objektumba. - Deszerializálja az eseményadatokat egy
Order
objektumba a szerializáló használatával. - A kapott rendelés adatainak nyomtatása.
using Azure.Data.SchemaRegistry; using Azure.Identity; using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro; using Azure.Storage.Blobs; using Azure.Messaging.EventHubs; using Azure.Messaging.EventHubs.Consumer; using Azure.Messaging.EventHubs.Processor; using Microsoft.Azure.Data.SchemaRegistry.example; // connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // connection string for the Azure Storage account const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // name of the blob container that will be userd as a checkpoint store const string blobContainerName = "BLOBCONTAINERNAME"; // Create a blob container client that the event processor will use BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName); // Create an event processor client to process events in the event hub EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName); // Register handlers for processing events and handling errors processor.ProcessEventAsync += ProcessEventHandler; processor.ProcessErrorAsync += ProcessErrorHandler; // Start the processing await processor.StartProcessingAsync(); // Wait for 30 seconds for the events to be processed await Task.Delay(TimeSpan.FromSeconds(30)); // Stop the processing await processor.StopProcessingAsync(); static async Task ProcessEventHandler(ProcessEventArgs eventArgs) { // Create a schema registry client that you can use to serialize and validate data. var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential()); // Create an Avro object serializer using the Schema Registry client object. var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true }); // Deserialized data in the received event using the schema Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order)); // Print the received event Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}"); await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken); } static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs) { // Write details about the error to the console window Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen."); Console.WriteLine(eventArgs.Exception.Message); return Task.CompletedTask; }
Cserélje le a következő helyőrző értékeket a valós értékekre.
EVENTHUBSNAMESPACE-CONNECTIONSTRING
- kapcsolati sztring az Event Hubs-névtérhezEVENTHUBNAME
- az eseményközpont neveEVENTHUBSNAMESPACENAME
- az Event Hubs-névtér neveSCHEMAGROUPNAME
- a sémacsoport neveAZURESTORAGECONNECTIONSTRING
- kapcsolati sztring az Azure Storage-fiókhozBLOBCONTAINERNAME
- A blobtároló neve
// connection string to the Event Hubs namespace const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING"; // name of the event hub const string eventHubName = "EVENTHUBNAME"; // Schema Registry endpoint const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net"; // name of the consumer group const string schemaGroup = "SCHEMAGROUPNAME"; // Azure storage connection string const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING"; // Azure blob container name const string blobContainerName = "BLOBCONTAINERNAME";
Hozza létre a projektet, és győződjön meg arról, hogy nincsenek hibák.
Futtassa a fogadóalkalmazást.
Meg kell jelennie egy üzenetnek, amely az események fogadását ismerteti.
Received order with ID: 1234, amount: 45.29, description: First sample order.
Ezek az események a küldőprogram futtatásával korábban az eseményközpontba küldött három esemény.
Példák
Tekintse meg a Readme cikket a GitHub-adattárban.
Az erőforrások eltávolítása
Törölje az Event Hubs-névteret, vagy törölje a névteret tartalmazó erőforráscsoportot.