Partager via


Démarrage rapide : Envoyer ou recevoir des événements d’Event Hubs à l’aide de Go

Azure Event Hubs est une plateforme de diffusion de données volumineuses et un service d’ingestion d’événements, capable de recevoir et de traiter des millions d’événements par seconde. Les concentrateurs d’événements peuvent traiter et stocker des événements, des données ou la télémétrie produits par des logiciels et appareils distribués. Les données envoyées à un concentrateur d’événements peuvent être transformées et stockées à l’aide d’adaptateurs de traitement par lot/stockage ou d’un fournisseur d’analyse en temps réel. Pour une présentation détaillée d’Event Hubs, consultez Vue d’ensemble d’Event Hubs et Fonctionnalités d’Event Hubs.

Ce guide de démarrage rapide explique comment écrire des applications Go pour envoyer ou recevoir des événements à destination ou en provenance d’un hub d’événements.

Notes

Ce guide de démarrage rapide est basé sur des exemples disponibles sur GitHub à l’adresse https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. La section d’envoi des événements est basée sur l’exemple example_producing_events_test.go, et celle de réception est basée sur l’exemple example_processor_test.go. Le code a été simplifié pour le guide de démarrage rapide, et tous les commentaires détaillés ont été supprimés. Vous devez donc consulter les exemples pour obtenir plus de détails et d’explications.

Prérequis

Pour effectuer ce démarrage rapide, vous avez besoin de ce qui suit :

  • Go Installé localement. Suivez ces instructions si nécessaire.
  • Un compte Azure actif. Si vous n’avez pas d’abonnement Azure, créez un compte gratuit avant de commencer.
  • Créez un espace de noms Event Hubs et un Event Hub. Utilisez le portail Azure pour créer un espace de noms de type Event Hubs et obtenez les informations de gestion nécessaires à votre application pour communiquer avec le hub d’événements. Pour créer un espace de noms et un hub d’événements, suivez la procédure décrite dans cet article.

Envoyer des événements

Cette section montre comment créer une application Go pour envoyer des événements à un hub d’événements.

Installer le package Go

Obtenez le package Go pour Event Hubs, comme indiqué dans l’exemple suivant.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Code permettant d’envoyer des événements à un hub d’événements

Voici le code permettant d’envoyer des événements à un hub d’événements. Les principales étapes du code sont les suivantes :

  1. Créer un client producteur Event Hubs en utilisant une chaîne de connexion à l’espace de noms Event Hubs et le nom du hub d’événements.
  2. Créer un objet de lot, puis ajouter des exemples d’événements au lot.
  3. Envoyer le lot d’événements au hub d’événements.

Important

Remplacez NAMESPACE CONNECTION STRING par la chaîne de connexion à votre espace de noms Event Hubs, et EVENT HUB NAME par le nom du hub d’événements dans l’exemple de code.

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// create an Event Hubs producer client using a connection string to the namespace and the event hub
	producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	// create sample events
	events := createEventsForSample()

	// create a batch object and add sample events to the batch
	newBatchOptions := &azeventhubs.EventDataBatchOptions{}

	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	if err != nil {
		panic(err)
	}

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)

		if err != nil {
			panic(err)
		}
	}

	// send the batch of events to the event hub
	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}

N’exécutez pas encore l’application. Vous devez d’abord exécuter l’application réceptrice, puis l’application émettrice.

Recevoir des événements

Créer un compte de stockage et un conteneur

Des états comme les baux sur des partitions et des points de contrôle dans les événements sont partagés entre les destinataires en utilisant un conteneur de stockage Azure. Vous pouvez créer un compte de stockage et un conteneur avec le Kit de développement logiciel (SDK) Go ou en suivant les instructions fournies dans Comptes de stockage Azure.

Suivez les recommandations ci-dessous quand vous utilisez Stockage Blob Azure comme magasin de points de contrôle :

  • Utilisez un conteneur distinct pour chaque groupe de consommateurs. Vous pouvez utiliser le même compte de stockage, mais utiliser un conteneur par groupe.
  • N’utilisez pas le conteneur et le compte de stockage pour quoi que ce soit d’autre.
  • Le compte de stockage doit se trouver dans la même région que l’application déployée. Si l’application est locale, essayez de choisir la région la plus proche possible.

Sur la page Compte de stockage du Portail Azure, dans la section Service BLOB, vérifiez que les paramètres suivants sont désactivés.

  • Espace de noms hiérarchique
  • Suppression réversible de blob
  • Contrôle de version

Packages Go

Pour recevoir les messages, récupérez les packages Go pour Event Hubs, comme indiqué dans l’exemple suivant.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob

Code permettant de recevoir les événements d’un hub d’événements

Voici le code permettant de recevoir les événements d’un hub d’événements. Les principales étapes du code sont les suivantes :

  1. Créer un objet de magasin de point de contrôle qui représente le service Stockage Blob Azure utilisé par le hub d’événements pour la vérification des points de contrôle.
  2. Créer un client consommateur Event Hubs en utilisant une chaîne de connexion à l’espace de noms Event Hubs et le nom du hub d’événements.
  3. Créer un processeur d’événements en utilisant l’objet client et l’objet de magasin de point de contrôle. Le processeur reçoit et traite les événements.
  4. Pour chaque partition du hub d’événements, créer un client de partition en utilisant la fonction processEvents afin de traiter les événements.
  5. Exécuter tous les clients de partition pour recevoir et traiter les événements.

Important

Remplacez les valeurs d’espace réservé suivantes par les valeurs réelles :

  • AZURE STORAGE CONNECTION STRING par la chaîne de connexion de votre compte Stockage Azure
  • BLOB CONTAINER NAME par le nom du conteneur d’objets blob que vous avez créé dans le compte de stockage
  • NAMESPACE CONNECTION STRING par la chaîne de connexion de votre espace de noms Event Hubs
  • EVENT HUB NAME par le nom du hub d’événements dans l’exemple de code.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {

	// create a container client using a connection string and container name
	checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)

	if err != nil {
		panic(err)
	}

	// create a checkpoint store that will be used by the event hub
	checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

	if err != nil {
		panic(err)
	}

	// create a consumer client using a connection string to the namespace and the event hub
	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// create a processor to receive and process events
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		panic(err)
	}

	//  for each partition in the event hub, create a partition client with processEvents as the function to process events
	dispatchPartitionClients := func() {
		for {
			partitionClient := processor.NextPartitionClient(context.TODO())

			if partitionClient == nil {
				break
			}

			go func() {
				if err := processEvents(partitionClient); err != nil {
					panic(err)
				}
			}()
		}
	}

	// run all partition clients
	go dispatchPartitionClients()

	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	if err := processor.Run(processorCtx); err != nil {
		panic(err)
	}
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	defer closePartitionResources(partitionClient)
	for {
		receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		receiveCtxCancel()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			return err
		}

		fmt.Printf("Processing %d event(s)\n", len(events))

		for _, event := range events {
			fmt.Printf("Event received with body %v\n", string(event.Body))
		}

		if len(events) != 0 {
			if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
				return err
			}
		}
	}
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	defer partitionClient.Close(context.TODO())
}

Exécuter les applications réceptrice et émettrice

  1. Exécutez d’abord l’application réceptrice.

  2. Exécutez l’application émettrice.

  3. Attendez une minute pour voir la sortie suivante dans la fenêtre de l’application réceptrice.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Étapes suivantes

Consultez des exemples sur GitHub à l’adresse https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.