Senden und Empfangen von Nachrichten für Azure Service Bus-Warteschlangen (Go)
In diesem Tutorial erfahren Sie, wie Sie mithilfe der Programmiersprache Go Nachrichten an Azure Service Bus-Warteschlangen senden und Nachrichten daraus empfangen.
Bei Azure Service Bus handelt es sich um einen vollständig verwalteten Nachrichtenbroker für Unternehmen mit Nachrichtenwarteschlangen und Publish/Subscribe-Funktionen. Service Bus wird verwendet, um Anwendungen und Dienste voneinander zu entkoppeln und bietet einen verteilten und zuverlässigen Hochleistungs-Nachrichtentransport.
Das Paket azservicebus des Azure SDK for Go ermöglicht es Ihnen, Nachrichten von Azure Service Bus zu senden und zu empfangen und die Go-Programmiersprache zu verwenden.
Am Ende dieses Tutorials können Sie eine einzelne Nachricht oder einen Batch von Nachrichten an eine Warteschlange senden, Nachrichten empfangen und Nachrichten, die nicht verarbeitet werden, in die Warteschlange für unzustellbare Nachrichten verschieben.
Voraussetzungen
- Ein Azure-Abonnement. Sie können Ihre Visual Studio-oder MSDN-Abonnentenvorteile aktivieren oder sich für ein kostenloses Konto anmelden.
- Wenn Sie über keine Warteschlange verfügen, führen Sie die Schritte im Artikel Schnellstart: Erstellen einer Service Bus-Warteschlange mithilfe des Azure-Portals aus, um eine Warteschlange zu erstellen.
- Go Version 1.18 oder höher
Erstellen der Beispiel-App
Erstellen Sie zunächst ein neues Go-Modul.
Erstellen Sie ein neues Verzeichnis für das Modul, mit der Bezeichnung
service-bus-go-how-to-use-queues
.Initialisieren Sie das Modul im Verzeichnis
azservicebus
, und installieren Sie die erforderlichen Pakete.go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
Erstellen Sie eine neue Datei mit dem Namen
main.go
.
Authentifizieren und Erstellen eines Clients
Erstellen Sie eine neue Funktion mit der Bezeichnung GetClient
in der Datei main.go
und fügen Sie den folgenden Code hinzu:
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
Die Funktion GetClient
gibt ein neues azservicebus.Client
-Objekt zurück, das mithilfe eines Azure Service Bus-Namespace und Anmeldeinformationen erstellt wird. Der Namespace wird von der Umgebungsvariable AZURE_SERVICEBUS_HOSTNAME
bereitgestellt. Und die Anmeldeinformationen werden mithilfe der Funktion azidentity.NewDefaultAzureCredential
erstellt.
Für die lokale Entwicklung wurde für DefaultAzureCredential
das Zugriffstoken von Azure CLI verwendet, das durch Ausführen des az login
-Befehls zum Authentifizieren bei Azure erstellt werden kann.
Tipp
Verwenden Sie die Funktion NewClientFromConnectionString, um sich mit einer Verbindungszeichenfolge zu authentifizieren.
Senden von Nachrichten an eine Warteschlange
Erstellen Sie in der Datei main.go
eine neue Funktion mit der Bezeichnung SendMessage
und fügen Sie den folgenden Code hinzu:
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage
verwendet zwei Parameter: eine Nachrichtenzeichenfolge und ein azservicebus.Client
-Objekt. Anschließend wird ein neues azservicebus.Sender
-Objekt erstellt und die Nachricht an die Warteschlange gesendet. Um Massennachrichten zu senden, fügen Sie der Datei main.go
die Funktion SendMessageBatch
hinzu.
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch
verwendet zwei Parameter: einen Slice von Nachrichten und ein azservicebus.Client
-Objekt. Anschließend wird ein neues azservicebus.Sender
-Objekt erstellt und die Nachrichten werden an die Warteschlange gesendet.
Empfangen von Nachrichten aus einer Warteschlange
Nachdem Sie Nachrichten an die Warteschlange gesendet haben, können Sie sie mit dem Typ azservicebus.Receiver
empfangen. Um Nachrichten aus einer Warteschlange zu empfangen, fügen Sie die Funktion GetMessage
zu Ihrer main.go
-Datei hinzu.
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage
verwendet ein azservicebus.Client
-Objekt und erstellt ein neues azservicebus.Receiver
-Objekt. Anschließend werden die Nachrichten aus der Warteschlange empfangen. Die Funktion Receiver.ReceiveMessages
verwendet zwei Parameter: einen Kontext und die Anzahl der zu empfangenden Nachrichten. Die Funktion Receiver.ReceiveMessages
gibt einen Slice von azservicebus.ReceivedMessage
-Objekten zurück.
Als Nächstes wird eine for
-Schleife durch die Nachrichten durchlaufen und der Nachrichtentext wird gedruckt. Anschließend wird die Funktion CompleteMessage
aufgerufen, um die Nachricht fertigzustellen. Dadurch wird sie aus der Warteschlange entfernt.
Nachrichten, die Längenbeschränkungen überschreiten, werden an eine Warteschlange für ungültige Nachrichten gesendet. Nachrichten, die nicht erfolgreich verarbeitet werden, können an die Warteschlange für unzustellbare Nachrichten gesendet werden. Um Nachrichten an die Warteschlange für unzustellbare Nachrichten zu senden, fügen Sie die Funktion SendDeadLetterMessage
zu Ihrer main.go
-Datei hinzu.
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage
verwendet ein azservicebus.Client
-Objekt und ein azservicebus.ReceivedMessage
-Objekt. Anschließend wird die Nachricht an die Warteschlange für unzustellbare Nachrichten gesendet. Die Funktion verwendet zwei Parameter: einen Kontext und ein azservicebus.DeadLetterOptions
-Objekt. Die Funktion Receiver.DeadLetterMessage
gibt einen Fehler zurück, wenn die Nachricht nicht an die Warteschlange für unzustellbare Nachrichten gesendet werden kann.
Um Nachrichten aus der Warteschlange für unzustellbare Nachrichten zu empfangen, fügen Sie Ihrer main.go
-Datei die Funktion ReceiveDeadLetterMessage
hinzu.
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage
verwendet ein azservicebus.Client
-Objekt und erstellt ein neues azservicebus.Receiver
-Objekt mit Optionen für die Warteschlange für unzustellbare Nachrichten. Anschließend werden die Nachrichten aus der Warteschlange für unzustellbare Nachrichten empfangen. Die Funktion empfängt dann eine Nachricht aus der Warteschlange für unzustellbare Nachrichten. Anschließend werden der Grund und die Beschreibung für die Unzustellbarkeit dieser Nachricht gedruckt.
Beispielcode
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
Ausführen des Codes
Bevor Sie den Code ausführen, erstellen Sie eine Umgebungsvariable namens AZURE_SERVICEBUS_HOSTNAME
. Legen Sie den Wert der Umgebungsvariablen auf den Service Bus-Namespace fest.
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
Führen Sie als Nächstes den folgenden go run
-Befehl aus, um die App auszuführen:
go run main.go
Nächste Schritte
Weitere Informationen finden Sie über die folgenden Links: