إرسال رسائل واستلام الرسائل من قوائم انتظار ناقل خدمة Azure (Go)

في هذا البرنامج التعليمي، ستتعلم كيفية إرسال الرسائل إلى قوائم انتظار ناقل خدمة Azure وتلقيها منها باستخدام لغة برمجة Go.

ناقل خدمة Azure هو وسيط رسائل مؤسسي مُدار بالكامل مع قوائم انتظار الرسائل وقدرات النشر والاشتراك. يتم استخدام ناقل الخدمة لفصل التطبيقات والخدمات عن بعضها البعض، مما يوفر نقل رسائل موزعة وموثوقة وعالية الأداء.

تسمح لك حزمة Azure SDK azservicebus الخاصة بـ Go بإرسال واستقبال الرسائل من ناقل خدمة Azure واستخدام لغة برمجة Go.

بنهاية هذا البرنامج التعليمي، ستتمكن من: إرسال رسالة واحدة أو دفعة من الرسائل إلى قائمة انتظار، وتلقي الرسائل، والرسائل غير المستخدمة التي لم تتم معالجتها.

المتطلبات الأساسية

إعادة إنشاء نموذج التطبيق

للبدء، قم بإنشاء وحدة نمطية Go جديدة.

  1. إنشاء دليل جديد للوحدة النمطية المسماة service-bus-go-how-to-use-queues.

  2. في الدليل azservicebus، قم بتهيئة الوحدة النمطية وتثبيت الحزم المطلوبة.

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. إنشاء ملف جديد يسمى main.go.

مصادقة عميل وإنشاؤه

في الملف main.go أنشئ دالة جديدة مسماة GetClient وأضف التعليمات البرمجية التالية:

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

ترجع الدالة GetClient عنصر azservicebus.Client جديد تم إنشاؤه باستخدام مساحة اسم ناقل خدمة Azure وبيانات اعتماد. يتم توفير مساحة الاسم بواسطة متغير البيئة AZURE_SERVICEBUS_HOSTNAME. ويتم إنشاء بيانات الاعتماد باستخدام الدالة azidentity.NewDefaultAzureCredential.

للتطوير المحلي، استخدم DefaultAzureCredential الرمز المميز للوصول من Azure CLI، والذي يمكن إنشاؤه عن طريق تشغيل الأمر az login للمصادقة على Azure.

تلميح

للمصادقة باستخدام سلسلة اتصال، استخدم الدالة NewClientFromConnectionString.

إرسال الرسائل إلى قائمة الانتظار

في الملف main.go أنشئ دالة جديدة مسماة SendMessage وأضف التعليمات البرمجية التالية:

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage يأخذ معلمتين: سلسلة رسالة وعنصر azservicebus.Client. ثم ينشئ عنصر azservicebus.Sender جديد ويرسل الرسالة إلى قائمة الانتظار. لإرسال رسائل مجمعة، أضف الدالة SendMessageBatch إلى الملف main.go الخاص بك.

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch يأخذ معلمتين: شريحة من الرسائل وعنصر azservicebus.Client. ثم يقوم بإنشاء عنصر azservicebus.Sender جديد وإرسال الرسائل إلى قائمة الانتظار.

استقبال الرسائل من قائمة الانتظار

بعد إرسال رسائل إلى قائمة الانتظار، يمكنك تلقيها مع النوع azservicebus.Receiver. لتلقي الرسائل من قائمة انتظار، أضف الدالة GetMessage إلى ملفك main.go.

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessage يأخذ عنصر azservicebus.Client وينشئ عنصر azservicebus.Receiver جديد. ثم يتلقى الرسائل من قائمة الانتظار. تأخذ الدالة Receiver.ReceiveMessages معلمتين: سياق وعدد الرسائل المراد تلقيها. ترجع الدالة Receiver.ReceiveMessages شريحة من azservicebus.ReceivedMessage العناصر.

بعد ذلك، for يتكرر التكرار الحلقي عبر الرسائل ويطبع نص الرسالة. ثم يتم استدعاء الدالة CompleteMessage لإكمال الرسالة، وإزالتها من قائمة الانتظار.

يمكن إرسال الرسائل التي تتجاوز حدود الطول أو يتم إرسالها إلى قائمة انتظار غير صالحة أو لم تتم معالجتها بنجاح إلى قائمة انتظار الرسائل المهملة. لإرسال رسائل إلى قائمة انتظار الرسائل غير المستخدمة، أضف الدالة SendDeadLetterMessage إلى الملف الخاص بكmain.go.

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessage يأخذ عنصر azservicebus.Client وعنصر azservicebus.ReceivedMessage. ثم يرسل الرسالة إلى قائمة انتظار الرسائل غير المستخدمة. تأخذ الدالة معلمتين: سياق وعنصر azservicebus.DeadLetterOptions. ترجع الدالة Receiver.DeadLetterMessage خطأ إذا فشل إرسال الرسالة إلى قائمة انتظار الأحرف غير المستخدمة.

لتلقي الرسائل من قائمة انتظار الأحرف غير المستخدمة، أضف الدالة ReceiveDeadLetterMessage إلى الملفmain.go.

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessage يأخذ عنصر azservicebus.Client وينشئ عنصر azservicebus.Receiver جديد مع خيارات لقائمة انتظار الأحرف غير المستخدمة. ثم يتلقى الرسائل من قائمة انتظار الرسائل غير المستخدمة. ثم تتلقى الدالة رسالة واحدة من قائمة انتظار الأحرف غير المستخدمة. ثم يقوم بطباعة سبب الرسالة غير المستخدمة ووصفها.

نموذج التعليمة البرمجية

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

تشغيل التعليمات البرمجية

قبل تشغيل التعليمات البرمجية، أنشئ متغير بيئة باسم AZURE_SERVICEBUS_HOSTNAME. تعيين قيمة متغير البيئة إلى مساحة ناقل خدمة مسماة.

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

من ثم، شغّل الأمر التالي go run لتشغيل التطبيق:

go run main.go

الخطوات التالية

لمزيد من المعلومات، تحقق من الارتباطات التالية: