Share via


Az események Event Hubs .NET SDK-kkal (AMQP) történő streamelésének ellenőrzése Avro-sémával

Ebben a rövid útmutatóban megtudhatja, hogyan küldhet eseményeket egy eseményközpontba, és hogyan fogadhat eseményeket sémaérvényesítéssel az Azure.Messaging.EventHubs .NET-kódtár használatával.

Megjegyzés

Az Azure Schema Registry az Event Hubs szolgáltatása, amely központi adattárat biztosít az eseményvezérelt és üzenetkezelés-központú alkalmazások sémáihoz. Rugalmasan biztosítja a gyártói és fogyasztói alkalmazások számára az adatok cseréjét anélkül, hogy kezelnie és meg kellene osztania a sémát. Emellett egy egyszerű szabályozási keretrendszert is biztosít az újrafelhasználható sémákhoz, és egy csoportosítási szerkezeten (sémacsoportokon) keresztül definiálja a sémák közötti kapcsolatot. További információ: Azure Schema Registry az Event Hubsban.

Előfeltételek

Ha még nem ismerkedik a Azure Event Hubs, a rövid útmutató előtt tekintse meg az Event Hubs áttekintését.

A rövid útmutató elvégzéséhez a következő előfeltételekre van szüksége:

  • Ha nem rendelkezik Azure-előfizetéssel, a kezdés előtt hozzon létre egy ingyenes fiókot .
  • Microsoft Visual Studio 2022. A Azure Event Hubs ügyfélkódtár a C# 8.0-s verziójában bevezetett új funkciókat használja. A kódtárat továbbra is használhatja a korábbi C#-nyelvi verziókkal, de az új szintaxis nem érhető el. A teljes szintaxis használatához azt javasoljuk, hogy a .NET Core SDK 3.0-s vagy újabb verziójával, a nyelvi verzió pedig a következőre latestlegyen állítva: . A Visual Studio használata esetén a Visual Studio 2019 előtti verziók nem kompatibilisek a C# 8.0-projektek létrehozásához szükséges eszközökkel. A Visual Studio 2019, beleértve az ingyenes közösségi kiadást is, itt tölthető le.

Eseményközpont létrehozása

Kövesse a rövid útmutató utasításait: Event Hubs-névtér és eseményközpont létrehozása az Event Hubs-névtér és egy eseményközpont létrehozásához. Ezután kövesse a Get the kapcsolati sztring (Kapcsolati sztring) című cikk utasításait, hogy kapcsolati sztring kapjon az Event Hubs-névtérhez.

Jegyezze fel az alábbi beállításokat, amelyeket az aktuális rövid útmutatóban fog használni:

  • Az Event Hubs-névtér kapcsolati sztringje
  • Az eseményközpont neve

Séma létrehozása

Sémacsoport és séma létrehozásához kövesse a Séma létrehozása a Sémaregisztrációs adatbázissal című cikk utasításait.

  1. Hozzon létre egy contoso-sg nevű sémacsoportot a Sémaregisztrációs portál használatával. A kompatibilitási módhoz használja az Avro típust szerializálási típusként, a Nincs típust.

  2. Ebben a sémacsoportban hozzon létre egy új Avro-sémát sémanévvel: Microsoft.Azure.Data.SchemaRegistry.example.Order a következő sématartalommal.

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

Felhasználó hozzáadása sémaregisztrációs adatbázis-olvasó szerepkörhöz

Adja hozzá a felhasználói fiókját a sémaregisztrációs adatbázis-olvasó szerepkörhöz a névtér szintjén. A sémaregisztrációs adatbázis közreműködője szerepkört is használhatja, de ehhez a rövid útmutatóhoz nem szükséges.

  1. Az Event Hubs-névtér lapon válassza a hozzáférés-vezérlés (IAM) lehetőséget a bal oldali menüben.
  2. A Hozzáférés-vezérlés (IAM) lapon válassza a + Szerepkör-hozzárendelés>hozzáadása lehetőséget a menüben.
  3. A Hozzárendelés típusa lapon válassza a Tovább gombot.
  4. A Szerepkörök lapon válassza a Sémaregisztrációs adatbázis olvasója (előzetes verzió) lehetőséget, majd válassza a Tovább gombot a lap alján.
  5. A + Tagok kiválasztása hivatkozás használatával adja hozzá a felhasználói fiókját a szerepkörhöz, majd válassza a Tovább gombot.
  6. A Véleményezés + hozzárendelés lapon válassza a Véleményezés + hozzárendelés lehetőséget.

Események létrehozása az eseményközpontokba sémaérvényesítéssel

Konzolalkalmazás létrehozása eseménykészítőhöz

  1. Indítsa el a Visual Studio 2019-et.
  2. Válassza az Új projekt létrehozása lehetőséget.
  3. Az Új projekt létrehozása párbeszédpanelen hajtsa végre a következő lépéseket: Ha nem látja ezt a párbeszédpanelt, válassza a Menü Fájl elemét, válassza az Új, majd a Projekt lehetőséget.
    1. A programozási nyelvhez válassza a C# lehetőséget.

    2. Az alkalmazás típusához válassza a Konzol lehetőséget.

    3. Válassza a Konzolalkalmazás lehetőséget az eredmények listájából.

    4. Ezután válassza a Tovább gombot.

      Az Új projekt párbeszédpanelt ábrázoló kép.

  4. Adja meg az OrderProducer nevet a projektnévhez, a megoldásnévhez pedig az SRQuickStart értéket, majd kattintson az OK gombra a projekt létrehozásához.

Az Event Hubs NuGet-csomag hozzáadása

  1. A menüben válassza az Eszközök>NuGet Package Manager>Csomagkezelő konzol lehetőséget.

  2. Futtassa az alábbi parancsokat az Azure.Messaging.EventHubs és más NuGet-csomagok telepítéséhez. Nyomja le az ENTER billentyűt az utolsó parancs futtatásához.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. Az itt látható módon hitelesítse az előállító alkalmazásokat az Azure-hoz való csatlakozáshoz a Visual Studióval.

  4. Jelentkezzen be az Azure-ba azzal a felhasználói fiókkal, amely a szerepkör tagja a Schema Registry Reader névtér szintjén. További információ a sémaregisztrációs adatbázis szerepköreiről: Azure Schema Registry az Event Hubsban.

Kódlétrehozás az Avro-sémával

  1. Használja ugyanazt a tartalmat, amelyet a séma létrehozásához használt egy nevű Order.avscfájl létrehozásához. Mentse a fájlt a projekt vagy a megoldás mappájába.
  2. Ezután ezzel a sémafájllal létrehozhat kódot a .NET-hez. A kódgeneráláshoz bármilyen külső kódgenerálási eszközt használhat, például avrogent . Futtathatja avrogen -s .\Order.avsc . például a kódot.
  3. A kód létrehozása után megjelenik a nevű Order.cs fájl a \Microsoft\Azure\Data\SchemaRegistry\example mappában. A fenti Avro-séma esetében létrehozza a C#-típusokat a névtérben Microsoft.Azure.Data.SchemaRegistry.example .
  4. Adja hozzá a Order.cs fájlt a OrderProducer projekthez.

Kód írása események szerializálásához és az eseményközpontba való küldéséhez

  1. Adja hozzá a következő kódot a Program.cs fájlhoz. A részletekért tekintse meg a kód megjegyzéseit. A kód magas szintű lépései a következők:

    1. Hozzon létre egy gyártói ügyfelet, amellyel eseményeket küldhet egy eseményközpontba.
    2. Hozzon létre egy sémaregisztrációs ügyfélprogramot, amellyel szerializálhatja és ellenőrizheti az Order objektumban lévő adatokat.
    3. Hozzon létre egy új Order objektumot a létrehozott Order típussal.
    4. A sémaregisztrációs ügyfélalkalmazással szerializálja az objektumot a Order következőre: EventData.
    5. Hozzon létre egy eseményköteget.
    6. Adja hozzá az eseményadatokat az esemény kötegéhez.
    7. Az előállító ügyfél használatával küldje el az események kötegét az eseményközpontba.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. Cserélje le a következő helyőrző értékeket a valós értékekre.

    • EVENTHUBSNAMESPACECONNECTIONSTRING- kapcsolati sztring az Event Hubs-névtérhez
    • EVENTHUBNAME - az eseményközpont neve
    • EVENTHUBSNAMESPACENAME - az Event Hubs-névtér neve
    • SCHEMAGROUPNAME - a sémacsoport neve
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. Hozza létre a projektet, és győződjön meg arról, hogy nincsenek hibák.

  4. Futtassa a programot, és várja meg a megerősítési üzenetet.

    A batch of 1 order has been published.
    
  5. A Azure Portal ellenőrizheti, hogy az eseményközpont megkapta-e az eseményeket. Váltson Üzenetek nézetre a Metrikák szakaszban. Frissítse a lapot a diagram frissítéséhez. Az üzenetek fogadásának megjelenítése eltarthat néhány másodpercig.

    A Azure Portal oldal képe annak ellenőrzéséhez, hogy az eseményközpont fogadta-e az eseményeket.

