Schnellstart: Senden von Ereignissen an oder Empfangen von Ereignissen aus Event Hubs mithilfe von Go

Azure Event Hubs ist eine Big Data-Streamingplattform und ein Ereigniserfassungsdienst, der pro Sekunde Millionen von Ereignissen empfangen und verarbeiten kann. Event Hubs kann Ereignisse, Daten oder Telemetriedaten, die von verteilter Software und verteilten Geräten erzeugt wurden, verarbeiten und speichern. An einen Event Hub gesendete Daten können transformiert und mit einem beliebigen Echtzeitanalyse-Anbieter oder Batchverarbeitungs-/Speicheradapter gespeichert werden. Eine ausführliche Übersicht über Event Hubs finden Sie unter Was ist Azure Event Hubs? und Event Hubs-Features im Überblick.

In diesem Schnellstart wird beschrieben, wie Sie Go-Anwendungen schreiben, die Ereignisse an einen Event Hub senden oder von diesem empfangen.

Hinweis

Dieser Schnellstart basiert auf Beispielen auf GitHub unter https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Das Sendebeispiel basiert auf dem Beispiel example_producing_events_test.go, und das Empfangsbeispiel auf dem Beispiel example_processor_test.go. Der Code wurde für die Schnellstartanleitung vereinfacht, und alle ausführlichen Kommentare wurden entfernt. Sehen Sie sich also die Beispiele an, um weitere Details und Erläuterungen zu erhalten.

Voraussetzungen

Zum Durchführen dieser Schnellstartanleitung benötigen Sie Folgendes:

  • Eine lokale Go-Installation. Befolgen Sie bei Bedarf diese Anweisungen.
  • Ein aktives Azure-Konto. Wenn Sie kein Azure-Abonnement besitzen, können Sie ein kostenloses Konto erstellen, bevor Sie beginnen.
  • Erstellen Sie einen Event Hubs-Namespace und einen Event Hub. Verwenden Sie das Azure-Portal, um einen Namespace des Typs „Event Hubs“ zu erstellen, und rufen Sie die Verwaltungsanmeldeinformationen ab, die Ihre Anwendung für die Kommunikation mit dem Event Hub benötigt. Erstellen Sie anhand der Anleitung in diesem Artikel einen Namespace und einen Event Hub.

Senden von Ereignissen

In diesem Abschnitt erfahren Sie, wie Sie eine Go-Anwendung zum Senden von Ereignissen an einen Event Hub erstellen.

Installieren des Go-Pakets

Rufen Sie das Go-Paket für Event Hubs ab, wie im folgenden Beispiel gezeigt.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Code zum Senden von Ereignissen an einen Event Hub

Der folgende Code dient zum Senden von Ereignissen an einen Event Hub. Die wichtigsten Schritte im Code sind:

  1. Erstellen eines Event Hubs-Producerclients mithilfe einer Verbindungszeichenfolge zum Event Hubs-Namespace und dem Event Hub-Namen.
  2. Erstellen eines Batchobjekts und Hinzufügen von Beispielereignissen zum Batch.
  3. Senden des Ereignisbatchs an die Ereignisse.

Wichtig

Ersetzen Sie NAMESPACE CONNECTION STRING durch die Verbindungszeichenfolge für Ihren Event Hubs-Namespace und EVENT HUB NAME durch den Event Hub-Namen im Beispielcode.

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {

    // create an Event Hubs producer client using a connection string to the namespace and the event hub
    producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

    if err != nil {
        panic(err)
    }

    defer producerClient.Close(context.TODO())

    // create sample events
    events := createEventsForSample()

    // create a batch object and add sample events to the batch
    newBatchOptions := &azeventhubs.EventDataBatchOptions{}

    batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

    for i := 0; i < len(events); i++ {
        err = batch.AddEventData(events[i], nil)
    }

    // send the batch of events to the event hub
    producerClient.SendEventDataBatch(context.TODO(), batch, nil)
}

func createEventsForSample() []*azeventhubs.EventData {
    return []*azeventhubs.EventData{
        {
            Body: []byte("hello"),
        },
        {
            Body: []byte("world"),
        },
    }
}

Führen Sie die Anwendung noch nicht aus. Sie müssen zuerst die Empfänger-App und dann die Sender-App ausführen.

Empfangen von Ereignissen

Erstellen eines Storage-Kontos und -Containers

Angaben zum Status, z.B. Leases auf Partitionen und Prüfpunkte im Ereignisstream, werden über einen Azure Storage-Container zwischen den Empfängern freigegeben. Sie können ein Speicherkonto und einen Container erstellen, indem Sie das Go SDK verwenden oder die Anweisungen unter Informationen zu Azure-Speicherkonten befolgen.

Befolgen Sie die folgenden Empfehlungen, wenn Sie Azure Blob Storage als Prüfpunktspeicher verwenden:

  • Verwenden Sie einen separaten Container für jede Consumergruppe. Sie können dasselbe Speicherkonto verwenden, aber verwenden Sie für jede Gruppe einen eigenen Container.
  • Verwenden Sie weder den Container noch das Speicherkonto für andere Zwecke.
  • Das Speicherkonto sollte sich in derselben Region befinden, in der sich die bereitgestellte Anwendung befindet. Wenn die Anwendung lokal ist, versuchen Sie, die nächstgelegene Region auszuwählen.

Stellen Sie auf der Seite Speicherkonto im Azure-Portal im Abschnitt Blobdienst sicher, dass die folgenden Einstellungen deaktiviert sind.

  • Hierarchischer Namespace
  • Vorläufiges Löschen von Blobs
  • Versionsverwaltung

Go-Pakete

Rufen Sie zum Empfangen der Nachrichten die Go-Pakete für Event Hubs ab, wie im folgenden Beispiel gezeigt.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Code zum Empfangen von Ereignissen von einem Event Hub

Der folgende Code dient zum Empfangen von Ereignissen von einem Event Hub. Die wichtigsten Schritte im Code sind:

  1. Überprüfen Sie ein Prüfpunktspeicherobjekt, das die Azure Blob Storage-Instanz darstellt, die vom Event Hub für Prüfpunkterstellung verwendet wird.
  2. Erstellen eines Event Hubs-Consumerclients mithilfe einer Verbindungszeichenfolge zum Event Hubs-Namespace und dem Event Hub-Namen.
  3. Erstellen Sie mithilfe des Clientobjekts und des Prüfpunktspeicherobjekts einen Ereignisprozessor. Der Prozessor empfängt und verarbeitet Ereignisse.
  4. Erstellen Sie für jede Partition im Event Hub einen Partitionsclient mit processEvents als Funktion zum Verarbeiten von Ereignissen.
  5. Führen Sie alle Partitionsclients aus, um Ereignisse zu empfangen und zu verarbeiten.

Wichtig

Ersetzen Sie die folgenden Platzhalterwerte durch die tatsächlichen Werte:

  • AZURE STORAGE CONNECTION STRING durch die Verbindungszeichenfolge für Ihr Azure-Speicherkonto.
  • BLOB CONTAINER NAME durch den Namen des Blobcontainers, den Sie im Speicherkonto erstellt haben.
  • NAMESPACE CONNECTION STRING durch die Verbindungszeichenfolge für Ihren Event Hubs-Namespace.
  • EVENT HUB NAME durch den Event Hub-Namen im Beispielcode.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
)

func main() {

    // create a container client using a connection string and container name
    checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
    
    // create a checkpoint store that will be used by the event hub
    checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

    if err != nil {
        panic(err)
    }

    // create a consumer client using a connection string to the namespace and the event hub
    consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

    if err != nil {
        panic(err)
    }

    defer consumerClient.Close(context.TODO())

    // create a processor to receive and process events
    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

    if err != nil {
        panic(err)
    }

    //  for each partition in the event hub, create a partition client with processEvents as the function to process events
    dispatchPartitionClients := func() {
        for {
            partitionClient := processor.NextPartitionClient(context.TODO())

            if partitionClient == nil {
                break
            }

            go func() {
                if err := processEvents(partitionClient); err != nil {
                    panic(err)
                }
            }()
        }
    }

    // run all partition clients
    go dispatchPartitionClients()

    processorCtx, processorCancel := context.WithCancel(context.TODO())
    defer processorCancel()

    if err := processor.Run(processorCtx); err != nil {
        panic(err)
    }
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
    defer closePartitionResources(partitionClient)
    for {
        receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
        events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
        receiveCtxCancel()

        if err != nil && !errors.Is(err, context.DeadlineExceeded) {
            return err
        }

        fmt.Printf("Processing %d event(s)\n", len(events))

        for _, event := range events {
            fmt.Printf("Event received with body %v\n", string(event.Body))
        }

        if len(events) != 0 {
            if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
                return err
            }
        }
    }
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
    defer partitionClient.Close(context.TODO())
}

Ausführen von Empfänger- und Sender-Apps

  1. Führen Sie den Empfänger zuerst aus.

  2. Führen Sie die Sender-App aus.

  3. Warten Sie eine Minute, bis die folgende Ausgabe im Empfängerfenster angezeigt wird.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Nächste Schritte

Weitere Informationen finden Sie in den Beispielen auf GitHub unter https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.