Compartir a través de


Inicio rápido: Envío o recepción de eventos desde Event Hubs mediante Go

Azure Event Hubs es una plataforma de streaming de macrodatos y servicio de ingesta de eventos de gran escalabilidad capaz de recibir y procesar millones de eventos por segundo. Event Hubs puede procesar y almacenar eventos, datos o telemetría generados por dispositivos y software distribuido. Los datos enviados a un centro de eventos se pueden transformar y almacenar con cualquier proveedor de análisis en tiempo real o adaptadores de procesamiento por lotes y almacenamiento. Para más información sobre Event Hubs, consulte Introducción a Event Hubs y Características de Event Hubs.

En este inicio rápido se describe cómo escribir aplicaciones de Go para enviar eventos a un centro de eventos o recibir eventos desde un centro de eventos.

Nota:

Este inicio rápido se basa en ejemplos de GitHub en https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. La sección de eventos de envío se basa en el ejemplo example_producing_events_test.go y la recepción se basa en el ejemplo example_processor_test.go . El código se simplifica para el inicio rápido y se quitan todos los comentarios detallados, así que examine los ejemplos para obtener más detalles y explicaciones.

Prerrequisitos

Para completar este inicio rápido, necesita los siguientes requisitos previos:

  • Go instalado de forma local. Siga estas instrucciones si es necesario.
  • Una cuenta de Azure activa. Si no tiene una suscripción a Azure, cree una cuenta gratuita antes de empezar.
  • Creación de un espacio de nombres de Event Hubs y un centro de eventos. Use Azure Portal para crear un espacio de nombres de tipo Event Hubs y obtener las credenciales de administración que la aplicación necesita para comunicarse con el centro de eventos. Para crear un espacio de nombres y un centro de eventos, siga el procedimiento que se indica en este artículo.

Envío de eventos

En esta sección se muestra cómo crear una aplicación go para enviar eventos a un centro de eventos.

Instalación del paquete de Go

Obtenga el paquete Go para Event Hubs, como se muestra en el ejemplo siguiente.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Código para enviar eventos a un centro de eventos

Este es el código para enviar eventos a un centro de eventos. Los pasos principales del código son:

  1. Cree un cliente productor de Event Hubs utilizando una cadena de conexión para el namespace de Event Hubs y el nombre del centro de eventos.
  2. Cree un objeto por lotes y agregue eventos de ejemplo al lote.
  3. Enviar el lote de eventos a los eventos.

Importante

Reemplace por NAMESPACE CONNECTION STRING la cadena de conexión al espacio de nombres de Event Hubs y EVENT HUB NAME por el nombre del centro de eventos en el código de ejemplo.

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// create an Event Hubs producer client using a connection string to the namespace and the event hub
	producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	// create sample events
	events := createEventsForSample()

	// create a batch object and add sample events to the batch
	newBatchOptions := &azeventhubs.EventDataBatchOptions{}

	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	if err != nil {
		panic(err)
	}

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)

		if err != nil {
			panic(err)
		}
	}

	// send the batch of events to the event hub
	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}

Aún no ejecute la aplicación. Primero debe ejecutar la aplicación receptora y, a continuación, la aplicación remitente.

Recepción de eventos

Creación de una cuenta de almacenamiento y un contenedor

Los estados como, por ejemplo, las concesiones sobre particiones y puntos de comprobación de los eventos se comparten entre receptores mediante un contenedor de Azure Storage. Puede crear una cuenta de almacenamiento y un contenedor con el SDK de Go, pero también puede crear una siguiendo las instrucciones de Acerca de las cuentas de almacenamiento de Azure.

Siga estas recomendaciones al usar Azure Blob Storage como almacén de puntos de control:

  • Use un contenedor independiente para cada grupo de consumidores. Puede usar la misma cuenta de almacenamiento, pero usar un contenedor por cada grupo.
  • No use la cuenta de almacenamiento para nada más.
  • No use el contenedor para nada más.
  • Cree la cuenta de almacenamiento en la misma región que la aplicación implementada. Si la aplicación es local, intente elegir la región más cercana posible.

En la página Cuenta de almacenamiento de Azure Portal, en la sección Blob service, asegúrese de que la siguiente configuración está deshabilitada.

  • Espacio de nombres jerárquico
  • Eliminación temporal de blobs
  • Versionamiento

Paquetes de Go

Para recibir los mensajes, obtenga los paquetes de Go para Event Hubs, como se muestra en el ejemplo siguiente.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob

Código para recibir eventos de un centro de eventos

Este es el código para recibir eventos de un centro de eventos. Los pasos principales del código son:

  1. Compruebe un objeto de almacén de puntos de control que represente el almacenamiento de blobs de Azure usado por el centro de eventos para la creación de puntos de control.
  2. Crear un cliente consumidor de Event Hubs mediante una cadena de conexión al espacio de nombres de Event Hubs y el nombre del centro de eventos.
  3. Cree un procesador de eventos mediante el objeto de cliente y el objeto de almacén de puntos de control. El procesador recibe y procesa eventos.
  4. Para cada partición del centro de eventos, cree un cliente de partición con processEvents como función para procesar eventos.
  5. Ejecute todos los clientes de partición para recibir y procesar eventos.

Importante

Reemplace los siguientes valores de marcador de posición por los valores reales:

  • AZURE STORAGE CONNECTION STRING con la cadena de conexión de la cuenta de Azure Storage
  • BLOB CONTAINER NAME por el nombre del contenedor de blobs que creó en la cuenta de almacenamiento.
  • NAMESPACE CONNECTION STRING con la cadena de conexión del espacio de nombres de Event Hubs
  • EVENT HUB NAME con el nombre del centro de eventos en el código de ejemplo.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {

	// create a container client using a connection string and container name
	checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)

	if err != nil {
		panic(err)
	}

	// create a checkpoint store that will be used by the event hub
	checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

	if err != nil {
		panic(err)
	}

	// create a consumer client using a connection string to the namespace and the event hub
	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// create a processor to receive and process events
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		panic(err)
	}

	//  for each partition in the event hub, create a partition client with processEvents as the function to process events
	dispatchPartitionClients := func() {
		for {
			partitionClient := processor.NextPartitionClient(context.TODO())

			if partitionClient == nil {
				break
			}

			go func() {
				if err := processEvents(partitionClient); err != nil {
					panic(err)
				}
			}()
		}
	}

	// run all partition clients
	go dispatchPartitionClients()

	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	if err := processor.Run(processorCtx); err != nil {
		panic(err)
	}
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	defer closePartitionResources(partitionClient)
	for {
		receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		receiveCtxCancel()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			return err
		}

		fmt.Printf("Processing %d event(s)\n", len(events))

		for _, event := range events {
			fmt.Printf("Event received with body %v\n", string(event.Body))
		}

		if len(events) != 0 {
			if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
				return err
			}
		}
	}
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	defer partitionClient.Close(context.TODO())
}

Ejecución de aplicaciones receptoras y remitentes

  1. Ejecute primero la aplicación receptora.

  2. Ejecute la aplicación remitente.

  3. Espere un minuto para ver la salida siguiente en la ventana del receptor.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Pasos siguientes

Consulte ejemplos en GitHub en https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.