Краткое руководство. Отправка и получение событий с помощью Go в Центрах событий

Центры событий Azure — это платформа потоковой передачи больших данных и служба приема событий, принимающая и обрабатывающая миллионы событий в секунду. Центры событий могут обрабатывать и сохранять события, данные и телеметрию, созданные распределенным программным обеспечением и устройствами. Данные, отправляемые в концентратор событий, можно преобразовывать и сохранять с помощью любого поставщика аналитики в реальном времени, а также с помощью адаптеров пакетной обработки или хранения. Подробный обзор Центров событий см. в статьях Что такое Центры событий Azure? и Обзор функций Центров событий.

В этом кратком руководстве описывается, как записывать приложения Go для отправки событий в концентратор событий или получения событий из концентратора событий.

Примечание.

Это краткое руководство основано на примерах на сайте GitHub https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Отправка основывается на примере example_producing_events_test.go , а получение основано на примере example_processor_test.go . Код упрощен для краткого руководства, и все подробные комментарии удаляются, поэтому ознакомьтесь с примерами для получения дополнительных сведений и объяснений.

Необходимые компоненты

Для работы с данным руководством необходимо следующее:

  • Локально установленный Go. При необходимости выполните следующие инструкции.
  • Активная учетная запись Azure. Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.
  • Создайте пространство имен Центров событий и концентратор событий. Вы можете использовать портал Azure для создания пространства имен типа Центров событий и получать учетные данные для управления, требуемые приложению для взаимодействия с концентратором событий. Чтобы создать пространство имен и концентратор событий, выполните инструкции из этой статьи.

Отправка событий

Из этого раздела вы узнаете, как создать приложение Go, которое отправляет события в концентратор событий.

Установка пакета Go

Получите пакет Go для Центров событий, как показано в следующем примере.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Код для отправки событий в концентратор событий

Ниже приведен код для отправки событий в концентратор событий. Основными этапами кода являются:

  1. Создайте клиент производителя Центров событий с помощью строка подключения в пространство имен Центров событий и имя концентратора событий.
  2. Создайте пакетный объект и добавьте примеры событий в пакет.
  3. Отправьте пакет событий в события th.

Важно!

Замените NAMESPACE CONNECTION STRING строка подключения пространством имен Центров событий и EVENT HUB NAME именем концентратора событий в примере кода.

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {

    // create an Event Hubs producer client using a connection string to the namespace and the event hub
    producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

    if err != nil {
        panic(err)
    }

    defer producerClient.Close(context.TODO())

    // create sample events
    events := createEventsForSample()

    // create a batch object and add sample events to the batch
    newBatchOptions := &azeventhubs.EventDataBatchOptions{}

    batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

    for i := 0; i < len(events); i++ {
        err = batch.AddEventData(events[i], nil)
    }

    // send the batch of events to the event hub
    producerClient.SendEventDataBatch(context.TODO(), batch, nil)
}

func createEventsForSample() []*azeventhubs.EventData {
    return []*azeventhubs.EventData{
        {
            Body: []byte("hello"),
        },
        {
            Body: []byte("world"),
        },
    }
}

Еще не запускайте приложение. Сначала необходимо запустить приложение-получатель, а затем приложение отправителя.

Получение событий

Создание учетной записи хранения и контейнера

Такие состояния, как аренда разделов и контрольные точки в потоке событий, совместно используются получателями через контейнер службы хранилища Azure. Вы можете создать учетную запись хранения и контейнер с помощью пакета SDK для Go, но их также можно создать, следуя инструкциям в статье Об учетных записях хранения Azure.

Следуйте этим рекомендациям при использовании Хранилище BLOB-объектов Azure в качестве хранилища проверка point:

  • Используйте отдельный контейнер для каждой группы потребителей. Вы можете использовать одну и ту же учетную запись хранения, но использовать один контейнер для каждой группы.
  • Не используйте контейнер для других компонентов и не используйте учетную запись хранения для других действий.
  • служба хранилища учетная запись должна находиться в том же регионе, в который находится развернутое приложение. Если приложение находится в локальной среде, попробуйте выбрать ближайший регион.

На странице учетной записи служба хранилища в портал Azure в разделе службы BLOB-объектов убедитесь, что следующие параметры отключены.

  • Иерархическое пространство имен
  • Обратимое удаление BLOB-объекта
  • Управление версиями

Пакеты Go

Чтобы получить сообщения, получите пакеты Go для Центров событий, как показано в следующем примере.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Код для получения событий из концентратора событий

Ниже приведен код для получения событий из концентратора событий. Основными этапами кода являются:

  1. Проверьте объект хранилища проверка point, представляющий Хранилище BLOB-объектов Azure, используемый центром событий для проверка назначения.
  2. Создайте клиент-получатель Центров событий с помощью строка подключения в пространство имен Центров событий и имя концентратора событий.
  3. Создайте обработчик событий с помощью клиентского объекта и объекта хранилища проверка point. Обработчик получает и обрабатывает события.
  4. Для каждой секции в концентраторе событий создайте клиент секции с processEvents в качестве функции для обработки событий.
  5. Запустите все клиенты секционирования для получения и обработки событий.

Важно!

Замените следующие значения заполнителей фактическими значениями:

  • AZURE STORAGE CONNECTION STRINGс строка подключения для учетной записи хранения Azure
  • BLOB CONTAINER NAME с именем контейнера BLOB-объектов, созданного в учетной записи хранения
  • NAMESPACE CONNECTION STRINGс строка подключения для пространства имен Центров событий
  • EVENT HUB NAME с именем концентратора событий в примере кода.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
)

func main() {

    // create a container client using a connection string and container name
    checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
    
    // create a checkpoint store that will be used by the event hub
    checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

    if err != nil {
        panic(err)
    }

    // create a consumer client using a connection string to the namespace and the event hub
    consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

    if err != nil {
        panic(err)
    }

    defer consumerClient.Close(context.TODO())

    // create a processor to receive and process events
    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

    if err != nil {
        panic(err)
    }

    //  for each partition in the event hub, create a partition client with processEvents as the function to process events
    dispatchPartitionClients := func() {
        for {
            partitionClient := processor.NextPartitionClient(context.TODO())

            if partitionClient == nil {
                break
            }

            go func() {
                if err := processEvents(partitionClient); err != nil {
                    panic(err)
                }
            }()
        }
    }

    // run all partition clients
    go dispatchPartitionClients()

    processorCtx, processorCancel := context.WithCancel(context.TODO())
    defer processorCancel()

    if err := processor.Run(processorCtx); err != nil {
        panic(err)
    }
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
    defer closePartitionResources(partitionClient)
    for {
        receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
        events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
        receiveCtxCancel()

        if err != nil && !errors.Is(err, context.DeadlineExceeded) {
            return err
        }

        fmt.Printf("Processing %d event(s)\n", len(events))

        for _, event := range events {
            fmt.Printf("Event received with body %v\n", string(event.Body))
        }

        if len(events) != 0 {
            if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
                return err
            }
        }
    }
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
    defer partitionClient.Close(context.TODO())
}

Запуск приложений получателя и отправителя

  1. Сначала запустите приложение-получатель.

  2. Запустите приложение отправителя.

  3. Подождите минуту, чтобы увидеть следующие выходные данные в окне приемника.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Следующие шаги

См. примеры на сайте GitHub https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs.