Freigeben über


Senden und Empfangen von Nachrichten für Azure Service Bus-Warteschlangen (Go)

In diesem Tutorial erfahren Sie, wie Sie mithilfe der Programmiersprache Go Nachrichten an Azure Service Bus-Warteschlangen senden und Nachrichten daraus empfangen.

Bei Azure Service Bus handelt es sich um einen vollständig verwalteten Nachrichtenbroker für Unternehmen mit Nachrichtenwarteschlangen und Publish/Subscribe-Funktionen. Service Bus wird verwendet, um Anwendungen und Dienste voneinander zu entkoppeln und bietet einen verteilten und zuverlässigen Hochleistungs-Nachrichtentransport.

Das Paket azservicebus des Azure SDK for Go ermöglicht es Ihnen, Nachrichten von Azure Service Bus zu senden und zu empfangen und die Go-Programmiersprache zu verwenden.

Am Ende dieses Tutorials können Sie eine einzelne Nachricht oder einen Batch von Nachrichten an eine Warteschlange senden, Nachrichten empfangen und Nachrichten, die nicht verarbeitet werden, in die Warteschlange für unzustellbare Nachrichten verschieben.

Voraussetzungen

Erstellen der Beispiel-App

Erstellen Sie zunächst ein neues Go-Modul.

  1. Erstellen Sie ein neues Verzeichnis für das Modul, mit der Bezeichnung service-bus-go-how-to-use-queues.

  2. Initialisieren Sie das Modul im Verzeichnis azservicebus, und installieren Sie die erforderlichen Pakete.

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. Erstellen Sie eine neue Datei mit dem Namen main.go.

Authentifizieren und Erstellen eines Clients

Erstellen Sie eine neue Funktion mit der Bezeichnung GetClient in der Datei main.go und fügen Sie den folgenden Code hinzu:

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

Die Funktion GetClient gibt ein neues azservicebus.Client-Objekt zurück, das mithilfe eines Azure Service Bus-Namespace und Anmeldeinformationen erstellt wird. Der Namespace wird von der Umgebungsvariable AZURE_SERVICEBUS_HOSTNAME bereitgestellt. Und die Anmeldeinformationen werden mithilfe der Funktion azidentity.NewDefaultAzureCredential erstellt.

Für die lokale Entwicklung wurde für DefaultAzureCredential das Zugriffstoken von Azure CLI verwendet, das durch Ausführen des az login-Befehls zum Authentifizieren bei Azure erstellt werden kann.

Tipp

Verwenden Sie die Funktion NewClientFromConnectionString, um sich mit einer Verbindungszeichenfolge zu authentifizieren.

Senden von Nachrichten an eine Warteschlange

Erstellen Sie in der Datei main.go eine neue Funktion mit der Bezeichnung SendMessage und fügen Sie den folgenden Code hinzu:

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage verwendet zwei Parameter: eine Nachrichtenzeichenfolge und ein azservicebus.Client-Objekt. Anschließend wird ein neues azservicebus.Sender-Objekt erstellt und die Nachricht an die Warteschlange gesendet. Um Massennachrichten zu senden, fügen Sie der Datei main.go die Funktion SendMessageBatch hinzu.

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch verwendet zwei Parameter: einen Slice von Nachrichten und ein azservicebus.Client-Objekt. Anschließend wird ein neues azservicebus.Sender-Objekt erstellt und die Nachrichten werden an die Warteschlange gesendet.

Empfangen von Nachrichten aus einer Warteschlange

Nachdem Sie Nachrichten an die Warteschlange gesendet haben, können Sie sie mit dem Typ azservicebus.Receiver empfangen. Um Nachrichten aus einer Warteschlange zu empfangen, fügen Sie die Funktion GetMessage zu Ihrer main.go-Datei hinzu.

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessage verwendet ein azservicebus.Client-Objekt und erstellt ein neues azservicebus.Receiver-Objekt. Anschließend werden die Nachrichten aus der Warteschlange empfangen. Die Funktion Receiver.ReceiveMessages verwendet zwei Parameter: einen Kontext und die Anzahl der zu empfangenden Nachrichten. Die Funktion Receiver.ReceiveMessages gibt einen Slice von azservicebus.ReceivedMessage-Objekten zurück.

Als Nächstes wird eine for-Schleife durch die Nachrichten durchlaufen und der Nachrichtentext wird gedruckt. Anschließend wird die Funktion CompleteMessage aufgerufen, um die Nachricht fertigzustellen. Dadurch wird sie aus der Warteschlange entfernt.

Nachrichten, die Längenbeschränkungen überschreiten, werden an eine Warteschlange für ungültige Nachrichten gesendet. Nachrichten, die nicht erfolgreich verarbeitet werden, können an die Warteschlange für unzustellbare Nachrichten gesendet werden. Um Nachrichten an die Warteschlange für unzustellbare Nachrichten zu senden, fügen Sie die Funktion SendDeadLetterMessage zu Ihrer main.go-Datei hinzu.

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessage verwendet ein azservicebus.Client-Objekt und ein azservicebus.ReceivedMessage-Objekt. Anschließend wird die Nachricht an die Warteschlange für unzustellbare Nachrichten gesendet. Die Funktion verwendet zwei Parameter: einen Kontext und ein azservicebus.DeadLetterOptions-Objekt. Die Funktion Receiver.DeadLetterMessage gibt einen Fehler zurück, wenn die Nachricht nicht an die Warteschlange für unzustellbare Nachrichten gesendet werden kann.

Um Nachrichten aus der Warteschlange für unzustellbare Nachrichten zu empfangen, fügen Sie Ihrer main.go-Datei die Funktion ReceiveDeadLetterMessage hinzu.

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessage verwendet ein azservicebus.Client-Objekt und erstellt ein neues azservicebus.Receiver-Objekt mit Optionen für die Warteschlange für unzustellbare Nachrichten. Anschließend werden die Nachrichten aus der Warteschlange für unzustellbare Nachrichten empfangen. Die Funktion empfängt dann eine Nachricht aus der Warteschlange für unzustellbare Nachrichten. Anschließend werden der Grund und die Beschreibung für die Unzustellbarkeit dieser Nachricht gedruckt.

Beispielcode

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

Ausführen des Codes

Bevor Sie den Code ausführen, erstellen Sie eine Umgebungsvariable namens AZURE_SERVICEBUS_HOSTNAME. Legen Sie den Wert der Umgebungsvariablen auf den Service Bus-Namespace fest.

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

Führen Sie als Nächstes den folgenden go run-Befehl aus, um die App auszuführen:

go run main.go

Nächste Schritte

Weitere Informationen finden Sie über die folgenden Links: