Odesílání zpráv do front Azure Service Bus a příjem zpráv z front (Go)

V tomto kurzu se dozvíte, jak odesílat zprávy do front Azure Service Bus a přijímat je z front pomocí programovacího jazyka Go.

Azure Service Bus je plně spravovaný podnikový zprostředkovatel zpráv s frontami zpráv a možnostmi publikování a odběru. Service Bus se používá k oddělení aplikací a služeb od sebe a poskytuje distribuovaný, spolehlivý a vysoce výkonný přenos zpráv.

Balíček azservicebus sady Azure SDK pro Go umožňuje odesílat a přijímat zprávy z Azure Service Bus a pomocí programovacího jazyka Go.

Na konci tohoto kurzu budete umět: odeslat jednu zprávu nebo dávku zpráv do fronty, přijímat zprávy a nedoručované zprávy, které se nezpracují.

Požadavky

Vytvoření ukázkové aplikace

Začněte vytvořením nového modulu Go.

  1. Vytvořte nový adresář pro modul s názvem service-bus-go-how-to-use-queues.

  2. V adresáři azservicebus inicializujte modul a nainstalujte požadované balíčky.

    go mod init service-bus-go-how-to-use-queues
    
    go get github.com/Azure/azure-sdk-for-go/sdk/azidentity
    
    go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
    
  3. Vytvořte nový soubor s názvem main.go.

Ověření a vytvoření klienta

main.go V souboru vytvořte novou funkci s názvem GetClient a přidejte následující kód:

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

Funkce GetClient vrátí nový azservicebus.Client objekt vytvořený pomocí oboru názvů Azure Service Bus a přihlašovacích údajů. Obor názvů je poskytován proměnnou AZURE_SERVICEBUS_HOSTNAME prostředí. Přihlašovací údaje se vytvoří pomocí azidentity.NewDefaultAzureCredential funkce .

Pro místní vývoj DefaultAzureCredential použil přístupový token z Azure CLI, který je možné vytvořit spuštěním az login příkazu pro ověření v Azure.

Tip

K ověření pomocí připojovacího řetězce použijte funkci NewClientFromConnectionString .

Zasílání zpráv do fronty

main.go V souboru vytvořte novou funkci s názvem SendMessage a přidejte následující kód:

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

SendMessage má dva parametry: řetězec zprávy a azservicebus.Client objekt. Potom vytvoří nový azservicebus.Sender objekt a odešle zprávu do fronty. Pokud chcete odesílat hromadné zprávy, přidejte SendMessageBatch do main.go souboru funkci .

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())
	
	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
			panic(err)
		}
	}
	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

SendMessageBatch má dva parametry: řez zpráv a azservicebus.Client objekt. Potom vytvoří nový azservicebus.Sender objekt a odešle zprávy do fronty.

Příjem zpráv z fronty

Po odeslání zpráv do fronty je můžete přijímat pomocí azservicebus.Receiver typu . Pokud chcete přijímat zprávy z fronty, přidejte GetMessage do main.go souboru funkci .

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetMessageazservicebus.Client vezme objekt a vytvoří nový azservicebus.Receiver objekt. Pak přijme zprávy z fronty. Funkce Receiver.ReceiveMessages přijímá dva parametry: kontext a počet zpráv, které se mají přijmout. Funkce Receiver.ReceiveMessages vrátí řez azservicebus.ReceivedMessage objektů.

Dále smyčka for prochází zprávy a vytiskne text zprávy. Potom se CompleteMessage zavolá funkce, která zprávu dokončí a odebere ji z fronty.

Zprávy, které překračují limity délky, jsou odeslány do neplatné fronty nebo nejsou úspěšně zpracovány, mohou být odeslány do fronty nedoručených zpráv. Pokud chcete odesílat zprávy do fronty nedoručených zpráv, přidejte SendDeadLetterMessage do main.go souboru funkci .

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

