Üzenetek küldése és fogadása Azure Service Bus üzenetsorokból (Ugrás)
Ebben az oktatóanyagban megtudhatja, hogyan küldhet üzeneteket Azure Service Bus üzenetsorokra, és hogyan fogadhat üzeneteket a Go programozási nyelv használatával.
Azure Service Bus egy teljes körűen felügyelt vállalati üzenetközvetítő üzenetsorokkal és közzétételi/feliratkozási képességekkel. A Service Bus alkalmazásokat és szolgáltatásokat különít el egymástól, elosztott, megbízható és nagy teljesítményű üzenetátvitelt biztosítva.
Az Azure SDK for Go azservicebus csomagja lehetővé teszi az üzenetek küldését és fogadását Azure Service Bus és a Go programozási nyelv használatával.
Az oktatóanyag végére képes lesz arra, hogy egyetlen üzenetet vagy üzenetköteget küldjön egy üzenetsorba, fogadjon üzeneteket, és ne dolgozza fel a kézbesítetlen üzeneteket.
Előfeltételek
- Azure-előfizetés. Aktiválhatja Visual Studio- vagy MSDN-előfizetői előnyeit , vagy regisztrálhat egy ingyenes fiókra.
- Ha nem rendelkezik üzenetsorsal, kövesse a Service Bus-üzenetsor létrehozása Azure Portal című cikk lépéseit az üzenetsor létrehozásához.
- 1.18-os vagy újabb verzió
Mintaalkalmazás létrehozása
Először hozzon létre egy új Go modult.
Hozzon létre egy új könyvtárat a nevű
service-bus-go-how-to-use-queues
modulhoz.Inicializálja a
azservicebus
modult a könyvtárban, és telepítse a szükséges csomagokat.go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
Hozzon létre egy új fájlt
main.go
néven.
Ügyfél hitelesítése és létrehozása
A fájlban main.go
hozzon létre egy új GetClient
nevű függvényt, és adja hozzá a következő kódot:
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
A GetClient
függvény egy új azservicebus.Client
objektumot ad vissza, amely egy Azure Service Bus névtér és egy hitelesítő adat használatával jön létre. A névteret a AZURE_SERVICEBUS_HOSTNAME
környezeti változó biztosítja. A hitelesítő adatokat pedig a azidentity.NewDefaultAzureCredential
függvény használatával hozza létre.
Helyi fejlesztéshez a DefaultAzureCredential
használt hozzáférési jogkivonat az Azure CLI-ből, amely a parancs futtatásával hozható létre az az login
Azure-ban való hitelesítéshez.
Tipp
Ha kapcsolati sztring szeretne hitelesíteni, használja a NewClientFromConnectionString függvényt.
Üzenetek küldése egy üzenetsorba
A fájlban main.go
hozzon létre egy új SendMessage
nevű függvényt, és adja hozzá a következő kódot:
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage
két paramétert vesz fel: egy üzenetsztringet és egy objektumot azservicebus.Client
. Ezután létrehoz egy új azservicebus.Sender
objektumot, és elküldi az üzenetet az üzenetsornak. Tömeges üzenetek küldéséhez adja hozzá a függvényt SendMessageBatch
a main.go
fájlhoz.
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch
két paramétert vesz fel: egy üzenetszeletet és egy objektumot azservicebus.Client
. Ezután létrehoz egy új azservicebus.Sender
objektumot, és elküldi az üzeneteket az üzenetsorba.
Üzenetek fogadása üzenetsorból
Miután üzeneteket küldött az üzenetsorba, a típussal azservicebus.Receiver
fogadhatja őket. Ha üzenetsorból szeretne üzeneteket fogadni, adja hozzá a függvényt GetMessage
a main.go
fájlhoz.
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage
egy objektumot azservicebus.Client
vesz fel, és létrehoz egy új azservicebus.Receiver
objektumot. Ezután fogadja az üzeneteket az üzenetsorból. A Receiver.ReceiveMessages
függvény két paramétert vesz igénybe: egy környezetet és a fogadandó üzenetek számát. A Receiver.ReceiveMessages
függvény objektumszeletet azservicebus.ReceivedMessage
ad vissza.
Ezután egy for
hurok végighalad az üzeneteken, és kinyomtatja az üzenet törzsét. Ezután a CompleteMessage
függvényt meghívja az üzenet befejezéséhez, és eltávolítja azt az üzenetsorból.
A hosszkorlátot túllépő üzeneteket a rendszer érvénytelen üzenetsorba küldi, vagy a feldolgozásuk sikertelen. Ha üzeneteket szeretne küldeni a kézbesítetlen levelek üzenetsorába, adja hozzá a SendDeadLetterMessage
függvényt a main.go
fájlhoz.
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage
azservicebus.Client
egy objektumot és egy objektumot azservicebus.ReceivedMessage
vesz fel. Ezután elküldi az üzenetet a kézbesítetlen levelek üzenetsorába. A függvény két paramétert vesz igénybe: egy környezetet és egy objektumot azservicebus.DeadLetterOptions
. A Receiver.DeadLetterMessage
függvény hibát ad vissza, ha az üzenetet nem sikerül elküldeni a kézbesítetlen levelek üzenetsorába.
Ha üzeneteket szeretne fogadni a kézbesítetlen levelek üzenetsorából, adja hozzá a függvényt ReceiveDeadLetterMessage
a main.go
fájlhoz.
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage
egy objektumot azservicebus.Client
vesz fel, és létrehoz egy új azservicebus.Receiver
objektumot a kézbesítetlen levelek üzenetsorának beállításaival. Ezután megkapja az üzeneteket a kézbesítetlen levelek üzenetsorából. A függvény ezután egy üzenetet kap a kézbesítetlen levelek üzenetsorából. Ezután kinyomtatja az üzenet holt betűjének okát és leírását.
Mintakód
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
A kód futtatása
A kód futtatása előtt hozzon létre egy nevű környezeti változót AZURE_SERVICEBUS_HOSTNAME
. Állítsa a környezeti változó értékét a Service Bus névtérre.
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
Ezután futtassa a következő go run
parancsot az alkalmazás futtatásához:
go run main.go
Következő lépések
További információért tekintse meg az alábbi hivatkozásokat: