Quickstart: Gebeurtenissen verzenden naar of ontvangen van Event Hubs met behulp van Go

Azure Event Hubs is een big data-platform voor het streamen van gegevens en een gebeurtenisopneemservice die miljoenen gebeurtenissen per seconde kan opnemen en verwerken. Event Hubs kan gebeurtenissen, gegevens of telemetrie die wordt geproduceerd door gedistribueerde software en apparaten verwerken en opslaan. Gegevens die naar een Event Hub worden verzonden, kunnen worden omgezet en opgeslagen via een provider voor realtime analytische gegevens of batchverwerking/opslagadapters. Zie Overzicht van Event Hubs en Functies van Event Hubs voor een gedetailleerd overzicht van Event Hubs.

In deze quickstart wordt beschreven hoe u Go-toepassingen schrijft voor het verzenden van gebeurtenissen naar of het ontvangen van gebeurtenissen van een Event Hub.

Notitie

Deze quickstart is gebaseerd op voorbeelden op GitHub op https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Het verzendvoorbeeld is gebaseerd op het voorbeeld example_producing_events_test.go en de ontvangst is gebaseerd op het voorbeeld example_processor_test.go . De code is vereenvoudigd voor de quickstart en alle gedetailleerde opmerkingen worden verwijderd, dus bekijk de voorbeelden voor meer informatie en uitleg.

Vereisten

Voor het voltooien van deze snelstart moet aan de volgende vereisten worden voldaan:

  • Go is lokaal geïnstalleerd. Volg deze instructies indien nodig.
  • Een actief Azure-account. Als u geen Azure-abonnement hebt, maakt u een gratis account voordat u begint.
  • Een Event Hubs-naamruimte en een Event Hub maken. Gebruik de Azure-portal om een naamruimte van het type Event Hubs te maken en de beheerreferenties te verkrijgen die de toepassing nodig heeft om met de Event Hub te communiceren. Volg de procedure in dit artikel om een naamruimte en een Event Hub te maken.

Gebeurtenissen verzenden

In deze sectie wordt beschreven hoe u een Go-toepassing maakt voor het verzenden van gebeurtenissen naar een Event Hub.

Go-pakket installeren

Haal het Go-pakket voor Event Hubs op, zoals wordt weergegeven in het volgende voorbeeld.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Code voor het verzenden van gebeurtenissen naar een Event Hub

Hier volgt de code voor het verzenden van gebeurtenissen naar een Event Hub. De belangrijkste stappen in de code zijn:

  1. Maak een Event Hubs-producerclient met behulp van een verbindingsreeks naar de Event Hubs-naamruimte en de naam van de Event Hub.
  2. Maak een batchobject en voeg voorbeeldgebeurtenissen toe aan de batch.
  3. Verzend de batch met gebeurtenissen naar de gebeurtenissen.

Belangrijk

Vervang NAMESPACE CONNECTION STRING door de verbindingsreeks in uw Event Hubs-naamruimte en EVENT HUB NAME door de naam van de Event Hub in de voorbeeldcode.

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {

    // create an Event Hubs producer client using a connection string to the namespace and the event hub
    producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

    if err != nil {
        panic(err)
    }

    defer producerClient.Close(context.TODO())

    // create sample events
    events := createEventsForSample()

    // create a batch object and add sample events to the batch
    newBatchOptions := &azeventhubs.EventDataBatchOptions{}

    batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

    for i := 0; i < len(events); i++ {
        err = batch.AddEventData(events[i], nil)
    }

    // send the batch of events to the event hub
    producerClient.SendEventDataBatch(context.TODO(), batch, nil)
}

func createEventsForSample() []*azeventhubs.EventData {
    return []*azeventhubs.EventData{
        {
            Body: []byte("hello"),
        },
        {
            Body: []byte("world"),
        },
    }
}

Voer de toepassing nog niet uit. U moet eerst de ontvanger-app en vervolgens de afzender-app uitvoeren.

Gebeurtenissen ontvangen

Een opslagaccount en container maken

De status, zoals leases op partities en controlepunten in de gebeurtenisstroom, wordt gedeeld tussen ontvangers die gebruikmaken van een Azure Storage-container. U kunt een opslagaccount en container maken met de Go-SDK, maar u kunt er ook een maken door de instructies te volgen in Over Azure-opslagaccounts.