DeadLetterMessageazservicebus.Client přebírá objekt a azservicebus.ReceivedMessage objekt. Potom zprávu odešle do fronty nedoručených zpráv. Funkce přebírá dva parametry: kontext a azservicebus.DeadLetterOptions objekt. Funkce Receiver.DeadLetterMessage vrátí chybu, pokud se zpráva nepodaří odeslat do fronty nedoručených zpráv.

Pokud chcete přijímat zprávy z fronty nedoručených zpráv, přidejte ReceiveDeadLetterMessage do main.go souboru funkci .

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

GetDeadLetterMessageazservicebus.Client vezme objekt a vytvoří nový azservicebus.Receiver objekt s možnostmi pro frontu nedoručených zpráv. Pak obdrží zprávy z fronty nedoručených zpráv. Funkce pak přijme jednu zprávu z fronty nedoručených zpráv. Pak vypíše důvod nedoručených zpráv a popis této zprávy.

Ukázka kódu

package main

import (
	"context"
	"errors"
	"fmt"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func GetClient() *azservicebus.Client {
	namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
	if !ok {
		panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
	}

	cred, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}

	client, err := azservicebus.NewClient(namespace, cred, nil)
	if err != nil {
		panic(err)
	}
	return client
}

func SendMessage(message string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	sbMessage := &azservicebus.Message{
		Body: []byte(message),
	}
	err = sender.SendMessage(context.TODO(), sbMessage, nil)
	if err != nil {
		panic(err)
	}
}

func SendMessageBatch(messages []string, client *azservicebus.Client) {
	sender, err := client.NewSender("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer sender.Close(context.TODO())

	batch, err := sender.NewMessageBatch(context.TODO(), nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
		if errors.Is(err, azservicebus.ErrMessageTooLarge) {
			fmt.Printf("Message batch is full. We should send it and create a new one.\n")
		}
	}

	if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
		panic(err)
	}
}

func GetMessage(count int, client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue("myqueue", nil) 
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		body := message.Body
		fmt.Printf("%s\n", string(body))

		err = receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func DeadLetterMessage(client *azservicebus.Client) {
	deadLetterOptions := &azservicebus.DeadLetterOptions{
		ErrorDescription: to.Ptr("exampleErrorDescription"),
		Reason:           to.Ptr("exampleReason"),
	}

	receiver, err := client.NewReceiverForQueue("myqueue", nil)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	if len(messages) == 1 {
		err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
		if err != nil {
			panic(err)
		}
	}
}

func GetDeadLetterMessage(client *azservicebus.Client) {
	receiver, err := client.NewReceiverForQueue(
		"myqueue",
		&azservicebus.ReceiverOptions{
			SubQueue: azservicebus.SubQueueDeadLetter,
		},
	)
	if err != nil {
		panic(err)
	}
	defer receiver.Close(context.TODO())

	messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
	if err != nil {
		panic(err)
	}

	for _, message := range messages {
		fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) 
		err := receiver.CompleteMessage(context.TODO(), message, nil)
		if err != nil {
			panic(err)
		}
	}
}

func main() {
	client := GetClient()

	fmt.Println("send a single message...")
	SendMessage("firstMessage", client)

	fmt.Println("send two messages as a batch...")
	messages := [2]string{"secondMessage", "thirdMessage"}
	SendMessageBatch(messages[:], client)

	fmt.Println("\nget all three messages:")
	GetMessage(3, client)

	fmt.Println("\nsend a message to the Dead Letter Queue:")
	SendMessage("Send message to Dead Letter", client)
	DeadLetterMessage(client)
	GetDeadLetterMessage(client)
}

Spuštění kódu

Před spuštěním kódu vytvořte proměnnou prostředí s názvem AZURE_SERVICEBUS_HOSTNAME. Nastavte hodnotu proměnné prostředí na obor názvů služby Service Bus.

export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>

Dále spusťte aplikaci spuštěním následujícího go run příkazu:

go run main.go

Další kroky

Další informace najdete na následujících odkazech: