Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Azure Event Hubs is een big data-platform voor het streamen van gegevens en een gebeurtenisopneemservice die miljoenen gebeurtenissen per seconde kan opnemen en verwerken. Event Hubs kan gebeurtenissen, gegevens of telemetrie die wordt geproduceerd door gedistribueerde software en apparaten verwerken en opslaan. Gegevens die naar een Event Hub worden verzonden, kunnen worden omgezet en opgeslagen via een provider voor realtime analytische gegevens of batchverwerking/opslagadapters. Zie Overzicht van Event Hubs en Functies van Event Hubs voor een gedetailleerd overzicht van Event Hubs.
In deze quickstart wordt beschreven hoe u Go-toepassingen schrijft voor het verzenden van gebeurtenissen naar of het ontvangen van gebeurtenissen van een Event Hub.
Opmerking
Deze quickstart is gebaseerd op voorbeelden op GitHub op https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. De sectie Gebeurtenissen verzenden is gebaseerd op het voorbeeld example_producing_events_test.go en de ontvangst is gebaseerd op het voorbeeld example_processor_test.go . De code is vereenvoudigd voor de quickstart en alle gedetailleerde opmerkingen worden verwijderd, dus bekijk de voorbeelden voor meer informatie en uitleg.
Vereiste voorwaarden
Voor het voltooien van deze quickstart hebt u de volgende vereisten nodig:
- Go is lokaal geïnstalleerd. Volg deze instructies indien nodig.
- Een actief Azure-account. Als je geen Azure-abonnement hebt, maak dan een gratis account aan voordat je begint.
- Een Event Hubs-naamruimte en een Event Hub maken. Gebruik Azure Portal om een naamruimte van het type Event Hubs te maken en haal de beheerreferenties op die uw toepassing nodig heeft om te communiceren met de Event Hub. Volg de procedure in dit artikel om een naamruimte en een Event Hub te maken.
Gebeurtenissen verzenden
In deze sectie wordt beschreven hoe u een Go-toepassing maakt om gebeurtenissen naar een Event Hub te verzenden.
Go-pakket installeren
Haal het Go-pakket voor Event Hubs op, zoals wordt weergegeven in het volgende voorbeeld.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Code voor het verzenden van gebeurtenissen naar een Event Hub
Hier volgt de code voor het verzenden van gebeurtenissen naar een Event Hub. De belangrijkste stappen in de code zijn:
- Maak een Event Hubs-producerclient met behulp van een verbindingsreeks voor de Event Hubs-naamruimte en de naam van de Event Hub.
- Maak een batchobject en voeg voorbeeldgebeurtenissen toe aan de batch.
- Verzend de batch met gebeurtenissen naar de gebeurtenissen.
Belangrijk
Vervang NAMESPACE CONNECTION STRING
door de verbindingsreeks naar uw Event Hubs-naamruimte en EVENT HUB NAME
door de naam van de Event Hub in de voorbeeldcode.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
Voer de toepassing nog niet uit. U moet eerst de ontvanger-app en vervolgens de afzender-app uitvoeren.
Gebeurtenissen ontvangen
Een opslagaccount en container maken
Status, zoals leases op partities en controlepunten tijdens de gebeurtenissen, worden gedeeld tussen ontvangers middels een Azure Storage-container. U kunt een opslagaccount en container maken met de Go SDK, maar u kunt er ook een maken door de instructies in Over Azure-opslagaccounts te volgen.
Volg deze aanbevelingen wanneer u Azure Blob Storage als controlepuntarchief gebruikt:
- Gebruik een afzonderlijke container voor elke consumentengroep. U kunt hetzelfde opslagaccount gebruiken, maar één container per groep gebruiken.
- Gebruik het opslagaccount niet voor iets anders.
- Gebruik de container niet voor iets anders.
- Maak het opslagaccount in dezelfde regio als de geïmplementeerde toepassing. Als de toepassing on-premises is, probeert u de dichtstbijzijnde regio te kiezen.
Controleer op de pagina Opslagaccount in Azure Portal in de sectie Blob-service of de volgende instellingen zijn uitgeschakeld.
- Hiërarchische naamruimte
- Blob zacht verwijderen
- Versiebeheer
Go-pakketten
Als u de berichten wilt ontvangen, haalt u de Go-pakketten voor Event Hubs op, zoals wordt weergegeven in het volgende voorbeeld.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
Code voor het ontvangen van gebeurtenissen van een Event Hub
Hier volgt de code voor het ontvangen van gebeurtenissen van een Event Hub. De belangrijkste stappen in de code zijn:
- Controleer een controlepuntarchiefobject dat de Azure Blob Storage vertegenwoordigt die door de Event Hub wordt gebruikt voor controlepunten.
- Maak een Event Hubs-consumentenclient met behulp van een verbindingsreeks met de Event Hubs-naamruimte en de naam van de Event Hub.
- Maak een gebeurtenisprocessor met behulp van het clientobject en het controlepuntarchiefobject. De processor ontvangt en verwerkt gebeurtenissen.
- Maak voor elke partitie in de Event Hub een partitieclient met processEvents als de functie voor het verwerken van gebeurtenissen.
- Voer alle partitieclients uit om gebeurtenissen te ontvangen en te verwerken.
Belangrijk
Vervang de volgende tijdelijke aanduidingen door werkelijke waarden:
-
AZURE STORAGE CONNECTION STRING
met de connectiestring voor uw Azure-opslagaccount -
BLOB CONTAINER NAME
met de naam van de blobcontainer die u hebt gemaakt in het opslagaccount -
NAMESPACE CONNECTION STRING
met de connectiestring voor uw Event Hubs-naamruimte -
EVENT HUB NAME
met de event hub-naam in de voorbeeldcode.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
Apps voor ontvangers en afzenders uitvoeren
Voer eerst de ontvanger-app uit.
Voer de afzender-app uit.
Wacht even totdat de volgende uitvoer in het ontvangervenster wordt weergegeven.
Processing 2 event(s) Event received with body hello Event received with body world
Volgende stappen
Bekijk voorbeelden op GitHub op https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.