Avvio rapido: Inviare eventi a o ricevere eventi da Hub eventi usando Go
Hub eventi di Azure è una piattaforma di Big Data streaming e un servizio di inserimento di eventi che consente di ricevere ed elaborare milioni di eventi al secondo. Hub eventi consente di elaborare e archiviare eventi, dati o dati di telemetria generati dal software distribuito e dai dispositivi. I dati inviati a un hub eventi possono essere trasformati e archiviati usando qualsiasi provider di analisi in tempo reale o adattatori di invio in batch/archiviazione. Per una panoramica dettagliata di Hub eventi, vedere Panoramica di Hub eventi e Funzionalità di Hub eventi.
Questa guida introduttiva descrive come scrivere applicazioni Go per inviare eventi a o ricevere eventi da un hub eventi.
Nota
Questa guida introduttiva si basa sugli esempi in GitHub in https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. L'invio si basa sull'esempio example_producing_events_test.go e quello di ricezione si basa sull'esempio example_processor_test.go . Il codice è semplificato per l'avvio rapido e tutti i commenti dettagliati vengono rimossi, quindi esaminare gli esempi per ulteriori dettagli e spiegazioni.
Prerequisiti
Per completare questa guida introduttiva è necessario soddisfare i prerequisiti seguenti:
- Go installato in locale. Seguire queste istruzioni, se necessario.
- Un account Azure attivo. Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.
- Creare uno spazio dei nomi di Hub eventi e un hub eventi. Usare il portale di Azure per creare uno spazio dei nomi di tipo Hub eventi e ottenere le credenziali di gestione necessarie all'applicazione per comunicare con l'hub eventi. Per creare uno spazio dei nomi e un hub eventi, seguire la procedura descritta in questo articolo.
Inviare eventi
Questa sezione illustra come creare un'applicazione Go per inviare eventi a un hub eventi.
Installare il pacchetto di Go
Ottenere il pacchetto Go per Hub eventi, come illustrato nell'esempio seguente.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Codice per inviare eventi a un hub eventi
Ecco il codice per inviare eventi a un hub eventi. I passaggi principali nel codice sono:
- Creare un client producer di Hub eventi usando una stringa di connessione allo spazio dei nomi Hub eventi e al nome dell'hub eventi.
- Creare un oggetto batch e aggiungere eventi di esempio al batch.
- Inviare il batch di eventi agli eventi di th.
Importante
Sostituire NAMESPACE CONNECTION STRING
con la stringa di connessione allo spazio dei nomi hub eventi e EVENT HUB NAME
con il nome dell'hub eventi nel codice di esempio.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
}
// send the batch of events to the event hub
producerClient.SendEventDataBatch(context.TODO(), batch, nil)
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
Non eseguire ancora l'applicazione. È prima necessario eseguire l'app ricevitore e quindi l'app mittente.
Ricevere eventi
Creare un account di archiviazione e un contenitore
Le informazioni sullo stato, come i lease sulle partizioni e i checkpoint nel flusso di eventi, vengono condivise tra i ricevitori tramite un contenitore di Archiviazione di Azure. È possibile creare un account di archiviazione e un contenitore usando l'SDK per Go, ma anche seguendo le istruzioni riportate in Informazioni sugli account di archiviazione di Azure.
Seguire queste raccomandazioni quando si usano Archiviazione BLOB di Azure come archivio checkpoint:
- Usare un contenitore separato per ogni gruppo di processori. È possibile usare lo stesso account di archiviazione, ma usare un contenitore per ogni gruppo.
- Non usare il contenitore per altri elementi e non usare l'account di archiviazione per altri elementi.
- L'account di archiviazione deve trovarsi nella stessa area in cui si trova l'applicazione distribuita. Se l'applicazione è locale, provare a scegliere l'area più vicina possibile.
Nella pagina Account di archiviazione nella portale di Azure, nella sezione Servizio BLOB verificare che le impostazioni seguenti siano disabilitate.
- Spazio dei nomi gerarchico
- Eliminazione temporanea DEL BLOB
- Controllo delle versioni
Pacchetti Go
Per ricevere i messaggi, ottenere i pacchetti Go per Hub eventi, come illustrato nell'esempio seguente.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Codice per ricevere eventi da un hub eventi
Ecco il codice per ricevere eventi da un hub eventi. I passaggi principali nel codice sono:
- Controllare un oggetto archivio checkpoint che rappresenta la Archiviazione BLOB di Azure utilizzata dall'hub eventi per il checkpoint.
- Creare un client consumer di Hub eventi usando una stringa di connessione allo spazio dei nomi Hub eventi e al nome dell'hub eventi.
- Creare un processore di eventi usando l'oggetto client e l'oggetto archivio checkpoint. Il processore riceve e elabora gli eventi.
- Per ogni partizione nell'hub eventi, creare un client di partizione con processEvents come funzione per elaborare gli eventi.
- Eseguire tutti i client di partizione per ricevere ed elaborare gli eventi.
Importante
Sostituire i valori segnaposto seguenti con i valori effettivi:
AZURE STORAGE CONNECTION STRING
con la stringa di connessione per l'account di archiviazione di AzureBLOB CONTAINER NAME
con il nome del contenitore BLOB creato nell'account di archiviazioneNAMESPACE CONNECTION STRING
con la stringa di connessione per lo spazio dei nomi hub eventiEVENT HUB NAME
con il nome dell'hub eventi nel codice di esempio.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
Eseguire app ricevitore e mittente
Eseguire prima l'app ricevitore.
Eseguire l'app mittente.
Attendere un minuto per visualizzare l'output seguente nella finestra del ricevitore.
Processing 2 event(s) Event received with body hello Event received with body world
Passaggi successivi
Vedere esempi in GitHub all'indirizzo https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.