共用方式為


快速入門:使用 Go 將事件傳送至事件中樞或從事件中樞接收事件

Azure 事件中樞是巨量資料串流平台和事件擷取服務,每秒可接收和處理數百萬個事件。 事件中樞可以處理及儲存分散式軟體和裝置所產生的事件、資料或遙測。 傳送至事件中樞的資料可以透過任何即時分析提供者或批次/儲存體配接器來轉換和儲存。 如需事件中樞的詳細概觀,請參閱事件中樞概觀事件中樞功能

此快速入門說明如何撰寫 Go 應用程式,以將事件傳送至事件中樞或從中接收事件。

注意

此快速入門是以 GitHub 上的範例為基礎,網址為:https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs (英文)。 傳送事件區段是以 example_producing_events_test.go 範例為基礎,而接收事件則以 example_processor_test.go 範例為基礎 此快速入門已簡化程式碼,並移除所有詳細註解,因此請查看範例以取得更多詳細資料和說明。

必要條件

若要完成本快速入門,您必須符合下列必要條件:

  • 安裝在本機的 Go。 如果有需要,請依照這些指示 \(英文\) 執行。
  • 使用中的 Azure 帳戶。 如尚未擁有 Azure 訂用帳戶,請在開始之前先建立免費帳戶
  • 建立事件中樞命名空間和事件中樞。 請使用 Azure 入口網站建立「事件中樞」類型的命名空間,然後取得您的應用程式與「事件中樞」進行通訊所需的管理認證。 若要建立命名空間和事件中樞,請依照這篇文章中的程序操作。

傳送事件

本節說明如何建立可將事件傳送至事件中樞的 Go 應用程式。

安裝 Go 套件

取得事件中樞的 Go 套件,如下列範例所示。

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

將事件傳送至事件中樞的程式碼

以下是將事件傳送至事件中樞的程式碼。 程式碼中的主要步驟包括:

  1. 使用針對事件中樞命名空間的連接字串和事件中樞名稱,建立事件中樞產生器用戶端。
  2. 建立批次物件,並將範例事件新增至批次。
  3. 將事件批次傳送到事件。

重要

將範例程式碼中的 NAMESPACE CONNECTION STRING 取代為針對您事件中樞命名空間的連接字串,並將 EVENT HUB NAME 取代為事件中樞名稱。

package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// create an Event Hubs producer client using a connection string to the namespace and the event hub
	producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	// create sample events
	events := createEventsForSample()

	// create a batch object and add sample events to the batch
	newBatchOptions := &azeventhubs.EventDataBatchOptions{}

	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	if err != nil {
		panic(err)
	}

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)

		if err != nil {
			panic(err)
		}
	}

	// send the batch of events to the event hub
	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}

請暫時不要執行應用程式。 您必須先執行接收器應用程式,然後再執行傳送器應用程式。

接收事件

建立儲存體帳戶和容器

使用 Azure 儲存體 容器,在接收者之間共用事件中的租用,例如分割區和檢查點上的租用。 您可以搭配 Go SDK 建立儲存體帳戶和容器,但您也可以依照關於 Azure 儲存體帳戶中的指示建立它們。

使用 Azure Blob 記憶體做為檢查點存放區時,請遵循這些建議:

  • 針對每個取用者群組使用不同的容器。 您可以使用相同的儲存體帳戶,但每個群組各使用一個容器。
  • 請勿將容器用於其他任何項目,也不會將儲存體帳戶用於其他任何項目。
  • 儲存體帳戶應位於與已部署應用程式所在的相同區域中。 如果應用程式是內部部署,請嘗試選擇最接近的區域。

在 Azure 入口網站的 [儲存體帳戶] 頁面上,於 [Blob 服務] 區段中,確定已停用下列設定。

  • 階層式命名空間
  • Blob 虛刪除
  • 版本控制

Go 套件

若要接收訊息,請取得適用於事件中樞的 Go 套件,如下列範例所示。

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
go get github.com/Azure/azure-sdk-for-go/sdk/storage/azblob

從事件中樞接收事件的程式碼

以下是從事件中樞接收事件的程式碼。 程式碼中的主要步驟包括:

  1. 檢查檢查點存放區物件,此物件代表事件中樞用於檢查點的 Azure Blob 儲存體。
  2. 使用針對事件中樞命名空間的連接字串與事件中樞名稱,建立事件中樞取用者用戶端。
  3. 使用用戶端物件和檢查點存放區物件建立事件處理器。 處理器會接收和處理事件。
  4. 針對事件中樞中的每個分割區,建立具有 processEvents 作為處理事件函式的分割區用戶端。
  5. 執行所有分割區用戶端來接收和處理事件。

重要

將下列預留位置值取代為實際值:

  • AZURE STORAGE CONNECTION STRING 取代為 Azure 儲存體帳戶的連接字串
  • BLOB CONTAINER NAME 取代為您在儲存體帳戶中建立的 Blob 容器名稱
  • NAMESPACE CONNECTION STRING 取代為事件中樞命名空間的連接字串
  • EVENT HUB NAME 取代為範例程式碼中的事件中樞名稱。
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {

	// create a container client using a connection string and container name
	checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)

	if err != nil {
		panic(err)
	}

	// create a checkpoint store that will be used by the event hub
	checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

	if err != nil {
		panic(err)
	}

	// create a consumer client using a connection string to the namespace and the event hub
	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// create a processor to receive and process events
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		panic(err)
	}

	//  for each partition in the event hub, create a partition client with processEvents as the function to process events
	dispatchPartitionClients := func() {
		for {
			partitionClient := processor.NextPartitionClient(context.TODO())

			if partitionClient == nil {
				break
			}

			go func() {
				if err := processEvents(partitionClient); err != nil {
					panic(err)
				}
			}()
		}
	}

	// run all partition clients
	go dispatchPartitionClients()

	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	if err := processor.Run(processorCtx); err != nil {
		panic(err)
	}
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	defer closePartitionResources(partitionClient)
	for {
		receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		receiveCtxCancel()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			return err
		}

		fmt.Printf("Processing %d event(s)\n", len(events))

		for _, event := range events {
			fmt.Printf("Event received with body %v\n", string(event.Body))
		}

		if len(events) != 0 {
			if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
				return err
			}
		}
	}
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	defer partitionClient.Close(context.TODO())
}

執行接收器和傳送器應用程式

  1. 先執行接收器應用程式。

  2. 執行傳送器應用程式。

  3. 等候一分鐘,以在接收器視窗中查看下列輸出。

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

下一步

請參閱 GitHub 上的範例,網址為:https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs (英文)。