Quickstart: Gebeurtenissen verzenden naar of ontvangen van Event Hubs met behulp van Go
Azure Event Hubs is een big data-platform voor het streamen van gegevens en een gebeurtenisopneemservice die miljoenen gebeurtenissen per seconde kan opnemen en verwerken. Event Hubs kan gebeurtenissen, gegevens of telemetrie die wordt geproduceerd door gedistribueerde software en apparaten verwerken en opslaan. Gegevens die naar een Event Hub worden verzonden, kunnen worden omgezet en opgeslagen via een provider voor realtime analytische gegevens of batchverwerking/opslagadapters. Zie Overzicht van Event Hubs en Functies van Event Hubs voor een gedetailleerd overzicht van Event Hubs.
In deze quickstart wordt beschreven hoe u Go-toepassingen schrijft voor het verzenden van gebeurtenissen naar of het ontvangen van gebeurtenissen van een Event Hub.
Notitie
Deze quickstart is gebaseerd op voorbeelden op GitHub op https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. De sectie Gebeurtenissen verzenden is gebaseerd op het voorbeeld example_producing_events_test.go en de ontvangst is gebaseerd op het voorbeeld example_processor_test.go . De code is vereenvoudigd voor de quickstart en alle gedetailleerde opmerkingen worden verwijderd, dus bekijk de voorbeelden voor meer informatie en uitleg.
Voor het voltooien van deze snelstart moet aan de volgende vereisten worden voldaan:
- Go is lokaal geïnstalleerd. Volg deze instructies indien nodig.
- Een actief Azure-account. Als u geen Azure-abonnement hebt, maakt u een gratis account voordat u begint.
- Een Event Hubs-naamruimte en een Event Hub maken. Gebruik de Azure-portal om een naamruimte van het type Event Hubs te maken en de beheerreferenties te verkrijgen die de toepassing nodig heeft om met de Event Hub te communiceren. Volg de procedure in dit artikel om een naamruimte en een Event Hub te maken.
In deze sectie wordt beschreven hoe u een Go-toepassing maakt voor het verzenden van gebeurtenissen naar een Event Hub.
Haal het Go-pakket voor Event Hubs op, zoals wordt weergegeven in het volgende voorbeeld.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Hier volgt de code voor het verzenden van gebeurtenissen naar een Event Hub. De belangrijkste stappen in de code zijn:
- Maak een Event Hubs-producerclient met behulp van een verbindingsreeks naar de Event Hubs-naamruimte en de naam van de Event Hub.
- Maak een batchobject en voeg voorbeeldgebeurtenissen toe aan de batch.
- Verzend de batch met gebeurtenissen naar de gebeurtenissen.
Belangrijk
Vervang NAMESPACE CONNECTION STRING
door de verbindingsreeks in uw Event Hubs-naamruimte en EVENT HUB NAME
door de naam van de Event Hub in de voorbeeldcode.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
Voer de toepassing nog niet uit. U moet eerst de ontvanger-app en vervolgens de afzender-app uitvoeren.
Status, zoals leases op partities en controlepunten in de gebeurtenissen, worden gedeeld tussen ontvangers met behulp van een Azure Storage-container. U kunt een opslagaccount en container maken met de Go-SDK, maar u kunt er ook een maken door de instructies te volgen in Over Azure-opslagaccounts.
Volg deze aanbevelingen wanneer u Azure Blob Storage gebruikt als controlepuntarchief:
- Gebruik een afzonderlijke container voor elke consumentengroep. U kunt hetzelfde opslagaccount gebruiken, maar één container per groep gebruiken.
- Gebruik de container niet voor iets anders en gebruik het opslagaccount niet voor iets anders.
- Het opslagaccount moet zich in dezelfde regio bevinden als waarin de geïmplementeerde toepassing zich bevindt. Als de toepassing on-premises is, probeert u de dichtstbijzijnde regio te kiezen.
Controleer op de pagina Opslagaccount in Azure Portal in de sectie Blob-service of de volgende instellingen zijn uitgeschakeld.
- Hiërarchische naamruimte
- Blob voorlopig verwijderen
- Versiebeheer
Als u de berichten wilt ontvangen, haalt u de Go-pakketten voor Event Hubs op, zoals wordt weergegeven in het volgende voorbeeld.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
Hier volgt de code voor het ontvangen van gebeurtenissen van een Event Hub. De belangrijkste stappen in de code zijn:
- Controleer een controlepuntarchiefobject dat de Azure Blob Storage vertegenwoordigt die door de Event Hub wordt gebruikt voor controlepunten.
- Maak een Event Hubs-consumentenclient met behulp van een verbindingsreeks naar de Event Hubs-naamruimte en de naam van de Event Hub.
- Maak een gebeurtenisprocessor met behulp van het clientobject en het controlepuntarchiefobject. De processor ontvangt en verwerkt gebeurtenissen.
- Maak voor elke partitie in de Event Hub een partitieclient met processEvents als de functie voor het verwerken van gebeurtenissen.
- Voer alle partitieclients uit om gebeurtenissen te ontvangen en te verwerken.
Belangrijk
Vervang de volgende tijdelijke aanduidingen door werkelijke waarden:
AZURE STORAGE CONNECTION STRING
met de verbindingsreeks voor uw Azure-opslagaccountBLOB CONTAINER NAME
met de naam van de blobcontainer die u hebt gemaakt in het opslagaccountNAMESPACE CONNECTION STRING
met de verbindingsreeks voor uw Event Hubs-naamruimteEVENT HUB NAME
met de event hub-naam in de voorbeeldcode.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
Voer eerst de ontvanger-app uit.
Voer de afzender-app uit.
Wacht even totdat de volgende uitvoer in het ontvangervenster wordt weergegeven.
Processing 2 event(s) Event received with body hello Event received with body world
Bekijk voorbeelden op GitHub op https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.