Dela via


Skicka meddelanden till och ta emot meddelanden från Azure Service Bus köer (Go)

I den här självstudien får du lära dig hur du skickar meddelanden till och tar emot meddelanden från Azure Service Bus köer med programmeringsspråket Go.

Azure Service Bus är en fullständigt hanterad meddelandekö för företag med meddelandeköer och funktioner för publicering/prenumeration. Service Bus används för att frikoppla program och tjänster från varandra, vilket ger en distribuerad, tillförlitlig och högpresterande meddelandetransport.

Med Azure SDK för Go:s azservicebus-paket kan du skicka och ta emot meddelanden från Azure Service Bus och använda programmeringsspråket Go.

I slutet av den här självstudien kan du: skicka ett enda meddelande eller en batch med meddelanden till en kö, ta emot meddelanden och meddelanden med obeställbara meddelanden som inte bearbetas.

Förutsättningar

Skapa exempelappen

Börja med att skapa en ny Go-modul.

  1. Skapa en ny katalog för modulen med namnet service-bus-go-how-to-use-queues.

  2. Initiera modulen azservicebus i katalogen och installera de paket som krävs.

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. Skapa en ny fil med namnet main.go.

Autentisera och skapa en klient

main.go I filen skapar du en ny funktion med namnet GetClient och lägger till följande kod:

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

Funktionen GetClient returnerar ett nytt azservicebus.Client objekt som skapas med hjälp av ett Azure Service Bus namnområde och en autentiseringsuppgift. Namnområdet tillhandahålls av AZURE_SERVICEBUS_HOSTNAME miljövariabeln. Och autentiseringsuppgifterna skapas med hjälp azidentity.NewDefaultAzureCredential av funktionen .

För lokal utveckling DefaultAzureCredential används åtkomsttoken från Azure CLI, som kan skapas genom att köra kommandot för att autentisera az login till Azure.

Tips

Om du vill autentisera med en anslutningssträng använder du funktionen NewClientFromConnectionString .

Skicka meddelanden till en kö

main.go I filen skapar du en ny funktion med namnet SendMessage och lägger till följande kod:

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage tar två parametrar: en meddelandesträng och ett azservicebus.Client objekt. Sedan skapas ett nytt azservicebus.Sender objekt och meddelandet skickas till kön. Om du vill skicka massmeddelanden lägger du till funktionen main.go i SendMessageBatch filen.

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch tar två parametrar: en del av meddelanden och ett azservicebus.Client objekt. Sedan skapas ett nytt azservicebus.Sender objekt och meddelanden skickas till kön.

Ta emot meddelanden från en kö

När du har skickat meddelanden till kön kan du ta emot dem med typen azservicebus.Receiver . Om du vill ta emot meddelanden från en kö lägger du till funktionen main.go i GetMessage filen.

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessage tar ett azservicebus.Client objekt och skapar ett nytt azservicebus.Receiver objekt. Den tar sedan emot meddelandena från kön. Funktionen Receiver.ReceiveMessages tar två parametrar: en kontext och antalet meddelanden som ska ta emot. Funktionen Receiver.ReceiveMessages returnerar en sektor med azservicebus.ReceivedMessage objekt.

Därefter itererar en for loop genom meddelandena och skriver ut meddelandetexten. Sedan anropas CompleteMessage funktionen för att slutföra meddelandet och ta bort det från kön.

Meddelanden som överskrider längdgränserna, skickas till en ogiltig kö eller inte bearbetas kan skickas till kön med obeställbara meddelanden. Om du vill skicka meddelanden till kön med obeställbara meddelanden lägger du till funktionen main.go i SendDeadLetterMessage filen.

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessage tar ett azservicebus.Client objekt och ett azservicebus.ReceivedMessage objekt. Meddelandet skickas sedan till kön med obeställbara meddelanden. Funktionen tar två parametrar: en kontext och ett azservicebus.DeadLetterOptions objekt. Funktionen Receiver.DeadLetterMessage returnerar ett fel om meddelandet inte kan skickas till kön med obeställbara meddelanden.

Om du vill ta emot meddelanden från kön med obeställbara meddelanden lägger du till funktionen main.go i ReceiveDeadLetterMessage filen.

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessage tar ett azservicebus.Client objekt och skapar ett nytt azservicebus.Receiver objekt med alternativ för kön med obeställbara meddelanden. Den tar sedan emot meddelandena från kön med obeställbara meddelanden. Funktionen tar sedan emot ett meddelande från kön med obeställbara meddelanden. Sedan skriver den ut orsaken till och beskrivningen av det meddelandet.

Exempelkod

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

Kör koden

Innan du kör koden skapar du en miljövariabel med namnet AZURE_SERVICEBUS_HOSTNAME. Ange miljövariabelns värde till Service Bus-namnområdet.

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

Kör sedan följande go run kommando för att köra appen:

go run main.go

Nästa steg

Mer information finns på följande länkar: