Condividi tramite


Guida introduttiva: Inviare o ricevere eventi da Event Hubs con Go

Hub eventi di Azure è una piattaforma di Big Data streaming e un servizio di inserimento di eventi che consente di ricevere ed elaborare milioni di eventi al secondo. Hub eventi può elaborare e archiviare eventi, dati o dati di telemetria prodotti da software e dispositivi distribuiti. I dati inviati a un hub eventi possono essere trasformati e archiviati usando qualsiasi provider di analisi in tempo reale o adattatori di batch/archiviazione. Per una panoramica dettagliata di Hub eventi, vedere Panoramica di Hub eventi e Funzionalità di Hub eventi.

Questa guida introduttiva descrive come scrivere applicazioni Go per inviare o ricevere eventi da un hub eventi.

Annotazioni

Questa guida introduttiva si basa sugli esempi in GitHub all'indirizzo https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. La sezione degli eventi di invio si basa sull'esempio example_producing_events_test.go e la ricezione si basa sull'esempio example_processor_test.go . Il codice è semplificato per l'avvio rapido e tutti i commenti dettagliati vengono rimossi, quindi esaminare gli esempi per altri dettagli e spiegazioni.

Prerequisiti

Per completare questa guida introduttiva è necessario soddisfare i prerequisiti seguenti:

  • Go installato in locale. Se necessario, seguire queste istruzioni .
  • Un account Azure attivo. Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.
  • Creare uno spazio dei nomi di Hub eventi e un hub eventi. Usare il portale di Azure per creare uno spazio dei nomi di tipo Hub eventi e ottenere le credenziali di gestione necessarie per comunicare con l'hub eventi. Per creare uno spazio dei nomi e un hub eventi, seguire la procedura descritta in questo articolo.

Inviare eventi

Questa sezione illustra come creare un'applicazione Go per inviare eventi a un hub eventi.

Installare il pacchetto Go

Ottieni il pacchetto Go per Event Hubs, come illustrato nell'esempio seguente.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Codice per inviare eventi a un hub eventi

Ecco il codice per inviare eventi a un hub eventi. I passaggi principali nel codice sono:

  1. Creare un client di produzione per Hub eventi utilizzando una stringa di connessione allo spazio dei nomi e al nome di Hub eventi.
  2. Creare un oggetto batch e aggiungere eventi di esempio al batch.
  3. Inviare il batch di eventi agli eventi.

Importante

Sostituire NAMESPACE CONNECTION STRING con la stringa di connessione allo spazio dei nomi di Event Hubs e EVENT HUB NAME con il nome dell'Event Hub nel codice di esempio.

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// create an Event Hubs producer client using a connection string to the namespace and the event hub
	producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	// create sample events
	events := createEventsForSample()

	// create a batch object and add sample events to the batch
	newBatchOptions := &azeventhubs.EventDataBatchOptions{}

	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	if err != nil {
		panic(err)
	}

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)

		if err != nil {
			panic(err)
		}
	}

	// send the batch of events to the event hub
	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}

Non eseguire ancora l'applicazione. È prima necessario eseguire l'app ricevitore e quindi l'app mittente.

Ricevere eventi

Creare un account di archiviazione e un contenitore

Le informazioni sullo stato, come i lease sulle partizioni e i punti di controllo negli eventi vengono condivise tra i ricevitori tramite un contenitore di Archiviazione di Azure. È possibile creare un account di archiviazione e un contenitore con Go SDK, ma è anche possibile crearne uno seguendo le istruzioni in Informazioni sugli account di archiviazione di Azure.

Seguire queste raccomandazioni quando si utilizza Archiviazione BLOB di Azure come archivio dei punti di controllo:

  • Usare un contenitore separato per ogni gruppo di consumer. È possibile usare lo stesso account di archiviazione, ma occorre usare un contenitore per ogni gruppo.
  • Non usare l'account di archiviazione per nient'altro.
  • Non usare il contenitore per nient'altro.
  • Creare l'account di archiviazione nella stessa area dell'applicazione distribuita. Se l'applicazione è locale, provare a scegliere l'area più vicina possibile.

Nella pagina Account di archiviazione del portale di Azure verificare che le impostazioni seguenti siano disabilitate nella sezione Servizio BLOB.

  • Spazio dei nomi gerarchico
  • Eliminazione temporanea BLOB
  • Controllo delle versioni

Pacchetti Go

Per ricevere i messaggi, ottenere i pacchetti Go per Hub eventi, come illustrato nell'esempio seguente.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob

Codice per ricevere eventi da un hub eventi

Ecco il codice per ricevere eventi da un hub eventi. I passaggi principali nel codice sono:

  1. Controllare un oggetto di archiviazione dei punti di controllo che rappresenta l'istanza di Archiviazione BLOB di Azure utilizzata da Hub eventi per la creazione di punti di controllo.
  2. Creare un client di produzione per Hub eventi utilizzando una stringa di connessione allo spazio dei nomi e al nome di Hub eventi.
  3. Creare un processore di eventi utilizzando l'oggetto client e l'oggetto archivio dei punti di controllo. Il processore riceve ed elabora gli eventi.
  4. Per ogni partizione nell'hub eventi, creare un client di partizione con processEvents come funzione per elaborare gli eventi.
  5. Eseguire tutti i client di partizione per ricevere ed elaborare gli eventi.

Importante

Sostituire i valori segnaposto seguenti con i valori effettivi:

  • AZURE STORAGE CONNECTION STRING con la stringa di connessione per l'account di archiviazione di Azure
  • BLOB CONTAINER NAME con il nome del contenitore BLOB creato nell'account di archiviazione
  • NAMESPACE CONNECTION STRING con la stringa di connessione per lo spazio dei nomi di Hub eventi
  • EVENT HUB NAME con il nome dell'hub eventi nel codice di esempio.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {

	// create a container client using a connection string and container name
	checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)

	if err != nil {
		panic(err)
	}

	// create a checkpoint store that will be used by the event hub
	checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

	if err != nil {
		panic(err)
	}

	// create a consumer client using a connection string to the namespace and the event hub
	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// create a processor to receive and process events
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		panic(err)
	}

	//  for each partition in the event hub, create a partition client with processEvents as the function to process events
	dispatchPartitionClients := func() {
		for {
			partitionClient := processor.NextPartitionClient(context.TODO())

			if partitionClient == nil {
				break
			}

			go func() {
				if err := processEvents(partitionClient); err != nil {
					panic(err)
				}
			}()
		}
	}

	// run all partition clients
	go dispatchPartitionClients()

	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	if err := processor.Run(processorCtx); err != nil {
		panic(err)
	}
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	defer closePartitionResources(partitionClient)
	for {
		receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		receiveCtxCancel()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			return err
		}

		fmt.Printf("Processing %d event(s)\n", len(events))

		for _, event := range events {
			fmt.Printf("Event received with body %v\n", string(event.Body))
		}

		if len(events) != 0 {
			if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
				return err
			}
		}
	}
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	defer partitionClient.Close(context.TODO())
}

Eseguire le app mittente e ricevitore

  1. Esegui prima l'app del ricevitore.

  2. Avvia l'app mittente.

  3. Attendere un minuto per visualizzare l'output seguente nella finestra del ricevitore.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Passaggi successivi

Vedere gli esempi in GitHub all'indirizzo https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.