Partager via


Démarrage rapide : Envoyer des événements à event Hubs ou recevoir des événements à l’aide de Go

Azure Event Hubs est une plateforme de diffusion de données volumineuses et un service d’ingestion d’événements, capable de recevoir et de traiter des millions d’événements par seconde. Les concentrateurs d’événements peuvent traiter et stocker des événements, des données ou la télémétrie produits par des logiciels et appareils distribués. Les données envoyées à un concentrateur d’événements peuvent être transformées et stockées à l’aide d’adaptateurs de traitement par lot/stockage ou d’un fournisseur d’analyse en temps réel. Pour une présentation détaillée d’Event Hubs, consultez Vue d’ensemble d’Event Hubs et Fonctionnalités d’Event Hubs.

Ce guide de démarrage rapide explique comment écrire des applications Go pour envoyer des événements à un hub d’événements ou y recevoir des événements.

Remarque

Ce guide de démarrage rapide est basé sur des exemples sur GitHub à l’adresse https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. La section d’événements d’envoi est basée sur l’exemple example_producing_events_test.go et la réception est basée sur l’exemple example_processor_test.go . Le code est simplifié pour le démarrage rapide et tous les commentaires détaillés sont supprimés. Examinez donc les exemples pour plus de détails et d’explications.

Conditions préalables

Pour effectuer ce démarrage rapide, vous avez besoin de ce qui suit :

  • Go Installé localement. Suivez ces instructions si nécessaire.
  • Un compte Azure actif. Si vous n’avez pas d’abonnement Azure, créez un compte gratuit avant de commencer.
  • Créez un espace de noms Event Hubs et un Event Hub. Utilisez le portail Azure pour créer un espace de noms de type Event Hubs et obtenir les informations d’identification de gestion dont votre application a besoin pour communiquer avec le hub d’événements. Pour créer un espace de noms et un hub d’événements, suivez la procédure décrite dans cet article.

Envoyer des événements

Cette section vous montre comment créer une application Go pour envoyer des événements à un hub d’événements.

Installer le package Go

Obtenez le package Go pour Event Hubs, comme illustré dans l’exemple suivant.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Code permettant d’envoyer des événements à un hub d’événements

Voici le code permettant d’envoyer des événements à un hub d’événements. Les principales étapes du code sont les suivantes :

  1. Créez un client de producteur Event Hubs à l’aide d’une chaîne de connexion à l’espace de noms Event Hubs et au nom du hub d’événements.
  2. Créez un objet batch et ajoutez des exemples d’événements au lot.
  3. Envoyer le lot d’événements au hub d’événements.

Important

Remplacez NAMESPACE CONNECTION STRING par la chaîne de connexion à votre espace de noms Event Hubs et EVENT HUB NAME par le nom du hub d’événements dans l’exemple de code.

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// create an Event Hubs producer client using a connection string to the namespace and the event hub
	producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	// create sample events
	events := createEventsForSample()

	// create a batch object and add sample events to the batch
	newBatchOptions := &azeventhubs.EventDataBatchOptions{}

	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	if err != nil {
		panic(err)
	}

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)

		if err != nil {
			panic(err)
		}
	}

	// send the batch of events to the event hub
	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}

N’exécutez pas encore l’application. Vous devez d’abord exécuter l’application réceptrice, puis l’application expéditeur.

Recevoir des événements

Créer un compte de stockage et un conteneur

Des états comme les baux sur des partitions et des points de contrôle dans les événements sont partagés entre les destinataires en utilisant un conteneur de stockage Azure. Vous pouvez créer un compte de stockage et un conteneur avec le Kit de développement logiciel (SDK) Go, mais vous pouvez également en créer un en suivant les instructions fournies dans À propos des comptes de stockage Azure.

Suivez ces recommandations lorsque vous utilisez le Stockage Blob Azure en tant que magasin de points de contrôle :

  • Utilisez un conteneur distinct pour chaque groupe de consommateurs. Vous pouvez utiliser le même compte de stockage, mais utiliser un conteneur par groupe.
  • N’utilisez pas le compte de stockage pour autre chose.
  • N’utilisez pas le conteneur pour tout autre chose.
  • Créez le compte de stockage dans la même région que l’application déployée. Si l’application est locale, essayez de choisir la région la plus proche possible.

Sur la page Compte de stockage du Portail Azure, dans la section Service BLOB, vérifiez que les paramètres suivants sont désactivés.

  • Espace de noms hiérarchique
  • Suppression réversible de blob
  • Gestion des versions

Packages Go

Pour recevoir les messages, obtenez les packages Go pour Event Hubs, comme illustré dans l’exemple suivant.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob

Code permettant de recevoir des événements à partir d’un hub d’événements

Voici le code permettant de recevoir des événements à partir d’un hub d’événements. Les principales étapes du code sont les suivantes :

  1. Créer un objet de magasin de point de contrôle qui représente le service Stockage Blob Azure utilisé par le hub d’événements pour la vérification des points de contrôle.
  2. Créez un client consommateur d'Event Hubs à l'aide d'une chaîne de connexion à l'espace de noms Event Hubs et au nom du hub d'événements.
  3. Créez un processeur d’événements à l’aide de l’objet client et de l’objet de magasin de points de contrôle. Le processeur reçoit et traite les événements.
  4. Pour chaque partition dans le hub d’événements, créez un client de partition avec processEvents comme fonction pour traiter les événements.
  5. Exécutez tous les clients de partition pour recevoir et traiter des événements.

Important

Remplacez les valeurs d’espace réservé suivantes par les valeurs réelles :

  • AZURE STORAGE CONNECTION STRING par la chaîne de connexion de votre compte Stockage Azure
  • BLOB CONTAINER NAME avec le nom du conteneur blob que vous avez créé dans le compte de stockage
  • NAMESPACE CONNECTION STRING par la chaîne de connexion de votre espace de noms Event Hubs
  • EVENT HUB NAME avec le nom du hub d’événements dans l’exemple de code.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {

	// create a container client using a connection string and container name
	checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)

	if err != nil {
		panic(err)
	}

	// create a checkpoint store that will be used by the event hub
	checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

	if err != nil {
		panic(err)
	}

	// create a consumer client using a connection string to the namespace and the event hub
	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// create a processor to receive and process events
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		panic(err)
	}

	//  for each partition in the event hub, create a partition client with processEvents as the function to process events
	dispatchPartitionClients := func() {
		for {
			partitionClient := processor.NextPartitionClient(context.TODO())

			if partitionClient == nil {
				break
			}

			go func() {
				if err := processEvents(partitionClient); err != nil {
					panic(err)
				}
			}()
		}
	}

	// run all partition clients
	go dispatchPartitionClients()

	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	if err := processor.Run(processorCtx); err != nil {
		panic(err)
	}
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	defer closePartitionResources(partitionClient)
	for {
		receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		receiveCtxCancel()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			return err
		}

		fmt.Printf("Processing %d event(s)\n", len(events))

		for _, event := range events {
			fmt.Printf("Event received with body %v\n", string(event.Body))
		}

		if len(events) != 0 {
			if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
				return err
			}
		}
	}
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	defer partitionClient.Close(context.TODO())
}

Exécuter des applications de récepteur et d’expéditeur

  1. Exécutez d’abord l’application réceptrice.

  2. Exécutez l’application expéditeur.

  3. Attendez une minute pour voir la sortie suivante dans la fenêtre du récepteur.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Étapes suivantes

Consultez des exemples sur GitHub à l’adresse https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.