Azure Event Hubs는 초당 수백만 개의 이벤트를 수신하여 처리할 수 있는 빅 데이터 스트리밍 플랫폼이자 이벤트 수집 서비스입니다. Event Hubs는 분산된 소프트웨어와 디바이스에서 생성된 이벤트, 데이터 또는 원격 분석을 처리하고 저장할 수 있습니다. Event Hub로 전송된 데이터는 실시간 분석 공급자 또는 일괄 처리/스토리지 어댑터를 사용하여 변환하고 저장할 수 있습니다. Event Hubs에 대한 자세한 개요는 Event Hubs 개요 및 Event Hubs 기능을 참조하세요.
이 빠른 시작에서는 이벤트 허브에서 이벤트를 보내거나 받는 Go 애플리케이션을 작성하는 방법을 설명합니다.
비고
이 빠른 시작은 GitHub의 https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs샘플을 기반으로 하며, 이벤트 보내기 섹션은 example_producing_events_test.go 샘플을 기반으로 하며 수신 섹션은 example_processor_test.go 샘플을 기반으로 합니다. 빠른 시작에 대한 코드가 간소화되고 모든 자세한 주석이 제거되므로 샘플에서 자세한 내용과 설명을 확인하세요.
필수 조건
이 빠른 시작을 완료하려면 다음 필수 구성 요소가 필요합니다.
- Go가 로컬에 설치되었습니다. 필요한 경우 다음 지침을 따릅니다.
- 활성 Azure 계정입니다. Azure 구독이 없는 경우, 시작하기 전에 무료 계정을 만드십시오.
- Event Hubs 네임스페이스 및 이벤트 허브 만들기 Azure Portal을 사용하여 Event Hubs 유형의 네임스페이스를 만들고 애플리케이션이 이벤트 허브와 통신하는 데 필요한 관리 자격 증명을 가져옵니다. 네임스페이스 및 이벤트 허브를 만들려면 이 문서의 절차를 따릅니다.
이벤트 보내기
이 섹션에서는 이벤트 허브에 이벤트를 보내는 Go 애플리케이션을 만드는 방법을 보여 줍니다.
Go 패키지 설치
다음 예제와 같이 Event Hubs용 Go 패키지를 가져옵니다.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
이벤트 허브에 이벤트를 보내는 코드
이벤트 허브에 이벤트를 보내는 코드는 다음과 같습니다. 코드의 주요 단계는 다음과 같습니다.
- Event Hubs 네임스페이스 및 이벤트 허브 이름에 대한 연결 문자열을 사용하여 Event Hubs 생산자 클라이언트를 만듭니다.
- 일괄 처리 개체를 만들고 일괄 처리에 샘플 이벤트를 추가합니다.
- 이벤트로 이벤트 일괄 처리를 보냅니다.
중요합니다
샘플 코드에서 NAMESPACE CONNECTION STRING
를 Event Hubs 네임스페이스에 대한 연결 문자열로, EVENT HUB NAME
를 이벤트 허브 이름으로 바꾸세요.
package main
import (
"context"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)
func main() {
// create an Event Hubs producer client using a connection string to the namespace and the event hub
producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)
if err != nil {
panic(err)
}
defer producerClient.Close(context.TODO())
// create sample events
events := createEventsForSample()
// create a batch object and add sample events to the batch
newBatchOptions := &azeventhubs.EventDataBatchOptions{}
batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)
if err != nil {
panic(err)
}
for i := 0; i < len(events); i++ {
err = batch.AddEventData(events[i], nil)
if err != nil {
panic(err)
}
}
// send the batch of events to the event hub
err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)
if err != nil {
panic(err)
}
}
func createEventsForSample() []*azeventhubs.EventData {
return []*azeventhubs.EventData{
{
Body: []byte("hello"),
},
{
Body: []byte("world"),
},
}
}
애플리케이션을 아직 실행하지 마세요. 먼저 수신기 앱을 실행한 다음 보낸 사람 앱을 실행해야 합니다.
이벤트 수신
Storage 계정 및 컨테이너 만들기
파티션의 임대 및 이벤트의 검사점과 같은 상태는 Azure Storage 컨테이너를 사용하여 수신기 간에 공유됩니다. Go SDK를 사용하여 스토리지 계정 및 컨테이너를 만들 수 있지만 Azure Storage 계정 정보 지침에 따라 만들 수도 있습니다.
검사점 저장소로 Azure Blob Storage를 사용하는 경우 다음 권장 사항을 따릅니다.
- 각 소비자 그룹에 대해 별도의 컨테이너를 사용합니다. 동일한 스토리지 계정을 사용할 수 있지만 각 그룹당 하나의 컨테이너를 사용합니다.
- 스토리지 계정을 다른 용도로 사용하지 마세요.
- 컨테이너를 다른 용도로 사용하지 마세요.
- 배포된 애플리케이션과 동일한 지역에 스토리지 계정을 만듭니다. 애플리케이션이 온-프레미스인 경우 가능한 가장 가까운 지역을 선택해 보세요.
Azure Portal에서 Storage 계정 페이지의 Blob service 섹션에서 다음 설정을 사용하지 않도록 설정해야 합니다.
- 계층 구조 네임스페이스
- Blob 일시 삭제
- 버전 관리
Go 패키지
메시지를 받으려면 다음 예제와 같이 Event Hubs용 Go 패키지를 가져옵니다.
go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob
이벤트 허브에서 이벤트를 수신하는 코드
이벤트 허브에서 이벤트를 수신하는 코드는 다음과 같습니다. 코드의 주요 단계는 다음과 같습니다.
- 검사점을 지정하기 위해 이벤트 허브에서 사용하는 Azure Blob Storage를 나타내는 검사점 저장소 개체를 확인합니다.
- Event Hubs 네임스페이스 및 이벤트 허브 이름에 대한 연결 문자열을 사용하여 Event Hubs 소비자 클라이언트를 만듭니다.
- 클라이언트 개체와 검사점 저장소 개체를 사용하여 이벤트 프로세서를 만듭니다. 프로세서는 이벤트를 수신하고 처리합니다.
- 이벤트 허브의 각 파티션에 대해 processEvents를 이벤트를 처리하는 함수로 사용하여 파티션 클라이언트를 만듭니다.
- 모든 파티션 클라이언트를 실행하여 이벤트를 수신하고 처리합니다.
중요합니다
다음 자리 표시자 값을 실제 값으로 바꿉다.
-
AZURE STORAGE CONNECTION STRING
을 Azure 스토리지 계정의 연결 문자열로 바꿉니다. -
BLOB CONTAINER NAME
을 스토리지 계정에서 만든 Blob 컨테이너의 이름으로 바꿉니다. -
NAMESPACE CONNECTION STRING
을 Event Hubs 네임스페이스의 연결 문자열로 바꿉니다. -
EVENT HUB NAME
샘플 코드의 이벤트 허브 이름을 사용합니다.
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)
func main() {
// create a container client using a connection string and container name
checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
if err != nil {
panic(err)
}
// create a checkpoint store that will be used by the event hub
checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
if err != nil {
panic(err)
}
// create a consumer client using a connection string to the namespace and the event hub
consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)
if err != nil {
panic(err)
}
defer consumerClient.Close(context.TODO())
// create a processor to receive and process events
processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if err != nil {
panic(err)
}
// for each partition in the event hub, create a partition client with processEvents as the function to process events
dispatchPartitionClients := func() {
for {
partitionClient := processor.NextPartitionClient(context.TODO())
if partitionClient == nil {
break
}
go func() {
if err := processEvents(partitionClient); err != nil {
panic(err)
}
}()
}
}
// run all partition clients
go dispatchPartitionClients()
processorCtx, processorCancel := context.WithCancel(context.TODO())
defer processorCancel()
if err := processor.Run(processorCtx); err != nil {
panic(err)
}
}
func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
defer closePartitionResources(partitionClient)
for {
receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
receiveCtxCancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
fmt.Printf("Processing %d event(s)\n", len(events))
for _, event := range events {
fmt.Printf("Event received with body %v\n", string(event.Body))
}
if len(events) != 0 {
if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
return err
}
}
}
}
func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
defer partitionClient.Close(context.TODO())
}
수신기 및 보낸 사람 앱 실행
수신기 앱을 먼저 실행합니다.
보낸 사람 앱을 실행합니다.
수신기 창에서 다음 출력을 보려면 1분 동안 기다립니다.
Processing 2 event(s) Event received with body hello Event received with body world
다음 단계
의 GitHub에서 https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs샘플을 참조하세요.