Volg deze aanbevelingen wanneer u Azure Blob Storage gebruikt als controlepuntarchief:

  • Gebruik een afzonderlijke container voor elke consumentengroep. U kunt hetzelfde opslagaccount gebruiken, maar één container per groep gebruiken.
  • Gebruik de container niet voor iets anders en gebruik het opslagaccount niet voor iets anders.
  • Het opslagaccount moet zich in dezelfde regio bevinden als waarin de geïmplementeerde toepassing zich bevindt. Als de toepassing on-premises is, probeert u de dichtstbijzijnde regio te kiezen.

Controleer op de pagina Opslagaccount in Azure Portal in de sectie Blob-service of de volgende instellingen zijn uitgeschakeld.

  • Hiërarchische naamruimte
  • Blob voorlopig verwijderen
  • Versiebeheer

Go-pakketten

Als u de berichten wilt ontvangen, haalt u de Go-pakketten voor Event Hubs op, zoals wordt weergegeven in het volgende voorbeeld.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Code voor het ontvangen van gebeurtenissen van een Event Hub

Hier volgt de code voor het ontvangen van gebeurtenissen van een Event Hub. De belangrijkste stappen in de code zijn:

  1. Controleer een controlepuntarchiefobject dat de Azure Blob Storage vertegenwoordigt die door de Event Hub wordt gebruikt voor controlepunten.
  2. Maak een Event Hubs-consumentenclient met behulp van een verbindingsreeks naar de Event Hubs-naamruimte en de naam van de Event Hub.
  3. Maak een gebeurtenisprocessor met behulp van het clientobject en het controlepuntarchiefobject. De processor ontvangt en verwerkt gebeurtenissen.
  4. Maak voor elke partitie in de Event Hub een partitieclient met processEvents als de functie voor het verwerken van gebeurtenissen.
  5. Voer alle partitieclients uit om gebeurtenissen te ontvangen en te verwerken.

Belangrijk

Vervang de volgende tijdelijke aanduidingen door werkelijke waarden:

  • AZURE STORAGE CONNECTION STRINGmet de verbindingsreeks voor uw Azure-opslagaccount
  • BLOB CONTAINER NAME met de naam van de blobcontainer die u hebt gemaakt in het opslagaccount
  • NAMESPACE CONNECTION STRINGmet de verbindingsreeks voor uw Event Hubs-naamruimte
  • EVENT HUB NAME met de event hub-naam in de voorbeeldcode.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
)

func main() {

    // create a container client using a connection string and container name
    checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
    
    // create a checkpoint store that will be used by the event hub
    checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

    if err != nil {
        panic(err)
    }

    // create a consumer client using a connection string to the namespace and the event hub
    consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

    if err != nil {
        panic(err)
    }

    defer consumerClient.Close(context.TODO())

    // create a processor to receive and process events
    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

    if err != nil {
        panic(err)
    }

    //  for each partition in the event hub, create a partition client with processEvents as the function to process events
    dispatchPartitionClients := func() {
        for {
            partitionClient := processor.NextPartitionClient(context.TODO())

            if partitionClient == nil {
                break
            }

            go func() {
                if err := processEvents(partitionClient); err != nil {
                    panic(err)
                }
            }()
        }
    }

    // run all partition clients
    go dispatchPartitionClients()

    processorCtx, processorCancel := context.WithCancel(context.TODO())
    defer processorCancel()

    if err := processor.Run(processorCtx); err != nil {
        panic(err)
    }
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
    defer closePartitionResources(partitionClient)
    for {
        receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
        events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
        receiveCtxCancel()

        if err != nil && !errors.Is(err, context.DeadlineExceeded) {
            return err
        }

        fmt.Printf("Processing %d event(s)\n", len(events))

        for _, event := range events {
            fmt.Printf("Event received with body %v\n", string(event.Body))
        }

        if len(events) != 0 {
            if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
                return err
            }
        }
    }
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
    defer partitionClient.Close(context.TODO())
}

Apps voor ontvangers en afzenders uitvoeren

  1. Voer eerst de ontvanger-app uit.

  2. Voer de afzender-app uit.

  3. Wacht even totdat de volgende uitvoer in het ontvangervenster wordt weergegeven.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Volgende stappen

Bekijk voorbeelden op GitHub op https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.