在 Azure 服務匯流排佇列 (Go) 中傳送和接收訊息

在本教學課程中,您將瞭解如何使用 Go 程式設計語言來傳送及接收 Azure 服務匯流排佇列的訊息。

Azure 服務匯流排是完全受控管的企業訊息代理程式,具有訊息佇列和發佈/訂閱功能。 服務匯流排可用來將應用程式和服務彼此分離,以提供分散、可靠且高效能的訊息傳輸。

Azure SDK for Go 的 azservicebus 套件可讓您您使用 Go 程式設計語言來傳送及接收 Azure 服務匯流排佇列的訊息。

在本教學課程結束時,您將能夠:將單一訊息或訊息批次傳送至佇列、接收訊息,以及將未處理的訊息歸類為寄不出的訊息。

必要條件

建立範例應用程式

若要開始,請建立新的 Go 模組。

  1. 為名為 service-bus-go-how-to-use-queues 的模組建立新的目錄。

  2. azservicebus 目錄中,初始化模組並安裝必要的套件。

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. 建立名為 main.go 的新檔案。

驗證並建立用戶端

在檔案 main.go 中,建立名為 GetClient 的新函式,並新增下列程式碼:

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

GetClient 函式會傳回使用 Azure 服務匯流排命名空間和認證所建立的新 azservicebus.Client 物件。 命名空間是由 AZURE_SERVICEBUS_HOSTNAME 環境變數提供。 且認證是使用函式 azidentity.NewDefaultAzureCredential 來建立。

若為本機開發,DefaultAzureCredential 會使用來自 Azure CLI 的存取權杖,其可藉由執行 az login 命令向 Azure 進行驗證來建立。

提示

若要使用連接字串進行驗證,請使用 NewClientFromConnectionString 函式。

傳送訊息至佇列

在檔案 main.go 中,建立名為 SendMessage 的新函式,並新增下列程式碼:

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage 接受兩個參數:訊息字串和 azservicebus.Client 物件。 然後,它會建立新的 azservicebus.Sender 物件,並將訊息傳送至佇列。 若要傳送大量訊息,請將函式 SendMessageBatch 新增至您的 main.go 檔案。

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch 會採用兩個參數:一則訊息配量和一個 azservicebus.Client 物件。 然後,它會建立新的 azservicebus.Sender 物件,並將訊息傳送至佇列。

從佇列接收訊息

您將訊息傳送至佇列之後,就可以使用 azservicebus.Receiver 類型來接收訊息。 若要從佇列接收訊息,請將函式 GetMessage 新增至您的 main.go 檔案。

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessage 會接受 azservicebus.Client 物件並建立新的 azservicebus.Receiver 物件。 然後,它會從佇列接收訊息。 函式 Receiver.ReceiveMessages 會採用兩個參數:內容和要接收的訊息數目。 函式 Receiver.ReceiveMessages 會傳回物件 azservicebus.ReceivedMessage 的配量。

接下來,for 迴圈會逐一查看訊息並列印訊息本文。 然後系統會呼叫函式 CompleteMessage 以完成訊息,並從佇列中移除它。

超過長度限制、傳送至無效佇列或未成功處理的訊息可以傳送至無效信件佇列。 若要將訊息傳送至無效信件佇列,請將函式 SendDeadLetterMessage 新增至您的 main.go 檔案。

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessage 會採用 azservicebus.Client 物件和 azservicebus.ReceivedMessage 物件。 然後,它會將訊息傳送至無效信件佇列。 函式會採用兩個參數:內容和 azservicebus.DeadLetterOptions 物件。 如果訊息無法傳送至無效信件佇列,此 Receiver.DeadLetterMessage 函式會傳回錯誤。

若要將訊息傳送至無效信件佇列,請將函式 ReceiveDeadLetterMessage 新增至您的 main.go 檔案。

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessage 會採用 azservicebus.Client 物件,並使用無效信件佇列的選項來建立新的 azservicebus.Receiver 物件。 然後,它會從無效信件佇列接收訊息。 函式接著會從無效信件佇列接收一則訊息。 然後,它會列印信件無效的原因及該訊息的描述。

範例指令碼

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

執行程式碼

執行程式碼之前,請先建立名稱為 AZURE_SERVICEBUS_HOSTNAME 的環境變數。 將環境變數的值設定為服務匯流排命名空間。

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

接下來,執行下列 go run 命令來執行應用程式:

go run main.go

下一步

如需詳細資訊,請查看下面連結: