다음을 통해 공유


빠른 시작: Go를 사용하여 Event Hubs에 이벤트 보내기 또는 받기

Azure Event Hubs는 초당 수백만 개의 이벤트를 수신하여 처리할 수 있는 빅 데이터 스트리밍 플랫폼이자 이벤트 수집 서비스입니다. Event Hubs는 분산된 소프트웨어와 디바이스에서 생성된 이벤트, 데이터 또는 원격 분석을 처리하고 저장할 수 있습니다. Event Hub로 전송된 데이터는 실시간 분석 공급자 또는 일괄 처리/스토리지 어댑터를 사용하여 변환하고 저장할 수 있습니다. Event Hubs에 대한 자세한 개요는 Event Hubs 개요Event Hubs 기능을 참조하세요.

이 빠른 시작에서는 이벤트 허브에서 이벤트를 보내거나 받는 Go 애플리케이션을 작성하는 방법을 설명합니다.

비고

이 빠른 시작은 GitHub의 https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs샘플을 기반으로 하며, 이벤트 보내기 섹션은 example_producing_events_test.go 샘플을 기반으로 하며 수신 섹션은 example_processor_test.go 샘플을 기반으로 합니다. 빠른 시작에 대한 코드가 간소화되고 모든 자세한 주석이 제거되므로 샘플에서 자세한 내용과 설명을 확인하세요.

필수 조건

이 빠른 시작을 완료하려면 다음 필수 구성 요소가 필요합니다.

  • Go가 로컬에 설치되었습니다. 필요한 경우 다음 지침을 따릅니다.
  • 활성 Azure 계정입니다. Azure 구독이 없는 경우, 시작하기 전에 무료 계정을 만드십시오.
  • Event Hubs 네임스페이스 및 이벤트 허브 만들기 Azure Portal을 사용하여 Event Hubs 유형의 네임스페이스를 만들고 애플리케이션이 이벤트 허브와 통신하는 데 필요한 관리 자격 증명을 가져옵니다. 네임스페이스 및 이벤트 허브를 만들려면 이 문서의 절차를 따릅니다.

이벤트 보내기

이 섹션에서는 이벤트 허브에 이벤트를 보내는 Go 애플리케이션을 만드는 방법을 보여 줍니다.

Go 패키지 설치

다음 예제와 같이 Event Hubs용 Go 패키지를 가져옵니다.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

이벤트 허브에 이벤트를 보내는 코드

이벤트 허브에 이벤트를 보내는 코드는 다음과 같습니다. 코드의 주요 단계는 다음과 같습니다.

  1. Event Hubs 네임스페이스 및 이벤트 허브 이름에 대한 연결 문자열을 사용하여 Event Hubs 생산자 클라이언트를 만듭니다.
  2. 일괄 처리 개체를 만들고 일괄 처리에 샘플 이벤트를 추가합니다.
  3. 이벤트로 이벤트 일괄 처리를 보냅니다.

중요합니다

샘플 코드에서 NAMESPACE CONNECTION STRING를 Event Hubs 네임스페이스에 대한 연결 문자열로, EVENT HUB NAME를 이벤트 허브 이름으로 바꾸세요.

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// create an Event Hubs producer client using a connection string to the namespace and the event hub
	producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	// create sample events
	events := createEventsForSample()

	// create a batch object and add sample events to the batch
	newBatchOptions := &azeventhubs.EventDataBatchOptions{}

	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	if err != nil {
		panic(err)
	}

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)

		if err != nil {
			panic(err)
		}
	}

	// send the batch of events to the event hub
	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}

애플리케이션을 아직 실행하지 마세요. 먼저 수신기 앱을 실행한 다음 보낸 사람 앱을 실행해야 합니다.

이벤트 수신

Storage 계정 및 컨테이너 만들기

파티션의 임대 및 이벤트의 검사점과 같은 상태는 Azure Storage 컨테이너를 사용하여 수신기 간에 공유됩니다. Go SDK를 사용하여 스토리지 계정 및 컨테이너를 만들 수 있지만 Azure Storage 계정 정보 지침에 따라 만들 수도 있습니다.

검사점 저장소로 Azure Blob Storage를 사용하는 경우 다음 권장 사항을 따릅니다.

  • 각 소비자 그룹에 대해 별도의 컨테이너를 사용합니다. 동일한 스토리지 계정을 사용할 수 있지만 각 그룹당 하나의 컨테이너를 사용합니다.
  • 스토리지 계정을 다른 용도로 사용하지 마세요.
  • 컨테이너를 다른 용도로 사용하지 마세요.
  • 배포된 애플리케이션과 동일한 지역에 스토리지 계정을 만듭니다. 애플리케이션이 온-프레미스인 경우 가능한 가장 가까운 지역을 선택해 보세요.

Azure Portal에서 Storage 계정 페이지의 Blob service 섹션에서 다음 설정을 사용하지 않도록 설정해야 합니다.

  • 계층 구조 네임스페이스
  • Blob 일시 삭제
  • 버전 관리

Go 패키지

메시지를 받으려면 다음 예제와 같이 Event Hubs용 Go 패키지를 가져옵니다.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob

이벤트 허브에서 이벤트를 수신하는 코드

이벤트 허브에서 이벤트를 수신하는 코드는 다음과 같습니다. 코드의 주요 단계는 다음과 같습니다.

  1. 검사점을 지정하기 위해 이벤트 허브에서 사용하는 Azure Blob Storage를 나타내는 검사점 저장소 개체를 확인합니다.
  2. Event Hubs 네임스페이스 및 이벤트 허브 이름에 대한 연결 문자열을 사용하여 Event Hubs 소비자 클라이언트를 만듭니다.
  3. 클라이언트 개체와 검사점 저장소 개체를 사용하여 이벤트 프로세서를 만듭니다. 프로세서는 이벤트를 수신하고 처리합니다.
  4. 이벤트 허브의 각 파티션에 대해 processEvents를 이벤트를 처리하는 함수로 사용하여 파티션 클라이언트를 만듭니다.
  5. 모든 파티션 클라이언트를 실행하여 이벤트를 수신하고 처리합니다.

중요합니다

다음 자리 표시자 값을 실제 값으로 바꿉다.

  • AZURE STORAGE CONNECTION STRING을 Azure 스토리지 계정의 연결 문자열로 바꿉니다.
  • BLOB CONTAINER NAME을 스토리지 계정에서 만든 Blob 컨테이너의 이름으로 바꿉니다.
  • NAMESPACE CONNECTION STRING을 Event Hubs 네임스페이스의 연결 문자열로 바꿉니다.
  • EVENT HUB NAME 샘플 코드의 이벤트 허브 이름을 사용합니다.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {

	// create a container client using a connection string and container name
	checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)

	if err != nil {
		panic(err)
	}

	// create a checkpoint store that will be used by the event hub
	checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

	if err != nil {
		panic(err)
	}

	// create a consumer client using a connection string to the namespace and the event hub
	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// create a processor to receive and process events
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		panic(err)
	}

	//  for each partition in the event hub, create a partition client with processEvents as the function to process events
	dispatchPartitionClients := func() {
		for {
			partitionClient := processor.NextPartitionClient(context.TODO())

			if partitionClient == nil {
				break
			}

			go func() {
				if err := processEvents(partitionClient); err != nil {
					panic(err)
				}
			}()
		}
	}

	// run all partition clients
	go dispatchPartitionClients()

	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	if err := processor.Run(processorCtx); err != nil {
		panic(err)
	}
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	defer closePartitionResources(partitionClient)
	for {
		receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		receiveCtxCancel()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			return err
		}

		fmt.Printf("Processing %d event(s)\n", len(events))

		for _, event := range events {
			fmt.Printf("Event received with body %v\n", string(event.Body))
		}

		if len(events) != 0 {
			if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
				return err
			}
		}
	}
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	defer partitionClient.Close(context.TODO())
}

수신기 및 보낸 사람 앱 실행

  1. 수신기 앱을 먼저 실행합니다.

  2. 보낸 사람 앱을 실행합니다.

  3. 수신기 창에서 다음 출력을 보려면 1분 동안 기다립니다.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

다음 단계

의 GitHub에서 https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs샘플을 참조하세요.