Azure Service Bus kuyruklarına ileti gönderme ve bu kuyruklardan ileti alma (Go)

Bu öğreticide Go programlama dilini kullanarak Azure Service Bus kuyruklarına ileti göndermeyi ve bu kuyruklardan ileti almayı öğreneceksiniz.

Azure Service Bus, ileti kuyrukları ve yayımlama/abone olma özelliklerine sahip tam olarak yönetilen bir kurumsal ileti aracısıdır. Service Bus, uygulamaları ve hizmetleri birbirinden ayırarak dağıtılmış, güvenilir ve yüksek performanslı bir ileti aktarımı sağlamak için kullanılır.

Go için Azure SDK'nın azservicebus paketi, Azure Service Bus ve Go programlama dilini kullanarak ileti gönderip almanıza olanak tanır.

Bu öğreticinin sonunda şunları yapabileceksiniz: kuyruğa tek bir ileti veya toplu ileti gönderme, iletileri alma ve işlenmemiş teslim edilemeyen iletiler.

Önkoşullar

Örnek uygulamayı oluşturma

Başlamak için yeni bir Go modülü oluşturun.

  1. adlı modül service-bus-go-how-to-use-queuesiçin yeni bir dizin oluşturun.

  2. dizininde azservicebus modülü başlatın ve gerekli paketleri yükleyin.

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. main.go adlı yeni bir dosya oluşturun.

Kimlik doğrulaması yapma ve istemci oluşturma

main.go dosyasında adlı GetClient yeni bir işlev oluşturun ve aşağıdaki kodu ekleyin:

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

işlevi, GetClient Azure Service Bus ad alanı ve kimlik bilgisi kullanılarak oluşturulan yeni azservicebus.Client bir nesne döndürür. Ad alanı ortam değişkeni tarafından AZURE_SERVICEBUS_HOSTNAME sağlanır. Kimlik bilgisi işlevi kullanılarak azidentity.NewDefaultAzureCredential oluşturulur.

Yerel geliştirme için Azure DefaultAzureCredential CLI'dan erişim belirteci kullanılmıştır. Bu belirteç, Azure'da kimlik doğrulaması yapmak için komutu çalıştırılarak az login oluşturulabilir.

İpucu

Bağlantı dizesiyle kimlik doğrulaması yapmak için NewClientFromConnectionString işlevini kullanın.

Kuyruğa ileti gönderme

main.go dosyasında adlı SendMessage yeni bir işlev oluşturun ve aşağıdaki kodu ekleyin:

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage iki parametre alır: bir ileti dizesi ve bir azservicebus.Client nesne. Ardından yeni azservicebus.Sender bir nesne oluşturur ve iletiyi kuyruğa gönderir. Toplu ileti göndermek için işlevini dosyanıza main.go ekleyinSendMessageBatch.

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch iki parametre alır: bir ileti dilimi ve bir azservicebus.Client nesne. Ardından yeni azservicebus.Sender bir nesne oluşturur ve iletileri kuyruğa gönderir.

Kuyruktan ileti alma

Kuyruğa ileti gönderdikten sonra, bu iletileri türüyle azservicebus.Receiver alabilirsiniz. Kuyruktan ileti almak için işlevini dosyanıza main.go ekleyinGetMessage.

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessage bir azservicebus.Client nesnesi alır ve yeni azservicebus.Receiver bir nesne oluşturur. Ardından kuyruktan iletileri alır. Receiver.ReceiveMessages İşlev iki parametre alır: bağlam ve alınacak ileti sayısı. Receiver.ReceiveMessages işlevi bir nesne dilimi azservicebus.ReceivedMessage döndürür.

Ardından, bir for döngü iletiler arasında yinelenir ve ileti gövdesini yazdırır. CompleteMessage Ardından, iletiyi kuyruktan kaldırarak tamamlamak için işlevi çağrılır.

Uzunluk sınırlarını aşan, geçersiz bir kuyruğa gönderilen veya başarıyla işlenmemiş iletiler teslim edilemeyen ileti kuyruğuna gönderilebilir. İletileri teslim edilemeyen ileti kuyruğuna göndermek için işlevi dosyanıza main.go ekleyinSendDeadLetterMessage.

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessage bir azservicebus.Client nesnesi ve bir azservicebus.ReceivedMessage nesnesi alır. Ardından iletiyi teslim edilemeyen ileti kuyruğuna gönderir. İşlev iki parametre alır: bağlam ve azservicebus.DeadLetterOptions nesne. İleti Receiver.DeadLetterMessage teslim edilemeyen ileti kuyruğuna gönderilemiyorsa işlev bir hata döndürür.

Teslim edilemeyen ileti kuyruğundan ileti almak için işlevini dosyanıza main.go ekleyinReceiveDeadLetterMessage.

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessage bir azservicebus.Client nesnesi alır ve teslim edilemeyen ileti kuyruğu seçenekleriyle yeni azservicebus.Receiver bir nesne oluşturur. Ardından iletileri teslim edilemeyen ileti kuyruğundan alır. Ardından işlev, teslim edilemeyen ileti kuyruğundan bir ileti alır. Ardından bu iletinin ölü harfinin nedenini ve açıklamasını yazdırır.

Örnek kod

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

Kodu çalıştırma

Kodu çalıştırmadan önce adlı AZURE_SERVICEBUS_HOSTNAMEbir ortam değişkeni oluşturun. Ortam değişkeninin değerini Service Bus ad alanına ayarlayın.

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

Ardından, uygulamayı çalıştırmak için aşağıdaki go run komutu çalıştırın:

go run main.go

Sonraki adımlar

Daha fazla bilgi için aşağıdaki bağlantıları gözden geçirin: