Guida introduttiva: Inviare o ricevere eventi da Hub eventi con Go

Hub eventi di Azure è una piattaforma di Big Data streaming e un servizio di inserimento di eventi che consente di ricevere ed elaborare milioni di eventi al secondo. Hub eventi consente di elaborare e archiviare eventi, dati o dati di telemetria generati dal software distribuito e dai dispositivi. I dati inviati a un hub eventi possono essere trasformati e archiviati usando qualsiasi provider di analisi in tempo reale o adattatori di invio in batch/archiviazione. Per una panoramica dettagliata di Hub eventi, vedere Panoramica di Hub eventi e Funzionalità di Hub eventi.

Questa guida introduttiva descrive come scrivere applicazioni Go per inviare o ricevere eventi da un hub eventi.

Nota

Questa guida introduttiva si basa sugli esempi in GitHub all'indirizzo https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. L'invio si basa sull'esempio di example_producing_events_test.go e la ricezione si basa sull'esempio example_processor_test.go . Il codice è semplificato per l'avvio rapido e tutti i commenti dettagliati vengono rimossi, quindi esaminare gli esempi per altri dettagli e spiegazioni.

Prerequisiti

Per completare questa guida introduttiva è necessario soddisfare i prerequisiti seguenti:

  • Go installato in locale. Seguire queste istruzioni, se necessario.
  • Un account Azure attivo. Se non si ha una sottoscrizione di Azure, creare un account gratuito prima di iniziare.
  • Creare uno spazio dei nomi di Hub eventi e un hub eventi. Usare il portale di Azure per creare uno spazio dei nomi di tipo Hub eventi e ottenere le credenziali di gestione necessarie all'applicazione per comunicare con l'hub eventi. Per creare uno spazio dei nomi e un hub eventi, seguire la procedura descritta in questo articolo.

Inviare gli eventi

Questa sezione illustra come creare un'applicazione Go per inviare eventi a un hub eventi.

Installare il pacchetto di Go

Ottenere il pacchetto Go per Hub eventi, come illustrato nell'esempio seguente.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Codice per inviare eventi a un hub eventi

Ecco il codice per inviare eventi a un hub eventi. I passaggi principali nel codice sono:

  1. Creare un client producer di Hub eventi usando un stringa di connessione allo spazio dei nomi di Hub eventi e al nome dell'hub eventi.
  2. Creare un oggetto batch e aggiungere eventi di esempio al batch.
  3. Inviare il batch di eventi agli eventi th.

Importante

Sostituire NAMESPACE CONNECTION STRING con il stringa di connessione allo spazio dei nomi di Hub eventi e EVENT HUB NAME con il nome dell'hub eventi nel codice di esempio.

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {

    // create an Event Hubs producer client using a connection string to the namespace and the event hub
    producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

    if err != nil {
        panic(err)
    }

    defer producerClient.Close(context.TODO())

    // create sample events
    events := createEventsForSample()

    // create a batch object and add sample events to the batch
    newBatchOptions := &azeventhubs.EventDataBatchOptions{}

    batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

    for i := 0; i < len(events); i++ {
        err = batch.AddEventData(events[i], nil)
    }

    // send the batch of events to the event hub
    producerClient.SendEventDataBatch(context.TODO(), batch, nil)
}

func createEventsForSample() []*azeventhubs.EventData {
    return []*azeventhubs.EventData{
        {
            Body: []byte("hello"),
        },
        {
            Body: []byte("world"),
        },
    }
}

Non eseguire ancora l'applicazione. È prima necessario eseguire l'app ricevitore e quindi l'app mittente.

Ricevere eventi

Creare un account di archiviazione e un contenitore

Le informazioni sullo stato, come i lease sulle partizioni e i checkpoint nel flusso di eventi, vengono condivise tra i ricevitori tramite un contenitore di Archiviazione di Azure. È possibile creare un account di archiviazione e un contenitore usando l'SDK per Go, ma anche seguendo le istruzioni riportate in Informazioni sugli account di archiviazione di Azure.

Seguire questi consigli quando si usa Archiviazione BLOB di Azure come archivio checkpoint:

  • Usare un contenitore separato per ogni gruppo di consumer. È possibile usare lo stesso account di archiviazione, ma usare un contenitore per ogni gruppo.
  • Non usare il contenitore per altri elementi e non usare l'account di archiviazione per altri elementi.
  • Archiviazione account deve trovarsi nella stessa area in cui si trova l'applicazione distribuita. Se l'applicazione è locale, provare a scegliere l'area più vicina possibile.

Nella pagina Archiviazione account della portale di Azure, nella sezione Servizio BLOB verificare che le impostazioni seguenti siano disabilitate.

  • Spazio dei nomi gerarchico
  • Eliminazione temporanea dei BLOB
  • Controllo delle versioni

Pacchetti Go

Per ricevere i messaggi, ottenere i pacchetti Go per Hub eventi, come illustrato nell'esempio seguente.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Codice per ricevere eventi da un hub eventi

Ecco il codice per ricevere eventi da un hub eventi. I passaggi principali nel codice sono:

  1. Controllare un oggetto archivio checkpoint che rappresenta il Archiviazione BLOB di Azure usato dall'hub eventi per il checkpoint.
  2. Creare un client consumer di Hub eventi usando un stringa di connessione allo spazio dei nomi di Hub eventi e al nome dell'hub eventi.
  3. Creare un processore di eventi usando l'oggetto client e l'oggetto archivio checkpoint. Il processore riceve ed elabora gli eventi.
  4. Per ogni partizione nell'hub eventi, creare un client di partizione con processEvents come funzione per elaborare gli eventi.
  5. Eseguire tutti i client di partizione per ricevere ed elaborare gli eventi.

Importante

Sostituire i valori segnaposto seguenti con i valori effettivi:

  • AZURE STORAGE CONNECTION STRINGcon il stringa di connessione per l'account di archiviazione di Azure
  • BLOB CONTAINER NAME con il nome del contenitore BLOB creato nell'account di archiviazione
  • NAMESPACE CONNECTION STRINGcon il stringa di connessione per lo spazio dei nomi di Hub eventi
  • EVENT HUB NAME con il nome dell'hub eventi nel codice di esempio.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
)

func main() {

    // create a container client using a connection string and container name
    checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
    
    // create a checkpoint store that will be used by the event hub
    checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

    if err != nil {
        panic(err)
    }

    // create a consumer client using a connection string to the namespace and the event hub
    consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

    if err != nil {
        panic(err)
    }

    defer consumerClient.Close(context.TODO())

    // create a processor to receive and process events
    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

    if err != nil {
        panic(err)
    }

    //  for each partition in the event hub, create a partition client with processEvents as the function to process events
    dispatchPartitionClients := func() {
        for {
            partitionClient := processor.NextPartitionClient(context.TODO())

            if partitionClient == nil {
                break
            }

            go func() {
                if err := processEvents(partitionClient); err != nil {
                    panic(err)
                }
            }()
        }
    }

    // run all partition clients
    go dispatchPartitionClients()

    processorCtx, processorCancel := context.WithCancel(context.TODO())
    defer processorCancel()

    if err := processor.Run(processorCtx); err != nil {
        panic(err)
    }
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
    defer closePartitionResources(partitionClient)
    for {
        receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
        events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
        receiveCtxCancel()

        if err != nil && !errors.Is(err, context.DeadlineExceeded) {
            return err
        }

        fmt.Printf("Processing %d event(s)\n", len(events))

        for _, event := range events {
            fmt.Printf("Event received with body %v\n", string(event.Body))
        }

        if len(events) != 0 {
            if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
                return err
            }
        }
    }
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
    defer partitionClient.Close(context.TODO())
}

Eseguire app ricevitore e mittente

  1. Eseguire prima l'app ricevitore.

  2. Eseguire l'app mittente.

  3. Attendere un minuto per visualizzare l'output seguente nella finestra del ricevitore.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Passaggi successivi

Vedere gli esempi in GitHub all'indirizzo https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.