Wysyłanie komunikatów do i odbieranie komunikatów z kolejek Azure Service Bus (Go)

Z tego samouczka dowiesz się, jak wysyłać komunikaty do kolejek Azure Service Bus i odbierać je przy użyciu języka programowania Go.

Azure Service Bus jest w pełni zarządzanym brokerem komunikatów przedsiębiorstwa z kolejkami komunikatów i możliwościami publikowania/subskrybowania. Usługa Service Bus służy do oddzielenia od siebie aplikacji i usług, zapewniając rozproszony, niezawodny i wydajny transport komunikatów.

Pakiet azservicebus zestawu Azure SDK dla języka Go umożliwia wysyłanie i odbieranie komunikatów z Azure Service Bus oraz używanie języka programowania Go.

Po ukończeniu tego samouczka będziesz mieć możliwość wysyłania pojedynczego komunikatu lub partii komunikatów do kolejki, odbierania komunikatów i komunikatów utraconych, które nie są przetwarzane.

Wymagania wstępne

Tworzenie przykładowej aplikacji

Aby rozpocząć, utwórz nowy moduł Języka Go.

  1. Utwórz nowy katalog dla modułu o nazwie service-bus-go-how-to-use-queues.

  2. W katalogu zainicjuj azservicebus moduł i zainstaluj wymagane pakiety.

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. Utwórz nowy plik o nazwie main.go.

Uwierzytelnianie i tworzenie klienta

main.go W pliku utwórz nową funkcję o nazwie GetClient i dodaj następujący kod:

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

Funkcja GetClient zwraca nowy azservicebus.Client obiekt utworzony przy użyciu Azure Service Bus przestrzeni nazw i poświadczeń. Przestrzeń nazw jest dostarczana przez zmienną środowiskową AZURE_SERVICEBUS_HOSTNAME . A poświadczenie jest tworzone przy użyciu azidentity.NewDefaultAzureCredential funkcji .

W przypadku programowania lokalnego DefaultAzureCredential używany token dostępu z poziomu interfejsu wiersza polecenia platformy Azure, który można utworzyć, uruchamiając az login polecenie w celu uwierzytelnienia na platformie Azure.

Porada

Aby uwierzytelnić się za pomocą parametrów połączenia, użyj funkcji NewClientFromConnectionString .

Wysyłanie komunikatów do kolejki

main.go W pliku utwórz nową funkcję o nazwie SendMessage i dodaj następujący kod:

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage przyjmuje dwa parametry: ciąg komunikatu azservicebus.Client i obiekt. Następnie tworzy nowy azservicebus.Sender obiekt i wysyła komunikat do kolejki. Aby wysyłać komunikaty zbiorcze, dodaj SendMessageBatch funkcję do main.go pliku.

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch przyjmuje dwa parametry: fragment komunikatów i azservicebus.Client obiekt. Następnie tworzy nowy azservicebus.Sender obiekt i wysyła komunikaty do kolejki.

Odbieranie komunikatów z kolejki

Po wysłaniu komunikatów do kolejki można je odbierać przy użyciu azservicebus.Receiver typu . Aby odbierać komunikaty z kolejki, dodaj GetMessage funkcję do main.go pliku.

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessageazservicebus.Client pobiera obiekt i tworzy nowy azservicebus.Receiver obiekt. Następnie odbiera komunikaty z kolejki. Funkcja Receiver.ReceiveMessages przyjmuje dwa parametry: kontekst i liczbę komunikatów do odebrania. Funkcja Receiver.ReceiveMessages zwraca fragment azservicebus.ReceivedMessage obiektów.

Następnie pętla for iteruje komunikaty i drukuje treść komunikatu. Następnie funkcja jest wywoływana CompleteMessage w celu ukończenia komunikatu, usuwając ją z kolejki.

Komunikaty, które przekraczają limity długości, są wysyłane do nieprawidłowej kolejki lub nie są pomyślnie przetwarzane, można wysłać do kolejki utraconych wiadomości. Aby wysłać komunikaty do kolejki utraconych wiadomości, dodaj SendDeadLetterMessage funkcję do main.go pliku.

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessageazservicebus.Client przyjmuje obiekt i azservicebus.ReceivedMessage obiekt. Następnie wysyła wiadomość do kolejki utraconych wiadomości. Funkcja przyjmuje dwa parametry: kontekst i azservicebus.DeadLetterOptions obiekt. Funkcja Receiver.DeadLetterMessage zwraca błąd, jeśli komunikat nie zostanie wysłany do kolejki utraconych wiadomości.

Aby odbierać komunikaty z kolejki utraconych wiadomości, dodaj ReceiveDeadLetterMessage funkcję do main.go pliku.

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessageazservicebus.Client pobiera obiekt i tworzy nowy azservicebus.Receiver obiekt z opcjami kolejki utraconych wiadomości. Następnie odbiera komunikaty z kolejki utraconych wiadomości. Następnie funkcja odbiera jeden komunikat z kolejki utraconych wiadomości. Następnie drukuje przyczynę martwych listów i opis dla tej wiadomości.

Przykładowy kod

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

Uruchamianie kodu

Przed uruchomieniem kodu utwórz zmienną środowiskową o nazwie AZURE_SERVICEBUS_HOSTNAME. Ustaw wartość zmiennej środowiskowej na przestrzeń nazw usługi Service Bus.

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

Następnie uruchom następujące go run polecenie, aby uruchomić aplikację:

go run main.go

Następne kroki

Aby uzyskać więcej informacji, zapoznaj się z następującymi linkami: