Inicio rápido: Envío o recepción de eventos en Event Hubs mediante Go
Azure Event Hubs es una plataforma de streaming de macrodatos y servicio de ingesta de eventos de gran escalabilidad capaz de recibir y procesar millones de eventos por segundo. Event Hubs puede procesar y almacenar eventos, datos o telemetría generados por dispositivos y software distribuido. Los datos enviados a un centro de eventos se pueden transformar y almacenar con cualquier proveedor de análisis en tiempo real o adaptadores de procesamiento por lotes y almacenamiento. Para más información sobre Event Hubs, consulte Introducción a Event Hubs y Características de Event Hubs.
En este inicio rápido se describe cómo escribir aplicaciones de Go para enviar eventos a un centro de eventos o recibirlos de este.
Nota
Este inicio rápido se basa en ejemplos de GitHub en https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. La sección de envío de eventos se basa en el ejemplo example_producing_events_test.go, mientras que la sección de recepción se basa en el ejemplo example_processor_test.go. El código se ha simplificado para el inicio rápido y se han quitado todos los comentarios detallados, así que examine los ejemplos para obtener más detalles y explicaciones.
Requisitos previos
Para completar este tutorial de inicio rápido, debe cumplir los siguientes requisitos previos:
- Go instalado de forma local. Siga estas instrucciones si es necesario.
- Una cuenta de Azure activa. Si no tiene una suscripción a Azure, cree una cuenta gratuita antes de empezar.
- Creación de un espacio de nombres de Event Hubs y un centro de eventos. Use Azure Portal para crear un espacio de nombres de tipo Event Hubs y obtener las credenciales de administración que la aplicación necesita para comunicarse con el centro de eventos. Para crear un espacio de nombres y un centro de eventos, siga el procedimiento que se indica en este artículo.
Envío de eventos
En esta sección se muestra cómo crear una aplicación de Go para enviar eventos a un centro de eventos.
Instalación del paquete Go
Obtenga el paquete de Go para Event Hubs como se muestra en el ejemplo siguiente.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Código para enviar eventos a un centro de eventos
Este es el código para enviar eventos a un centro de eventos. Los pasos principales del código son:
- Crear un cliente productor de Event Hubs mediante una cadena de conexión al espacio de nombres de Event Hubs y el nombre del centro de eventos.
- Crear un objeto por lotes y agregar eventos de ejemplo al lote.
- Enviar el lote de eventos a los eventos.
Importante
Reemplace NAMESPACE CONNECTION STRING
por la cadena de conexión al espacio de nombres de Event Hubs y EVENT HUB NAME
por el nombre del centro de eventos del código de ejemplo.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
No ejecute todavía la aplicación. Primero debe ejecutar la aplicación receptora y, luego, la aplicación remitente.
Recepción de eventos
Creación de una cuenta de almacenamiento y un contenedor
Los estados como, por ejemplo, las concesiones sobre particiones y puntos de comprobación de los eventos se comparten entre receptores mediante un contenedor de Azure Storage. Puede crear una cuenta de almacenamiento y un contenedor con el SDK para Go, pero también puede crearlos siguiendo las instrucciones de Acerca de las cuentas de almacenamiento de Azure.
Siga estas recomendaciones al usar Azure Blob Storage como almacén de puntos de control:
- Use un contenedor independiente para cada grupo de consumidores. Puede usar la misma cuenta de almacenamiento, pero usar un contenedor por cada grupo.
- No use el contenedor ni la cuenta de almacenamiento para otras actividades.
- La cuenta de almacenamiento debe estar en la misma región en la que se encuentra la aplicación implementada. Si la aplicación es local, intente elegir la región más cercana posible.
En la página Cuenta de almacenamiento de Azure Portal, en la sección Blob service, asegúrese de que la siguiente configuración está deshabilitada.
- Espacio de nombres jerárquico
- Eliminación temporal de blobs
- Control de versiones
Paquetes de Go
Para recibir los mensajes, obtenga los paquetes de Go para Event Hubs, como se muestra en el ejemplo siguiente.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
Código para recibir eventos de un centro de eventos
Este es el código para recibir eventos de un centro de eventos. Los pasos principales del código son:
- Comprobar un objeto de almacén de puntos de control que representa la instancia de Azure Blob Storage que el centro de eventos usa para crear puntos de control.
- Crear un cliente consumidor de Event Hubs mediante una cadena de conexión al espacio de nombres de Event Hubs y el nombre del centro de eventos.
- Crear un procesador de eventos mediante el objeto cliente y el objeto de almacén de puntos de control. El procesador recibe y procesa eventos.
- Para cada partición del centro de eventos, crear un cliente de partición con processEvents como función para procesar eventos.
- Ejecutar todos los clientes de partición para recibir y procesar eventos.
Importante
Reemplace los siguientes valores de marcador de posición por valores reales:
AZURE STORAGE CONNECTION STRING
por la cadena de conexión para la cuenta de almacenamiento de Azure.BLOB CONTAINER NAME
por el nombre del contenedor de blobs que creó en la cuenta de almacenamiento.NAMESPACE CONNECTION STRING
por la cadena de conexión para el espacio de nombres de Event Hubs.EVENT HUB NAME
por el nombre del centro de eventos del código de ejemplo.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
Ejecución de las aplicaciones receptora y remitente
Ejecute primero la aplicación receptora.
Ejecute la aplicación remitente.
Espere un minuto para ver la siguiente salida en la ventana receptora.
Processing 2 event(s) Event received with body hello Event received with body world
Pasos siguientes
Consulte ejemplos en GitHub en https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.