Delen via


Quickstart: Gebeurtenissen verzenden naar of ontvangen van Event Hubs met behulp van Go

Azure Event Hubs is een big data-platform voor het streamen van gegevens en een gebeurtenisopneemservice die miljoenen gebeurtenissen per seconde kan opnemen en verwerken. Event Hubs kan gebeurtenissen, gegevens of telemetrie die wordt geproduceerd door gedistribueerde software en apparaten verwerken en opslaan. Gegevens die naar een Event Hub worden verzonden, kunnen worden omgezet en opgeslagen via een provider voor realtime analytische gegevens of batchverwerking/opslagadapters. Zie Overzicht van Event Hubs en Functies van Event Hubs voor een gedetailleerd overzicht van Event Hubs.

In deze quickstart wordt beschreven hoe u Go-toepassingen schrijft voor het verzenden van gebeurtenissen naar of het ontvangen van gebeurtenissen van een Event Hub.

Opmerking

Deze quickstart is gebaseerd op voorbeelden op GitHub op https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. De sectie Gebeurtenissen verzenden is gebaseerd op het voorbeeld example_producing_events_test.go en de ontvangst is gebaseerd op het voorbeeld example_processor_test.go . De code is vereenvoudigd voor de quickstart en alle gedetailleerde opmerkingen worden verwijderd, dus bekijk de voorbeelden voor meer informatie en uitleg.

Vereiste voorwaarden

Voor het voltooien van deze quickstart hebt u de volgende vereisten nodig:

  • Go is lokaal geïnstalleerd. Volg deze instructies indien nodig.
  • Een actief Azure-account. Als je geen Azure-abonnement hebt, maak dan een gratis account aan voordat je begint.
  • Een Event Hubs-naamruimte en een Event Hub maken. Gebruik Azure Portal om een naamruimte van het type Event Hubs te maken en haal de beheerreferenties op die uw toepassing nodig heeft om te communiceren met de Event Hub. Volg de procedure in dit artikel om een naamruimte en een Event Hub te maken.

Gebeurtenissen verzenden

In deze sectie wordt beschreven hoe u een Go-toepassing maakt om gebeurtenissen naar een Event Hub te verzenden.

Go-pakket installeren

Haal het Go-pakket voor Event Hubs op, zoals wordt weergegeven in het volgende voorbeeld.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Code voor het verzenden van gebeurtenissen naar een Event Hub

Hier volgt de code voor het verzenden van gebeurtenissen naar een Event Hub. De belangrijkste stappen in de code zijn:

  1. Maak een Event Hubs-producerclient met behulp van een verbindingsreeks voor de Event Hubs-naamruimte en de naam van de Event Hub.
  2. Maak een batchobject en voeg voorbeeldgebeurtenissen toe aan de batch.
  3. Verzend de batch met gebeurtenissen naar de gebeurtenissen.

Belangrijk

Vervang NAMESPACE CONNECTION STRING door de verbindingsreeks naar uw Event Hubs-naamruimte en EVENT HUB NAME door de naam van de Event Hub in de voorbeeldcode.

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// create an Event Hubs producer client using a connection string to the namespace and the event hub
	producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	// create sample events
	events := createEventsForSample()

	// create a batch object and add sample events to the batch
	newBatchOptions := &azeventhubs.EventDataBatchOptions{}

	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	if err != nil {
		panic(err)
	}

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)

		if err != nil {
			panic(err)
		}
	}

	// send the batch of events to the event hub
	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}

Voer de toepassing nog niet uit. U moet eerst de ontvanger-app en vervolgens de afzender-app uitvoeren.

Gebeurtenissen ontvangen

Een opslagaccount en container maken

Status, zoals leases op partities en controlepunten tijdens de gebeurtenissen, worden gedeeld tussen ontvangers middels een Azure Storage-container. U kunt een opslagaccount en container maken met de Go SDK, maar u kunt er ook een maken door de instructies in Over Azure-opslagaccounts te volgen.

Volg deze aanbevelingen wanneer u Azure Blob Storage als controlepuntarchief gebruikt:

  • Gebruik een afzonderlijke container voor elke consumentengroep. U kunt hetzelfde opslagaccount gebruiken, maar één container per groep gebruiken.
  • Gebruik het opslagaccount niet voor iets anders.
  • Gebruik de container niet voor iets anders.
  • Maak het opslagaccount in dezelfde regio als de geïmplementeerde toepassing. Als de toepassing on-premises is, probeert u de dichtstbijzijnde regio te kiezen.

Controleer op de pagina Opslagaccount in Azure Portal in de sectie Blob-service of de volgende instellingen zijn uitgeschakeld.

  • Hiërarchische naamruimte
  • Blob zacht verwijderen
  • Versiebeheer

Go-pakketten

Als u de berichten wilt ontvangen, haalt u de Go-pakketten voor Event Hubs op, zoals wordt weergegeven in het volgende voorbeeld.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob

Code voor het ontvangen van gebeurtenissen van een Event Hub

Hier volgt de code voor het ontvangen van gebeurtenissen van een Event Hub. De belangrijkste stappen in de code zijn:

  1. Controleer een controlepuntarchiefobject dat de Azure Blob Storage vertegenwoordigt die door de Event Hub wordt gebruikt voor controlepunten.
  2. Maak een Event Hubs-consumentenclient met behulp van een verbindingsreeks met de Event Hubs-naamruimte en de naam van de Event Hub.
  3. Maak een gebeurtenisprocessor met behulp van het clientobject en het controlepuntarchiefobject. De processor ontvangt en verwerkt gebeurtenissen.
  4. Maak voor elke partitie in de Event Hub een partitieclient met processEvents als de functie voor het verwerken van gebeurtenissen.
  5. Voer alle partitieclients uit om gebeurtenissen te ontvangen en te verwerken.

Belangrijk

Vervang de volgende tijdelijke aanduidingen door werkelijke waarden:

  • AZURE STORAGE CONNECTION STRING met de connectiestring voor uw Azure-opslagaccount
  • BLOB CONTAINER NAME met de naam van de blobcontainer die u hebt gemaakt in het opslagaccount
  • NAMESPACE CONNECTION STRING met de connectiestring voor uw Event Hubs-naamruimte
  • EVENT HUB NAME met de event hub-naam in de voorbeeldcode.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {

	// create a container client using a connection string and container name
	checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)

	if err != nil {
		panic(err)
	}

	// create a checkpoint store that will be used by the event hub
	checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

	if err != nil {
		panic(err)
	}

	// create a consumer client using a connection string to the namespace and the event hub
	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// create a processor to receive and process events
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		panic(err)
	}

	//  for each partition in the event hub, create a partition client with processEvents as the function to process events
	dispatchPartitionClients := func() {
		for {
			partitionClient := processor.NextPartitionClient(context.TODO())

			if partitionClient == nil {
				break
			}

			go func() {
				if err := processEvents(partitionClient); err != nil {
					panic(err)
				}
			}()
		}
	}

	// run all partition clients
	go dispatchPartitionClients()

	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	if err := processor.Run(processorCtx); err != nil {
		panic(err)
	}
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	defer closePartitionResources(partitionClient)
	for {
		receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		receiveCtxCancel()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			return err
		}

		fmt.Printf("Processing %d event(s)\n", len(events))

		for _, event := range events {
			fmt.Printf("Event received with body %v\n", string(event.Body))
		}

		if len(events) != 0 {
			if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
				return err
			}
		}
	}
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	defer partitionClient.Close(context.TODO())
}

Apps voor ontvangers en afzenders uitvoeren

  1. Voer eerst de ontvanger-app uit.

  2. Voer de afzender-app uit.

  3. Wacht even totdat de volgende uitvoer in het ontvangervenster wordt weergegeven.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Volgende stappen

Bekijk voorbeelden op GitHub op https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.