Guia de início rápido: enviar ou receber eventos dos Hubs de Eventos usando a linguagem Go

Os Hubs de Eventos do Azure são uma plataforma de streaming de Big Data e um serviço de ingestão de eventos capaz de receber e processar milhões de eventos por segundo. Os Hubs de Eventos podem processar e armazenar eventos, dados ou telemetria produzidos pelos dispositivos e software distribuídos. Os dados enviados para um Hub de Eventos podem ser transformados e armazenados usando qualquer provedor de análise em tempo real ou adaptadores de envio em lote/armazenamento. Para obter uma visão detalhada dos Hubs de Eventos, confira Visão geral de Hubs de Eventos e Recursos de Hubs de Eventos.

Este guia de início rápido descreve como escrever aplicativos Go para enviar ou receber eventos de um hub de eventos.

Observação

Este guia de início rápido é baseado em exemplos do GitHub em https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. O enviado é baseado no exemplo example_producing_events_test.go e o recebido é baseado no exemplo example_processor_test.go. O código está simplificado para o guia de início rápido e todos os comentários detalhados foram removidos, portanto, examine os exemplos para obter mais detalhes e explicações.

Pré-requisitos

Para concluir este início rápido, você precisará dos seguintes pré-requisitos:

  • Go instalado Localmente. Siga estas instruções, se necessário.
  • Uma conta ativa do Azure. Se você não tiver uma assinatura do Azure, crie uma conta gratuita antes de começar.
  • Criar um namespace de Hubs de Eventos e um hub de eventos. Use o portal do Azure para criar um namespace do tipo Hubs de Eventos e obter as credenciais de gerenciamento que seu aplicativo precisa para se comunicar com o hub de eventos. Para criar um namespace e um hub de eventos, siga o procedimento nesse artigo.

Enviar eventos

Esta seção mostra como criar um aplicativo Go para enviar eventos para um hub de eventos.

Instalar o pacote do Go

Obtenha o pacote Go para Hubs de Eventos, como é mostrado no exemplo a seguir.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Código para enviar eventos a um hub de eventos

Veja o código para enviar eventos a um hub de eventos. As principais etapas no código são:

  1. Criar um cliente produtor dos Hubs de Eventos usando uma cadeia de conexão para o namespace dos Hubs de Eventos e o nome do hub de eventos.
  2. Criar um objeto em lote e adicionar eventos de exemplo ao lote.
  3. Enviar o lote de eventos para o hub de eventos.

Importante

Substitua NAMESPACE CONNECTION STRING pela cadeia de conexão do namespace dos Hubs de Eventos e EVENT HUB NAME pelo nome do hub de eventos no código de exemplo.

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {

    // create an Event Hubs producer client using a connection string to the namespace and the event hub
    producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

    if err != nil {
        panic(err)
    }

    defer producerClient.Close(context.TODO())

    // create sample events
    events := createEventsForSample()

    // create a batch object and add sample events to the batch
    newBatchOptions := &azeventhubs.EventDataBatchOptions{}

    batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

    for i := 0; i < len(events); i++ {
        err = batch.AddEventData(events[i], nil)
    }

    // send the batch of events to the event hub
    producerClient.SendEventDataBatch(context.TODO(), batch, nil)
}

func createEventsForSample() []*azeventhubs.EventData {
    return []*azeventhubs.EventData{
        {
            Body: []byte("hello"),
        },
        {
            Body: []byte("world"),
        },
    }
}

Não execute o aplicativo ainda. Primeiro, você precisa executar o aplicativo receptor e depois o aplicativo remetente.

Receber eventos

Criar uma conta e um contêiner de armazenamento

Afirme como as concessões em partições e pontos de verificação no evento fluxo são compartilhados entre os destinatários usando um contêiner de Armazenamento do Microsoft Azure. Você pode criar uma conta de armazenamento e um contêiner com o SDK Go, mas você também pode criar um seguindo as instruções em Sobre Contas de armazenamento do Azure.

Siga estas recomendações ao usar Armazenamento de Blobs do Azure como um repositório de ponto de verificação:

  • Use um contêiner separado para cada grupo de consumidores. Você pode usar a mesma conta de armazenamento, mas usar um contêiner por cada grupo.
  • Não use o contêiner para mais nada e não use a conta de armazenamento para mais nada.
  • A conta de armazenamento deve estar na mesma região em que o aplicativo implantado está localizado. Se o aplicativo for local, tente escolher a região mais próxima possível.

Na página Conta de armazenamento do portal do Azure, na seção serviço Blob, verifique se as seguintes configurações estão desabilitadas.

  • Namespace hierárquico
  • Exclusão temporária de blobs
  • Controle de versão

Pacotes do Go

Para receber as mensagens, obtenha os pacotes Go para Hubs de Eventos, como é mostrado no exemplo a seguir.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Código para receber eventos de um hub de eventos

Veja o código para receber eventos de um hub de eventos. As principais etapas no código são:

  1. Verificar um objeto de repositório de ponto de verificação que represente o Armazenamento de Blobs do Azure usado pelo hub de eventos como um ponto de verificação.
  2. Criar um cliente consumidor dos Hubs de Eventos usando uma cadeia de conexão para o namespace dos Hubs de Eventos e o nome do hub de eventos.
  3. Criar um processador de eventos usando o objeto cliente e o objeto do repositório de ponto de verificação. O processador recebe e processa eventos.
  4. Para cada partição no hub de eventos, crie um cliente de partição com processEvents como a função para processar eventos.
  5. Executar todos os clientes de partição para receber e processar eventos.

Importante

Substitua os seguintes valores de espaço reservado por valores reais:

  • AZURE STORAGE CONNECTION STRING pela cadeia de conexão da sua conta de armazenamento do Azure
  • BLOB CONTAINER NAME pelo nome do contêiner de blob que você criou na conta de armazenamento
  • NAMESPACE CONNECTION STRING pela cadeia de conexão do seu namespace dos Hubs de Eventos
  • EVENT HUB NAME pelo nome do hub de eventos no código de exemplo.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
)

func main() {

    // create a container client using a connection string and container name
    checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
    
    // create a checkpoint store that will be used by the event hub
    checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

    if err != nil {
        panic(err)
    }

    // create a consumer client using a connection string to the namespace and the event hub
    consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

    if err != nil {
        panic(err)
    }

    defer consumerClient.Close(context.TODO())

    // create a processor to receive and process events
    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

    if err != nil {
        panic(err)
    }

    //  for each partition in the event hub, create a partition client with processEvents as the function to process events
    dispatchPartitionClients := func() {
        for {
            partitionClient := processor.NextPartitionClient(context.TODO())

            if partitionClient == nil {
                break
            }

            go func() {
                if err := processEvents(partitionClient); err != nil {
                    panic(err)
                }
            }()
        }
    }

    // run all partition clients
    go dispatchPartitionClients()

    processorCtx, processorCancel := context.WithCancel(context.TODO())
    defer processorCancel()

    if err := processor.Run(processorCtx); err != nil {
        panic(err)
    }
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
    defer closePartitionResources(partitionClient)
    for {
        receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
        events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
        receiveCtxCancel()

        if err != nil && !errors.Is(err, context.DeadlineExceeded) {
            return err
        }

        fmt.Printf("Processing %d event(s)\n", len(events))

        for _, event := range events {
            fmt.Printf("Event received with body %v\n", string(event.Body))
        }

        if len(events) != 0 {
            if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
                return err
            }
        }
    }
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
    defer partitionClient.Close(context.TODO())
}

Executar aplicativos de destinatário e remetente

  1. Execute o aplicativo receptor primeiro.

  2. Execute o aplicativo remetente.

  3. Aguarde um minuto para ver a saída a seguir na janela do receptor.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Próximas etapas

Veja os exemplos no GitHub em https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.