Eseményközpontokból származó események felhasználása sémaérvényesítéssel

Ez a szakasz bemutatja, hogyan írhat egy .NET Core-konzolalkalmazást, amely eseményeket fogad egy eseményközpontból, és hogyan használhatja a sémaregisztrációs adatbázist az eseményadatok deszerializálására.

További előfeltételek

  • Hozza létre az eseményfeldolgozóhoz használni kívánt tárfiókot.

Fogyasztói alkalmazás létrehozása

  1. Az Megoldáskezelő ablakban kattintson a jobb gombbal az SRQuickStart megoldásra, mutasson a Hozzáadás pontra, és válassza az Új projekt lehetőséget.
  2. Válassza a Konzolalkalmazás, majd a Tovább gombot.
  3. Adja meg az OrderConsumer nevet a Projekt neve mezőben, majd válassza a Létrehozás lehetőséget.
  4. Az Megoldáskezelő ablakban kattintson a jobb gombbal az OrderConsumer elemre, és válassza a Beállítás indítási projektként lehetőséget.

Az Event Hubs NuGet-csomag hozzáadása

  1. A menüben válassza az Eszközök>NuGet Package Manager>Csomagkezelő konzol lehetőséget.

  2. A Csomagkezelő konzol ablakában győződjön meg arról, hogy az OrderConsumer be van jelölve az Alapértelmezett projekthez. Ha nem, a legördülő listában válassza az OrderConsumer lehetőséget.

  3. Futtassa a következő parancsot a szükséges NuGet-csomagok telepítéséhez. Nyomja le az ENTER billentyűt az utolsó parancs futtatásához.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. Az itt látható módon hitelesítse az előállító alkalmazásokat az Azure-hoz való csatlakozáshoz a Visual Studióval.

  5. Jelentkezzen be az Azure-ba azzal a felhasználói fiókkal, amely a szerepkör tagja a Schema Registry Reader névtér szintjén. További információ a sémaregisztrációs adatbázis szerepköreiről: Azure Schema Registry az Event Hubsban.

  6. Adja hozzá az Order.cs előállító alkalmazás létrehozása során létrehozott fájlt az OrderConsumer projekthez.

  7. Kattintson a jobb gombbal az OrderConsumer projekt elemre, és válassza a Beállítás indítási projektként lehetőséget.

Kód írása események fogadásához és deszerializálásához a Sémaregisztrációs adatbázis használatával

  1. Adja hozzá a következő kódot a Program.cs fájlhoz. A részletekért tekintse meg a kód megjegyzéseit. A kód magas szintű lépései a következők:

    1. Hozzon létre egy fogyasztói ügyfelet, amellyel eseményeket küldhet egy eseményközpontba.
    2. Hozzon létre egy blobtároló-ügyfelet a blobtárolóhoz az Azure Blob Storage-ban.
    3. Hozzon létre egy eseményfeldolgozó ügyfelet, és regisztrálja az esemény- és hibakezelőket.
    4. Az eseménykezelőben hozzon létre egy sémaregisztrációs ügyfélprogramot, amellyel deszerializálhatja az eseményadatokat egy Order objektumba.
    5. Deszerializálja az eseményadatokat egy Order objektumba a szerializáló használatával.
    6. A kapott rendelés adatainak nyomtatása.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be userd as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. Cserélje le a következő helyőrző értékeket a valós értékekre.

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING- kapcsolati sztring az Event Hubs-névtérhez
    • EVENTHUBNAME - az eseményközpont neve
    • EVENTHUBSNAMESPACENAME - az Event Hubs-névtér neve
    • SCHEMAGROUPNAME - a sémacsoport neve
    • AZURESTORAGECONNECTIONSTRING- kapcsolati sztring az Azure Storage-fiókhoz
    • BLOBCONTAINERNAME - A blobtároló neve
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. Hozza létre a projektet, és győződjön meg arról, hogy nincsenek hibák.

  4. Futtassa a fogadóalkalmazást.

  5. Meg kell jelennie egy üzenetnek, amely az események fogadását ismerteti.

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    Ezek az események a küldőprogram futtatásával korábban az eseményközpontba küldött három esemény.

Példák

Tekintse meg a Readme cikket a GitHub-adattárban.

Az erőforrások eltávolítása

Törölje az Event Hubs-névteret, vagy törölje a névteret tartalmazó erőforráscsoportot.

Következő lépések