Snabbstart: Skicka händelser till eller ta emot händelser från Event Hubs med Go

Azure Event Hubs är en strömningstjänst för stordata och händelseinmatningstjänst som kan ta emot och bearbeta flera miljoner händelser per sekund. Event Hubs kan bearbeta och lagra händelser, data eller telemetri som producerats av distribuerade program och enheter. Data som skickas till en händelsehubb kan omvandlas och lagras med valfri provider för realtidsanalys eller batchbearbetnings-/lagringsadapter. En detaljerad översikt över Event Hubs finns i Översikt över Event Hubs och Event Hubs-funktioner.

Den här snabbstarten beskriver hur du skriver Go-program för att skicka händelser till eller ta emot händelser från en händelsehubb.

Kommentar

Den här snabbstarten baseras på exempel på GitHub på https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Send-exemplet baseras på exemplet example_producing_events_test.go och den mottagna baseras på exemplet example_processor_test.go . Koden förenklas för snabbstarten och alla detaljerade kommentarer tas bort, så titta på exemplen för mer information och förklaringar.

Förutsättningar

För att slutföra den här snabbstarten, behöver du följande förhandskrav:

  • Gå installerad lokalt. Följ dessa instruktioner om det behövs.
  • Ett aktivt Azure-konto. Om du inte har någon Azure-prenumeration skapar du ett kostnadsfritt konto innan du börjar.
  • Skapa ett Event Hubs-namnområde och en händelsehubb. Använd Azure-portalen för att skapa ett namnområde av typen Event Hubs och hämta de autentiseringsuppgifter för hantering som programmet behöver för att kommunicera med händelsehubben. Om du behöver skapa ett namnområde och en händelsehubb följer du anvisningarna i den här artikeln.

Skicka händelser

Det här avsnittet visar hur du skapar ett Go-program för att skicka händelser till en händelsehubb.

Installera Go-paketet

Hämta Go-paketet för Event Hubs enligt följande exempel.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Kod för att skicka händelser till en händelsehubb

Här är koden för att skicka händelser till en händelsehubb. De viktigaste stegen i koden är:

  1. Skapa en Event Hubs-producentklient med hjälp av en anslutningssträng till Event Hubs-namnområdet och händelsehubbens namn.
  2. Skapa ett batchobjekt och lägg till exempelhändelser i batchen.
  3. Skicka batchen med händelser till händelserna.

Viktigt!

Ersätt NAMESPACE CONNECTION STRING med anslutningssträng till Event Hubs-namnområdet och EVENT HUB NAME med namnet på händelsehubben i exempelkoden.

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {

    // create an Event Hubs producer client using a connection string to the namespace and the event hub
    producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

    if err != nil {
        panic(err)
    }

    defer producerClient.Close(context.TODO())

    // create sample events
    events := createEventsForSample()

    // create a batch object and add sample events to the batch
    newBatchOptions := &azeventhubs.EventDataBatchOptions{}

    batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

    for i := 0; i < len(events); i++ {
        err = batch.AddEventData(events[i], nil)
    }

    // send the batch of events to the event hub
    producerClient.SendEventDataBatch(context.TODO(), batch, nil)
}

func createEventsForSample() []*azeventhubs.EventData {
    return []*azeventhubs.EventData{
        {
            Body: []byte("hello"),
        },
        {
            Body: []byte("world"),
        },
    }
}

Kör inte programmet än. Du måste först köra mottagarappen och sedan avsändarappen.

Ta emot händelser

Skapa ett lagringskonto och en container

Tillstånd som lån på partitioner och kontrollpunkter i händelseströmmen delas mellan mottagare med hjälp av en Azure Storage-container. Du kan skapa ett lagringskonto och en container med Go SDK, men du kan också skapa ett genom att följa anvisningarna i Om Azure-lagringskonton.

Följ dessa rekommendationer när du använder Azure Blob Storage som kontrollpunktslager:

  • Använd en separat container för varje konsumentgrupp. Du kan använda samma lagringskonto, men använda en container per grupp.
  • Använd inte containern för något annat och använd inte lagringskontot för något annat.
  • Lagringskontot ska finnas i samma region som det distribuerade programmet finns i. Om programmet är lokalt kan du försöka välja den region som är närmast.

På sidan Lagringskonto i Azure-portalen i avsnittet Blob Service kontrollerar du att följande inställningar är inaktiverade.

  • Hierarkisk namnrymd
  • Mjuk borttagning av blob
  • Versionshantering

Go-paket

Om du vill ta emot meddelandena hämtar du Go-paketen för Event Hubs enligt följande exempel.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Kod för att ta emot händelser från en händelsehubb

Här är koden för att ta emot händelser från en händelsehubb. De viktigaste stegen i koden är:

  1. Kontrollera ett kontrollpunktsarkivobjekt som representerar Azure Blob Storage som används av händelsehubben för kontrollpunkter.
  2. Skapa en Event Hubs-konsumentklient med hjälp av en anslutningssträng till Event Hubs-namnområdet och händelsehubbens namn.
  3. Skapa en händelseprocessor med hjälp av klientobjektet och kontrollpunktsarkivobjektet. Processorn tar emot och bearbetar händelser.
  4. För varje partition i händelsehubben skapar du en partitionsklient med processEvents som funktion för att bearbeta händelser.
  5. Kör alla partitionsklienter för att ta emot och bearbeta händelser.

Viktigt!

Ersätt följande platshållarvärden med faktiska värden:

  • AZURE STORAGE CONNECTION STRINGmed anslutningssträng för ditt Azure Storage-konto
  • BLOB CONTAINER NAME med namnet på blobcontainern som du skapade i lagringskontot
  • NAMESPACE CONNECTION STRINGmed anslutningssträng för event hubs-namnområdet
  • EVENT HUB NAME med händelsehubbens namn i exempelkoden.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
)

func main() {

    // create a container client using a connection string and container name
    checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
    
    // create a checkpoint store that will be used by the event hub
    checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

    if err != nil {
        panic(err)
    }

    // create a consumer client using a connection string to the namespace and the event hub
    consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

    if err != nil {
        panic(err)
    }

    defer consumerClient.Close(context.TODO())

    // create a processor to receive and process events
    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

    if err != nil {
        panic(err)
    }

    //  for each partition in the event hub, create a partition client with processEvents as the function to process events
    dispatchPartitionClients := func() {
        for {
            partitionClient := processor.NextPartitionClient(context.TODO())

            if partitionClient == nil {
                break
            }

            go func() {
                if err := processEvents(partitionClient); err != nil {
                    panic(err)
                }
            }()
        }
    }

    // run all partition clients
    go dispatchPartitionClients()

    processorCtx, processorCancel := context.WithCancel(context.TODO())
    defer processorCancel()

    if err := processor.Run(processorCtx); err != nil {
        panic(err)
    }
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
    defer closePartitionResources(partitionClient)
    for {
        receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
        events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
        receiveCtxCancel()

        if err != nil && !errors.Is(err, context.DeadlineExceeded) {
            return err
        }

        fmt.Printf("Processing %d event(s)\n", len(events))

        for _, event := range events {
            fmt.Printf("Event received with body %v\n", string(event.Body))
        }

        if len(events) != 0 {
            if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
                return err
            }
        }
    }
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
    defer partitionClient.Close(context.TODO())
}

Köra mottagar- och avsändarappar

  1. Kör mottagarappen först.

  2. Kör avsändarappen.

  3. Vänta en minut för att se följande utdata i mottagarfönstret.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Nästa steg

Se exempel på GitHub på https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.