Megosztás a következőn keresztül:


Üzenetek küldése és fogadása Azure Service Bus üzenetsorokból (Ugrás)

Ebben az oktatóanyagban megtudhatja, hogyan küldhet üzeneteket Azure Service Bus üzenetsorokra, és hogyan fogadhat üzeneteket a Go programozási nyelv használatával.

Azure Service Bus egy teljes körűen felügyelt vállalati üzenetközvetítő üzenetsorokkal és közzétételi/feliratkozási képességekkel. A Service Bus alkalmazásokat és szolgáltatásokat különít el egymástól, elosztott, megbízható és nagy teljesítményű üzenetátvitelt biztosítva.

Az Azure SDK for Go azservicebus csomagja lehetővé teszi az üzenetek küldését és fogadását Azure Service Bus és a Go programozási nyelv használatával.

Az oktatóanyag végére képes lesz arra, hogy egyetlen üzenetet vagy üzenetköteget küldjön egy üzenetsorba, fogadjon üzeneteket, és ne dolgozza fel a kézbesítetlen üzeneteket.

Előfeltételek

Mintaalkalmazás létrehozása

Először hozzon létre egy új Go modult.

  1. Hozzon létre egy új könyvtárat a nevű service-bus-go-how-to-use-queuesmodulhoz.

  2. Inicializálja a azservicebus modult a könyvtárban, és telepítse a szükséges csomagokat.

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. Hozzon létre egy új fájlt main.go néven.

Ügyfél hitelesítése és létrehozása

A fájlban main.go hozzon létre egy új GetClient nevű függvényt, és adja hozzá a következő kódot:

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

A GetClient függvény egy új azservicebus.Client objektumot ad vissza, amely egy Azure Service Bus névtér és egy hitelesítő adat használatával jön létre. A névteret a AZURE_SERVICEBUS_HOSTNAME környezeti változó biztosítja. A hitelesítő adatokat pedig a azidentity.NewDefaultAzureCredential függvény használatával hozza létre.

Helyi fejlesztéshez a DefaultAzureCredential használt hozzáférési jogkivonat az Azure CLI-ből, amely a parancs futtatásával hozható létre az az login Azure-ban való hitelesítéshez.

Tipp

Ha kapcsolati sztring szeretne hitelesíteni, használja a NewClientFromConnectionString függvényt.

Üzenetek küldése egy üzenetsorba

A fájlban main.go hozzon létre egy új SendMessage nevű függvényt, és adja hozzá a következő kódot:

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage két paramétert vesz fel: egy üzenetsztringet és egy objektumot azservicebus.Client . Ezután létrehoz egy új azservicebus.Sender objektumot, és elküldi az üzenetet az üzenetsornak. Tömeges üzenetek küldéséhez adja hozzá a függvényt SendMessageBatch a main.go fájlhoz.

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch két paramétert vesz fel: egy üzenetszeletet és egy objektumot azservicebus.Client . Ezután létrehoz egy új azservicebus.Sender objektumot, és elküldi az üzeneteket az üzenetsorba.

Üzenetek fogadása üzenetsorból

Miután üzeneteket küldött az üzenetsorba, a típussal azservicebus.Receiver fogadhatja őket. Ha üzenetsorból szeretne üzeneteket fogadni, adja hozzá a függvényt GetMessage a main.go fájlhoz.

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessage egy objektumot azservicebus.Client vesz fel, és létrehoz egy új azservicebus.Receiver objektumot. Ezután fogadja az üzeneteket az üzenetsorból. A Receiver.ReceiveMessages függvény két paramétert vesz igénybe: egy környezetet és a fogadandó üzenetek számát. A Receiver.ReceiveMessages függvény objektumszeletet azservicebus.ReceivedMessage ad vissza.

Ezután egy for hurok végighalad az üzeneteken, és kinyomtatja az üzenet törzsét. Ezután a CompleteMessage függvényt meghívja az üzenet befejezéséhez, és eltávolítja azt az üzenetsorból.

A hosszkorlátot túllépő üzeneteket a rendszer érvénytelen üzenetsorba küldi, vagy a feldolgozásuk sikertelen. Ha üzeneteket szeretne küldeni a kézbesítetlen levelek üzenetsorába, adja hozzá a SendDeadLetterMessage függvényt a main.go fájlhoz.

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessage azservicebus.Client egy objektumot és egy objektumot azservicebus.ReceivedMessage vesz fel. Ezután elküldi az üzenetet a kézbesítetlen levelek üzenetsorába. A függvény két paramétert vesz igénybe: egy környezetet és egy objektumot azservicebus.DeadLetterOptions . A Receiver.DeadLetterMessage függvény hibát ad vissza, ha az üzenetet nem sikerül elküldeni a kézbesítetlen levelek üzenetsorába.

Ha üzeneteket szeretne fogadni a kézbesítetlen levelek üzenetsorából, adja hozzá a függvényt ReceiveDeadLetterMessage a main.go fájlhoz.

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessage egy objektumot azservicebus.Client vesz fel, és létrehoz egy új azservicebus.Receiver objektumot a kézbesítetlen levelek üzenetsorának beállításaival. Ezután megkapja az üzeneteket a kézbesítetlen levelek üzenetsorából. A függvény ezután egy üzenetet kap a kézbesítetlen levelek üzenetsorából. Ezután kinyomtatja az üzenet holt betűjének okát és leírását.

Mintakód

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

A kód futtatása

A kód futtatása előtt hozzon létre egy nevű környezeti változót AZURE_SERVICEBUS_HOSTNAME. Állítsa a környezeti változó értékét a Service Bus névtérre.

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

Ezután futtassa a következő go run parancsot az alkalmazás futtatásához:

go run main.go

Következő lépések

További információért tekintse meg az alábbi hivatkozásokat: