إرسال رسائل واستلام الرسائل من قوائم انتظار ناقل خدمة Azure (Go)
في هذا البرنامج التعليمي، ستتعلم كيفية إرسال الرسائل إلى قوائم انتظار ناقل خدمة Azure وتلقيها منها باستخدام لغة برمجة Go.
ناقل خدمة Azure هو وسيط رسائل مؤسسي مُدار بالكامل مع قوائم انتظار الرسائل وقدرات النشر والاشتراك. يتم استخدام ناقل الخدمة لفصل التطبيقات والخدمات عن بعضها البعض، مما يوفر نقل رسائل موزعة وموثوقة وعالية الأداء.
تسمح لك حزمة Azure SDK azservicebus الخاصة بـ Go بإرسال واستقبال الرسائل من ناقل خدمة Azure واستخدام لغة برمجة Go.
بنهاية هذا البرنامج التعليمي، ستتمكن من: إرسال رسالة واحدة أو دفعة من الرسائل إلى قائمة انتظار، وتلقي الرسائل، والرسائل غير المستخدمة التي لم تتم معالجتها.
المتطلبات الأساسية
- اشتراك Azure. يمكنك تنشيط مزايا المشتركين في Visual Studio أو MSDN أو الاشتراك في "free account".
- إذا لم يكن لديك قائمة انتظار للعمل معها، فاتبع الخطوات الواردة في مقالة "Use مدخلMicrosoft Azure to create a ناقل خدمة Microsoft Azure queue" لإنشاء قائمة انتظار.
- إصدار Go 1.18 أو أعلى
إعادة إنشاء نموذج التطبيق
للبدء، قم بإنشاء وحدة نمطية Go جديدة.
إنشاء دليل جديد للوحدة النمطية المسماة
service-bus-go-how-to-use-queues
.في الدليل
azservicebus
، قم بتهيئة الوحدة النمطية وتثبيت الحزم المطلوبة.go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
إنشاء ملف جديد يسمى
main.go
.
مصادقة عميل وإنشاؤه
في الملف main.go
أنشئ دالة جديدة مسماة GetClient
وأضف التعليمات البرمجية التالية:
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
ترجع الدالة GetClient
عنصر azservicebus.Client
جديد تم إنشاؤه باستخدام مساحة اسم ناقل خدمة Azure وبيانات اعتماد. يتم توفير مساحة الاسم بواسطة متغير البيئة AZURE_SERVICEBUS_HOSTNAME
. ويتم إنشاء بيانات الاعتماد باستخدام الدالة azidentity.NewDefaultAzureCredential
.
للتطوير المحلي، استخدم DefaultAzureCredential
الرمز المميز للوصول من Azure CLI، والذي يمكن إنشاؤه عن طريق تشغيل الأمر az login
للمصادقة على Azure.
تلميح
للمصادقة باستخدام سلسلة اتصال، استخدم الدالة NewClientFromConnectionString.
إرسال الرسائل إلى قائمة الانتظار
في الملف main.go
أنشئ دالة جديدة مسماة SendMessage
وأضف التعليمات البرمجية التالية:
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage
يأخذ معلمتين: سلسلة رسالة وعنصر azservicebus.Client
. ثم ينشئ عنصر azservicebus.Sender
جديد ويرسل الرسالة إلى قائمة الانتظار. لإرسال رسائل مجمعة، أضف الدالة SendMessageBatch
إلى الملف main.go
الخاص بك.
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch
يأخذ معلمتين: شريحة من الرسائل وعنصر azservicebus.Client
. ثم يقوم بإنشاء عنصر azservicebus.Sender
جديد وإرسال الرسائل إلى قائمة الانتظار.
استقبال الرسائل من قائمة الانتظار
بعد إرسال رسائل إلى قائمة الانتظار، يمكنك تلقيها مع النوع azservicebus.Receiver
. لتلقي الرسائل من قائمة انتظار، أضف الدالة GetMessage
إلى ملفك main.go
.
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage
يأخذ عنصر azservicebus.Client
وينشئ عنصر azservicebus.Receiver
جديد. ثم يتلقى الرسائل من قائمة الانتظار. تأخذ الدالة Receiver.ReceiveMessages
معلمتين: سياق وعدد الرسائل المراد تلقيها. ترجع الدالة Receiver.ReceiveMessages
شريحة من azservicebus.ReceivedMessage
العناصر.
بعد ذلك، for
يتكرر التكرار الحلقي عبر الرسائل ويطبع نص الرسالة. ثم يتم استدعاء الدالة CompleteMessage
لإكمال الرسالة، وإزالتها من قائمة الانتظار.
يمكن إرسال الرسائل التي تتجاوز حدود الطول أو يتم إرسالها إلى قائمة انتظار غير صالحة أو لم تتم معالجتها بنجاح إلى قائمة انتظار الرسائل المهملة. لإرسال رسائل إلى قائمة انتظار الرسائل غير المستخدمة، أضف الدالة SendDeadLetterMessage
إلى الملف الخاص بكmain.go
.
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage
يأخذ عنصر azservicebus.Client
وعنصر azservicebus.ReceivedMessage
. ثم يرسل الرسالة إلى قائمة انتظار الرسائل غير المستخدمة. تأخذ الدالة معلمتين: سياق وعنصر azservicebus.DeadLetterOptions
. ترجع الدالة Receiver.DeadLetterMessage
خطأ إذا فشل إرسال الرسالة إلى قائمة انتظار الأحرف غير المستخدمة.
لتلقي الرسائل من قائمة انتظار الأحرف غير المستخدمة، أضف الدالة ReceiveDeadLetterMessage
إلى الملفmain.go
.
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage
يأخذ عنصر azservicebus.Client
وينشئ عنصر azservicebus.Receiver
جديد مع خيارات لقائمة انتظار الأحرف غير المستخدمة. ثم يتلقى الرسائل من قائمة انتظار الرسائل غير المستخدمة. ثم تتلقى الدالة رسالة واحدة من قائمة انتظار الأحرف غير المستخدمة. ثم يقوم بطباعة سبب الرسالة غير المستخدمة ووصفها.
نموذج التعليمة البرمجية
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
تشغيل التعليمات البرمجية
قبل تشغيل التعليمات البرمجية، أنشئ متغير بيئة باسم AZURE_SERVICEBUS_HOSTNAME
. تعيين قيمة متغير البيئة إلى مساحة ناقل خدمة مسماة.
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
من ثم، شغّل الأمر التالي go run
لتشغيل التطبيق:
go run main.go
الخطوات التالية
لمزيد من المعلومات، تحقق من الارتباطات التالية:
الملاحظات
https://aka.ms/ContentUserFeedback.
قريبًا: خلال عام 2024، سنتخلص تدريجيًا من GitHub Issues بوصفها آلية إرسال ملاحظات للمحتوى ونستبدلها بنظام ملاحظات جديد. لمزيد من المعلومات، راجعإرسال الملاحظات وعرضها المتعلقة بـ