Partilhar via


Guia de início rápido: enviar ou receber eventos de Hubs de Eventos usando o Go

Os Hubs de Eventos do Azure são uma plataforma de fluxo de Macrodados e um serviço de ingestão de eventos capaz de receber e processar milhões de eventos por segundo. Os Hubs de Eventos podem processar e armazenar eventos, dados ou telemetria produzidos por dispositivos e software distribuído. Os dados enviados para um hub de eventos podem ser transformados e armazenados em qualquer fornecedor de análise em tempo real ou adaptadores de armazenamento/criação de batches. Para uma descrição geral detalhada dos Hubs de Eventos, veja Descrição geral dos Hubs de Eventos e Funcionalidades dos Hubs de Eventos.

Este guia de início rápido descreve como escrever aplicativos Go para enviar ou receber eventos de um hub de eventos.

Nota

Este início rápido é baseado em exemplos no GitHub em https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. A seção de eventos de envio é baseada no exemplo example_producing_events_test.go e a de recebimento é baseada no exemplo example_processor_test.go . O código é simplificado para o início rápido e todos os comentários detalhados são removidos, então veja os exemplos para obter mais detalhes e explicações.

Pré-requisitos

Para concluir este início rápido, você precisa dos seguintes pré-requisitos:

  • Vá instalado localmente. Siga estas instruções , se necessário.
  • Uma conta ativa do Azure. Se não tiver uma subscrição do Azure, crie uma conta gratuita antes de começar.
  • Crie um namespace de Hubs de Eventos e um hub de eventos. Use o portal do Azure para criar um namespace do tipo Hubs de Eventos e obter as credenciais de gerenciamento de que seu aplicativo precisa para se comunicar com o hub de eventos. Para criar um namespace e um hub de eventos, siga o procedimento neste artigo.

Enviar eventos

Esta seção mostra como criar um aplicativo Go para enviar eventos para um hub de eventos.

Instalar o pacote Go

Obtenha o pacote Go para Hubs de Eventos, conforme mostrado no exemplo a seguir.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Código para enviar eventos para um hub de eventos

Aqui está o código para enviar eventos para um hub de eventos. As principais etapas do código são:

  1. Crie um cliente produtor de Hubs de Eventos usando uma cadeia de conexão com o namespace Hubs de Eventos e o nome do hub de eventos.
  2. Crie um objeto em lote e adicione eventos de exemplo ao lote.
  3. Envie o lote de eventos para os eventos.

Importante

Substitua NAMESPACE CONNECTION STRING pela cadeia de conexão ao namespace dos Hubs de Eventos e EVENT HUB NAME pelo nome do hub de eventos no código de exemplo.

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// create an Event Hubs producer client using a connection string to the namespace and the event hub
	producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	// create sample events
	events := createEventsForSample()

	// create a batch object and add sample events to the batch
	newBatchOptions := &azeventhubs.EventDataBatchOptions{}

	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	if err != nil {
		panic(err)
	}

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)

		if err != nil {
			panic(err)
		}
	}

	// send the batch of events to the event hub
	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}

Não execute o aplicativo ainda. Primeiro, você precisa executar o aplicativo recetor e, em seguida, o aplicativo remetente.

Receber eventos

Criar uma conta de armazenamento e um contêiner

Estado como concessões em partições e pontos de verificação nos eventos são compartilhados entre recetores usando um contêiner de Armazenamento do Azure. Você pode criar uma conta de armazenamento e um contêiner com o SDK Go, mas também pode criar um seguindo as instruções em Sobre as contas de armazenamento do Azure.

Siga estas recomendações ao usar o Armazenamento de Blobs do Azure como um armazenamento de ponto de verificação:

  • Use um contêiner separado para cada grupo de consumidores. Você pode usar a mesma conta de armazenamento, mas usar um contêiner por cada grupo.
  • Não use o contêiner para mais nada e não use a conta de armazenamento para mais nada.
  • A conta de armazenamento deve estar na mesma região em que o aplicativo implantado está localizado. Se o aplicativo for local, tente escolher a região mais próxima possível.

Na página Conta de armazenamento no portal do Azure, na seção Serviço de Blob, verifique se as configurações a seguir estão desabilitadas.

  • Espaço de nomes hierárquico
  • Eliminação de forma recuperável de blobs
  • Controlo de Versão

Pacotes Go

Para receber as mensagens, obtenha os pacotes Go para Hubs de Eventos, conforme mostrado no exemplo a seguir.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob

Código para receber eventos de um hub de eventos

Aqui está o código para receber eventos de um hub de eventos. As principais etapas do código são:

  1. Verifique um objeto de armazenamento de ponto de verificação que representa o Armazenamento de Blob do Azure usado pelo hub de eventos para ponto de verificação.
  2. Crie um cliente consumidor de Hubs de Eventos usando uma cadeia de conexão para o namespace Hubs de Eventos e o nome do hub de eventos.
  3. Crie um processador de eventos usando o objeto cliente e o objeto de armazenamento de ponto de verificação. O processador recebe e processa eventos.
  4. Para cada partição no hub de eventos, crie um cliente de partição com processEvents como a função para processar eventos.
  5. Execute todos os clientes de partição para receber e processar eventos.

Importante

Substitua os seguintes valores de espaço reservado por valores reais:

  • AZURE STORAGE CONNECTION STRING com a cadeia de conexão para sua conta de armazenamento do Azure
  • BLOB CONTAINER NAME com o nome do contêiner de blob criado na conta de armazenamento
  • NAMESPACE CONNECTION STRING com a cadeia de conexão para seu namespace de Hubs de Eventos
  • EVENT HUB NAME com o nome do hub de eventos no código de exemplo.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {

	// create a container client using a connection string and container name
	checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)

	if err != nil {
		panic(err)
	}

	// create a checkpoint store that will be used by the event hub
	checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

	if err != nil {
		panic(err)
	}

	// create a consumer client using a connection string to the namespace and the event hub
	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// create a processor to receive and process events
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		panic(err)
	}

	//  for each partition in the event hub, create a partition client with processEvents as the function to process events
	dispatchPartitionClients := func() {
		for {
			partitionClient := processor.NextPartitionClient(context.TODO())

			if partitionClient == nil {
				break
			}

			go func() {
				if err := processEvents(partitionClient); err != nil {
					panic(err)
				}
			}()
		}
	}

	// run all partition clients
	go dispatchPartitionClients()

	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	if err := processor.Run(processorCtx); err != nil {
		panic(err)
	}
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	defer closePartitionResources(partitionClient)
	for {
		receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		receiveCtxCancel()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			return err
		}

		fmt.Printf("Processing %d event(s)\n", len(events))

		for _, event := range events {
			fmt.Printf("Event received with body %v\n", string(event.Body))
		}

		if len(events) != 0 {
			if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
				return err
			}
		}
	}
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	defer partitionClient.Close(context.TODO())
}

Executar aplicativos de destinatário e remetente

  1. Execute primeiro a aplicação do recetor.

  2. Execute o aplicativo remetente.

  3. Aguarde um minuto para ver a seguinte saída na janela do recetor.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Próximos passos

Veja exemplos no GitHub em https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.