Berichten verzenden naar en berichten ontvangen van Azure Service Bus wachtrijen (Go)
In deze zelfstudie leert u hoe u berichten kunt verzenden naar en ontvangen van Azure Service Bus wachtrijen met behulp van de programmeertaal Go.
Azure Service Bus is een volledig beheerde zakelijke berichtenbroker met berichtenwachtrijen en mogelijkheden voor publiceren/abonneren. Service Bus wordt gebruikt om toepassingen en services van elkaar te ontkoppelen, waardoor een gedistribueerd, betrouwbaar en hoogwaardig berichtentransport mogelijk is.
Met het pakket azservicebus van de Azure SDK voor Go kunt u berichten verzenden en ontvangen van Azure Service Bus en de programmeertaal Go gebruiken.
Aan het einde van deze zelfstudie kunt u het volgende doen: één bericht of batch berichten verzenden naar een wachtrij, berichten ontvangen en berichten in onbestelbare berichten die niet worden verwerkt.
Vereisten
- Een Azure-abonnement. U kunt uw voordelen als Visual Studio- of MSDN-abonnee activeren of u aanmelden voor een gratis account.
- Als u geen wachtrij hebt om te gebruiken, volgt u de stappen in het artikel De Azure-portal gebruiken om een Service Bus-wachtrij te maken om een wachtrij te maken.
- Go versie 1.18 of hoger
De voorbeeld-app maken
Maak eerst een nieuwe Go-module.
Maak een nieuwe map voor de module met de naam
service-bus-go-how-to-use-queues
.Initialiseer de module in de
azservicebus
map en installeer de vereiste pakketten.go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
Maak een nieuw bestand met de naam
main.go
.
Een client verifiëren en maken
Maak in het main.go
bestand een nieuwe functie met de naam GetClient
en voeg de volgende code toe:
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
De GetClient
functie retourneert een nieuw azservicebus.Client
object dat is gemaakt met behulp van een Azure Service Bus naamruimte en een referentie. De naamruimte wordt geleverd door de omgevingsvariabele AZURE_SERVICEBUS_HOSTNAME
. En de referentie wordt gemaakt met behulp van de azidentity.NewDefaultAzureCredential
functie.
Voor lokale ontwikkeling heeft de DefaultAzureCredential
het toegangstoken van Azure CLI gebruikt, dat kan worden gemaakt door de az login
opdracht uit te voeren om te verifiëren bij Azure.
Tip
Als u zich wilt verifiëren met een connection string gebruikt u de functie NewClientFromConnectionString.
Berichten verzenden naar een wachtrij
Maak in het main.go
bestand een nieuwe functie met de naam SendMessage
en voeg de volgende code toe:
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage
heeft twee parameters: een berichttekenreeks en een azservicebus.Client
-object. Vervolgens wordt er een nieuw azservicebus.Sender
object gemaakt en wordt het bericht naar de wachtrij verzonden. Als u bulkberichten wilt verzenden, voegt u de SendMessageBatch
functie toe aan uw main.go
bestand.
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch
heeft twee parameters: een segment berichten en een azservicebus.Client
-object. Vervolgens wordt er een nieuw azservicebus.Sender
object gemaakt en worden de berichten naar de wachtrij verzonden.
Berichten van een wachtrij ontvangen
Nadat u berichten naar de wachtrij hebt verzonden, kunt u deze ontvangen met het azservicebus.Receiver
type. Als u berichten van een wachtrij wilt ontvangen, voegt u de GetMessage
functie toe aan uw main.go
bestand.
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage
neemt een azservicebus.Client
object en maakt een nieuw azservicebus.Receiver
object. Vervolgens worden de berichten van de wachtrij ontvangen. De Receiver.ReceiveMessages
functie heeft twee parameters: een context en het aantal berichten dat moet worden ontvangen. De Receiver.ReceiveMessages
functie retourneert een segment van azservicebus.ReceivedMessage
objecten.
Vervolgens doorloopt een for
lus de berichten en wordt de hoofdtekst van het bericht afgedrukt. Vervolgens wordt de CompleteMessage
functie aangeroepen om het bericht te voltooien en uit de wachtrij te verwijderen.
Berichten die de lengtelimieten overschrijden, naar een ongeldige wachtrij worden verzonden of die niet goed worden verwerkt, kunnen worden verzonden naar de wachtrij met onbestelbare berichten. Als u berichten wilt verzenden naar de wachtrij met onbestelbare berichten, voegt u de SendDeadLetterMessage
functie toe aan uw main.go
bestand.
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage
gebruikt een azservicebus.Client
object en een azservicebus.ReceivedMessage
object. Vervolgens wordt het bericht naar de wachtrij met onbestelbare berichten verzonden. De functie heeft twee parameters: een context en een azservicebus.DeadLetterOptions
object. De Receiver.DeadLetterMessage
functie retourneert een fout als het bericht niet naar de wachtrij met onbestelbare berichten kan worden verzonden.
Als u berichten uit de wachtrij met onbestelbare berichten wilt ontvangen, voegt u de ReceiveDeadLetterMessage
functie toe aan uw main.go
bestand.
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage
neemt een azservicebus.Client
object en maakt een nieuw azservicebus.Receiver
object met opties voor de wachtrij met onbestelbare berichten. Vervolgens worden de berichten van de wachtrij met onbestelbare berichten ontvangen. De functie ontvangt vervolgens één bericht uit de wachtrij met onbestelbare berichten. Vervolgens worden de reden en beschrijving van de dode letter voor dat bericht afgedrukt.
Voorbeeldcode
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
De code uitvoeren
Voordat u de code uitvoert, maakt u een omgevingsvariabele met de naam AZURE_SERVICEBUS_HOSTNAME
. Stel de waarde van de omgevingsvariabele in op de Service Bus-naamruimte.
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
Voer vervolgens de volgende go run
opdracht uit om de app uit te voeren:
go run main.go
Volgende stappen
Bekijk de volgende koppelingen voor meer informatie: