Démarrage rapide : Envoyer ou recevoir des événements d’Event Hubs à l’aide de Go
Azure Event Hubs est une plateforme de diffusion de données volumineuses et un service d’ingestion d’événements, capable de recevoir et de traiter des millions d’événements par seconde. Les concentrateurs d’événements peuvent traiter et stocker des événements, des données ou la télémétrie produits par des logiciels et appareils distribués. Les données envoyées à un concentrateur d’événements peuvent être transformées et stockées à l’aide d’adaptateurs de traitement par lot/stockage ou d’un fournisseur d’analyse en temps réel. Pour une présentation détaillée d’Event Hubs, consultez Vue d’ensemble d’Event Hubs et Fonctionnalités d’Event Hubs.
Ce guide de démarrage rapide explique comment écrire des applications Go pour envoyer ou recevoir des événements à destination ou en provenance d’un hub d’événements.
Notes
Ce guide de démarrage rapide est basé sur des exemples disponibles sur GitHub à l’adresse https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. La section d’envoi des événements est basée sur l’exemple example_producing_events_test.go, et celle de réception est basée sur l’exemple example_processor_test.go. Le code a été simplifié pour le guide de démarrage rapide, et tous les commentaires détaillés ont été supprimés. Vous devez donc consulter les exemples pour obtenir plus de détails et d’explications.
Prérequis
Pour effectuer ce démarrage rapide, vous avez besoin de ce qui suit :
- Go Installé localement. Suivez ces instructions si nécessaire.
- Un compte Azure actif. Si vous n’avez pas d’abonnement Azure, créez un compte gratuit avant de commencer.
- Créez un espace de noms Event Hubs et un Event Hub. Utilisez le portail Azure pour créer un espace de noms de type Event Hubs et obtenez les informations de gestion nécessaires à votre application pour communiquer avec le hub d’événements. Pour créer un espace de noms et un hub d’événements, suivez la procédure décrite dans cet article.
Envoyer des événements
Cette section montre comment créer une application Go pour envoyer des événements à un hub d’événements.
Installer le package Go
Obtenez le package Go pour Event Hubs, comme indiqué dans l’exemple suivant.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Code permettant d’envoyer des événements à un hub d’événements
Voici le code permettant d’envoyer des événements à un hub d’événements. Les principales étapes du code sont les suivantes :
- Créer un client producteur Event Hubs en utilisant une chaîne de connexion à l’espace de noms Event Hubs et le nom du hub d’événements.
- Créer un objet de lot, puis ajouter des exemples d’événements au lot.
- Envoyer le lot d’événements au hub d’événements.
Important
Remplacez NAMESPACE CONNECTION STRING
par la chaîne de connexion à votre espace de noms Event Hubs, et EVENT HUB NAME
par le nom du hub d’événements dans l’exemple de code.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
N’exécutez pas encore l’application. Vous devez d’abord exécuter l’application réceptrice, puis l’application émettrice.
Recevoir des événements
Créer un compte de stockage et un conteneur
Des états comme les baux sur des partitions et des points de contrôle dans les événements sont partagés entre les destinataires en utilisant un conteneur de stockage Azure. Vous pouvez créer un compte de stockage et un conteneur avec le Kit de développement logiciel (SDK) Go ou en suivant les instructions fournies dans Comptes de stockage Azure.
Suivez les recommandations ci-dessous quand vous utilisez Stockage Blob Azure comme magasin de points de contrôle :
- Utilisez un conteneur distinct pour chaque groupe de consommateurs. Vous pouvez utiliser le même compte de stockage, mais utiliser un conteneur par groupe.
- N’utilisez pas le conteneur et le compte de stockage pour quoi que ce soit d’autre.
- Le compte de stockage doit se trouver dans la même région que l’application déployée. Si l’application est locale, essayez de choisir la région la plus proche possible.
Sur la page Compte de stockage du Portail Azure, dans la section Service BLOB, vérifiez que les paramètres suivants sont désactivés.
- Espace de noms hiérarchique
- Suppression réversible de blob
- Contrôle de version
Packages Go
Pour recevoir les messages, récupérez les packages Go pour Event Hubs, comme indiqué dans l’exemple suivant.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
Code permettant de recevoir les événements d’un hub d’événements
Voici le code permettant de recevoir les événements d’un hub d’événements. Les principales étapes du code sont les suivantes :
- Créer un objet de magasin de point de contrôle qui représente le service Stockage Blob Azure utilisé par le hub d’événements pour la vérification des points de contrôle.
- Créer un client consommateur Event Hubs en utilisant une chaîne de connexion à l’espace de noms Event Hubs et le nom du hub d’événements.
- Créer un processeur d’événements en utilisant l’objet client et l’objet de magasin de point de contrôle. Le processeur reçoit et traite les événements.
- Pour chaque partition du hub d’événements, créer un client de partition en utilisant la fonction processEvents afin de traiter les événements.
- Exécuter tous les clients de partition pour recevoir et traiter les événements.
Important
Remplacez les valeurs d’espace réservé suivantes par les valeurs réelles :
AZURE STORAGE CONNECTION STRING
par la chaîne de connexion de votre compte Stockage AzureBLOB CONTAINER NAME
par le nom du conteneur d’objets blob que vous avez créé dans le compte de stockageNAMESPACE CONNECTION STRING
par la chaîne de connexion de votre espace de noms Event HubsEVENT HUB NAME
par le nom du hub d’événements dans l’exemple de code.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
Exécuter les applications réceptrice et émettrice
Exécutez d’abord l’application réceptrice.
Exécutez l’application émettrice.
Attendez une minute pour voir la sortie suivante dans la fenêtre de l’application réceptrice.
Processing 2 event(s) Event received with body hello Event received with body world
Étapes suivantes
Consultez des exemples sur GitHub à l’adresse https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.