Guida introduttiva: Inviare o ricevere eventi da Hub eventi con Go
Hub eventi di Azure è una piattaforma di Big Data streaming e un servizio di inserimento di eventi che consente di ricevere ed elaborare milioni di eventi al secondo. Hub eventi consente di elaborare e archiviare eventi, dati o dati di telemetria generati dal software distribuito e dai dispositivi. I dati inviati a un hub eventi possono essere trasformati e archiviati usando qualsiasi provider di analisi in tempo reale o adattatori di invio in batch/archiviazione. Per una panoramica dettagliata di Hub eventi, vedere Panoramica di Hub eventi e Funzionalità di Hub eventi.
Questa guida introduttiva descrive come scrivere applicazioni Go per inviare o ricevere eventi da un hub eventi.
Nota
Questa guida introduttiva si basa sugli esempi in GitHub all'indirizzo https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. La sezione degli eventi di invio si basa sull'esempio example_producing_events_test.go e la ricezione si basa sull'esempio example_processor_test.go . Il codice è semplificato per l'avvio rapido e tutti i commenti dettagliati vengono rimossi, quindi esaminare gli esempi per altri dettagli e spiegazioni.
Prerequisiti
Per completare questa guida introduttiva è necessario soddisfare i prerequisiti seguenti:
- Go installato in locale. Seguire queste istruzioni, se necessario.
- Un account Azure attivo. Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.
- Creare uno spazio dei nomi di Hub eventi e un hub eventi. Usare il portale di Azure per creare uno spazio dei nomi di tipo Hub eventi e ottenere le credenziali di gestione necessarie all'applicazione per comunicare con l'hub eventi. Per creare uno spazio dei nomi e un hub eventi, seguire la procedura descritta in questo articolo.
Inviare gli eventi
Questa sezione illustra come creare un'applicazione Go per inviare eventi a un hub eventi.
Installare il pacchetto di Go
Ottenere il pacchetto Go per Hub eventi, come illustrato nell'esempio seguente.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Codice per inviare eventi a un hub eventi
Ecco il codice per inviare eventi a un hub eventi. I passaggi principali nel codice sono:
- Creare un client producer di Hub eventi usando un stringa di connessione allo spazio dei nomi di Hub eventi e al nome dell'hub eventi.
- Creare un oggetto batch e aggiungere eventi di esempio al batch.
- Inviare il batch di eventi agli eventi th.
Importante
Sostituire NAMESPACE CONNECTION STRING
con il stringa di connessione allo spazio dei nomi di Hub eventi e EVENT HUB NAME
con il nome dell'hub eventi nel codice di esempio.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
Non eseguire ancora l'applicazione. È prima necessario eseguire l'app ricevitore e quindi l'app mittente.
Ricevere eventi
Creare un account di archiviazione e un contenitore
Lo stato, ad esempio i lease sulle partizioni e i checkpoint negli eventi, vengono condivisi tra ricevitori usando un contenitore Archiviazione di Azure. È possibile creare un account di archiviazione e un contenitore usando l'SDK per Go, ma anche seguendo le istruzioni riportate in Informazioni sugli account di archiviazione di Azure.
Seguire questi consigli quando si usa Archiviazione BLOB di Azure come archivio checkpoint:
- Usare un contenitore separato per ogni gruppo di consumer. È possibile usare lo stesso account di archiviazione, ma usare un contenitore per ogni gruppo.
- Non usare il contenitore per altri elementi e non usare l'account di archiviazione per altri elementi.
- Archiviazione account deve trovarsi nella stessa area in cui si trova l'applicazione distribuita. Se l'applicazione è locale, provare a scegliere l'area più vicina possibile.
Nella pagina Archiviazione account della portale di Azure, nella sezione Servizio BLOB verificare che le impostazioni seguenti siano disabilitate.
- Spazio dei nomi gerarchico
- Eliminazione temporanea dei BLOB
- Controllo delle versioni
Pacchetti Go
Per ricevere i messaggi, ottenere i pacchetti Go per Hub eventi, come illustrato nell'esempio seguente.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
Codice per ricevere eventi da un hub eventi
Ecco il codice per ricevere eventi da un hub eventi. I passaggi principali nel codice sono:
- Controllare un oggetto archivio checkpoint che rappresenta il Archiviazione BLOB di Azure usato dall'hub eventi per il checkpoint.
- Creare un client consumer di Hub eventi usando un stringa di connessione allo spazio dei nomi di Hub eventi e al nome dell'hub eventi.
- Creare un processore di eventi usando l'oggetto client e l'oggetto archivio checkpoint. Il processore riceve ed elabora gli eventi.
- Per ogni partizione nell'hub eventi, creare un client di partizione con processEvents come funzione per elaborare gli eventi.
- Eseguire tutti i client di partizione per ricevere ed elaborare gli eventi.
Importante
Sostituire i valori segnaposto seguenti con i valori effettivi:
AZURE STORAGE CONNECTION STRING
con il stringa di connessione per l'account di archiviazione di AzureBLOB CONTAINER NAME
con il nome del contenitore BLOB creato nell'account di archiviazioneNAMESPACE CONNECTION STRING
con il stringa di connessione per lo spazio dei nomi di Hub eventiEVENT HUB NAME
con il nome dell'hub eventi nel codice di esempio.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
Eseguire app ricevitore e mittente
Eseguire prima l'app ricevitore.
Eseguire l'app mittente.
Attendere un minuto per visualizzare l'output seguente nella finestra del ricevitore.
Processing 2 event(s) Event received with body hello Event received with body world
Passaggi successivi
Vedere gli esempi in GitHub all'indirizzo https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.