Guia de início rápido: enviar ou receber eventos dos Hubs de Eventos usando a linguagem Go
Os Hubs de Eventos do Azure são uma plataforma de streaming de Big Data e um serviço de ingestão de eventos capaz de receber e processar milhões de eventos por segundo. Os Hubs de Eventos podem processar e armazenar eventos, dados ou telemetria produzidos pelos dispositivos e software distribuídos. Os dados enviados para um Hub de Eventos podem ser transformados e armazenados usando qualquer provedor de análise em tempo real ou adaptadores de envio em lote/armazenamento. Para obter uma visão detalhada dos Hubs de Eventos, confira Visão geral de Hubs de Eventos e Recursos de Hubs de Eventos.
Este guia de início rápido descreve como escrever aplicativos Go para enviar ou receber eventos de um hub de eventos.
Observação
Este guia de início rápido é baseado em exemplos do GitHub em https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. A seção de envio de eventos é baseada no exemplo example_producing_events_test.go e a de recebimento é baseada no exemplo example_processor_test.go. O código está simplificado para o guia de início rápido e todos os comentários detalhados foram removidos, portanto, examine os exemplos para obter mais detalhes e explicações.
Pré-requisitos
Para concluir este início rápido, você precisará dos seguintes pré-requisitos:
- Go instalado Localmente. Siga estas instruções, se necessário.
- Uma conta ativa do Azure. Se você não tiver uma assinatura do Azure, crie uma conta gratuita antes de começar.
- Criar um namespace de Hubs de Eventos e um hub de eventos. Use o portal do Azure para criar um namespace do tipo Hubs de Eventos e obter as credenciais de gerenciamento que seu aplicativo precisa para se comunicar com o hub de eventos. Para criar um namespace e um hub de eventos, siga o procedimento nesse artigo.
Enviar eventos
Esta seção mostra como criar um aplicativo Go para enviar eventos para um hub de eventos.
Instalar o pacote do Go
Obtenha o pacote Go para Hubs de Eventos, como é mostrado no exemplo a seguir.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Código para enviar eventos a um hub de eventos
Veja o código para enviar eventos a um hub de eventos. As principais etapas no código são:
- Criar um cliente produtor dos Hubs de Eventos usando uma cadeia de conexão para o namespace dos Hubs de Eventos e o nome do hub de eventos.
- Criar um objeto em lote e adicionar eventos de exemplo ao lote.
- Enviar o lote de eventos para o hub de eventos.
Importante
Substitua NAMESPACE CONNECTION STRING
pela cadeia de conexão do namespace dos Hubs de Eventos e EVENT HUB NAME
pelo nome do hub de eventos no código de exemplo.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
Não execute o aplicativo ainda. Primeiro, você precisa executar o aplicativo receptor e depois o aplicativo remetente.
Receber eventos
Criar uma conta e um contêiner de armazenamento
Estados, como concessões em partições e pontos de verificação nos eventos, são compartilhado entre receptores usando um contêiner de Armazenamento do Azure. Você pode criar uma conta de armazenamento e um contêiner com o SDK Go, mas você também pode criar um seguindo as instruções em Sobre Contas de armazenamento do Azure.
Siga estas recomendações ao usar Armazenamento de Blobs do Azure como um repositório de ponto de verificação:
- Use um contêiner separado para cada grupo de consumidores. Você pode usar a mesma conta de armazenamento, mas usar um contêiner por cada grupo.
- Não use o contêiner para mais nada e não use a conta de armazenamento para mais nada.
- A conta de armazenamento deve estar na mesma região em que o aplicativo implantado está localizado. Se o aplicativo for local, tente escolher a região mais próxima possível.
Na página Conta de armazenamento do portal do Azure, na seção serviço Blob, verifique se as seguintes configurações estão desabilitadas.
- Namespace hierárquico
- Exclusão temporária de blobs
- Controle de versão
Pacotes do Go
Para receber as mensagens, obtenha os pacotes Go para Hubs de Eventos, como é mostrado no exemplo a seguir.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
Código para receber eventos de um hub de eventos
Veja o código para receber eventos de um hub de eventos. As principais etapas no código são:
- Verificar um objeto de repositório de ponto de verificação que represente o Armazenamento de Blobs do Azure usado pelo hub de eventos como um ponto de verificação.
- Criar um cliente consumidor dos Hubs de Eventos usando uma cadeia de conexão para o namespace dos Hubs de Eventos e o nome do hub de eventos.
- Criar um processador de eventos usando o objeto cliente e o objeto do repositório de ponto de verificação. O processador recebe e processa eventos.
- Para cada partição no hub de eventos, crie um cliente de partição com processEvents como a função para processar eventos.
- Executar todos os clientes de partição para receber e processar eventos.
Importante
Substitua os seguintes valores de espaço reservado por valores reais:
AZURE STORAGE CONNECTION STRING
pela cadeia de conexão da sua conta de armazenamento do AzureBLOB CONTAINER NAME
pelo nome do contêiner de blob que você criou na conta de armazenamentoNAMESPACE CONNECTION STRING
pela cadeia de conexão do seu namespace dos Hubs de EventosEVENT HUB NAME
pelo nome do hub de eventos no código de exemplo.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
Executar aplicativos de destinatário e remetente
Execute o aplicativo receptor primeiro.
Execute o aplicativo remetente.
Aguarde um minuto para ver a saída a seguir na janela do receptor.
Processing 2 event(s) Event received with body hello Event received with body world
Próximas etapas
Veja os exemplos no GitHub em https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.