Inviare e ricevere messaggi da code di bus di servizio di Azure (Go)
In questa esercitazione si apprenderà come inviare e ricevere messaggi da bus di servizio di Azure code usando il linguaggio di programmazione Go.
bus di servizio di Azure è un broker di messaggi aziendale completamente gestito con code di messaggi e funzionalità di pubblicazione/sottoscrizione. Il bus di servizio viene usato per separare applicazioni e servizi l'uno dall'altro, fornendo un trasporto di messaggi distribuito, affidabile e ad alte prestazioni.
Il pacchetto azservicebus di Azure SDK per Go consente di inviare e ricevere messaggi da bus di servizio di Azure e di usare il linguaggio di programmazione Go.
Al termine di questa esercitazione, sarà possibile inviare un singolo messaggio o batch di messaggi a una coda, ricevere messaggi e messaggi non recapitabili non elaborati.
Prerequisiti
- Una sottoscrizione di Azure. È possibile attivare i vantaggi della sottoscrizione Visual Studio o MSDN oppure registrarsi per ottenere un account gratuito.
- Se non si ha una coda da usare, seguire la procedura descritta nell'articolo Usare il portale di Azure per creare una coda del bus di servizio per crearne una.
- Go versione 1.18 o successiva
Creare l'app di esempio
Per iniziare, creare un nuovo modulo Go.
Creare una nuova directory per il modulo denominato
service-bus-go-how-to-use-queues
.azservicebus
Nella directory inizializzare il modulo e installare i pacchetti necessari.go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
Creare un file denominato
main.go
.
Autenticare e creare un client
main.go
Nel file creare una nuova funzione denominata GetClient
e aggiungere il codice seguente:
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
La GetClient
funzione restituisce un nuovo azservicebus.Client
oggetto creato usando uno spazio dei nomi bus di servizio di Azure e una credenziale. Lo spazio dei nomi viene fornito dalla AZURE_SERVICEBUS_HOSTNAME
variabile di ambiente. E le credenziali vengono create usando la azidentity.NewDefaultAzureCredential
funzione .
Per lo sviluppo locale, è DefaultAzureCredential
stato usato il token di accesso dall'interfaccia della riga di comando di Azure, che può essere creato eseguendo il az login
comando per l'autenticazione in Azure.
Suggerimento
Per eseguire l'autenticazione con una stringa di connessione, usare la funzione NewClientFromConnectionString .
Inviare messaggi a una coda
main.go
Nel file creare una nuova funzione denominata SendMessage
e aggiungere il codice seguente:
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage
accetta due parametri: una stringa di messaggio e un azservicebus.Client
oggetto . Crea quindi un nuovo azservicebus.Sender
oggetto e invia il messaggio alla coda. Per inviare messaggi in blocco, aggiungere la SendMessageBatch
funzione al main.go
file.
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch
accetta due parametri: una sezione di messaggi e un azservicebus.Client
oggetto . Crea quindi un nuovo azservicebus.Sender
oggetto e invia i messaggi alla coda.
Ricevere messaggi da una coda
Dopo aver inviato messaggi alla coda, è possibile riceverli con il azservicebus.Receiver
tipo . Per ricevere messaggi da una coda, aggiungere la GetMessage
funzione al main.go
file.
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage
accetta un azservicebus.Client
oggetto e crea un nuovo azservicebus.Receiver
oggetto. Riceve quindi i messaggi dalla coda. La Receiver.ReceiveMessages
funzione accetta due parametri: un contesto e il numero di messaggi da ricevere. La Receiver.ReceiveMessages
funzione restituisce una sezione di azservicebus.ReceivedMessage
oggetti.
Successivamente, un for
ciclo scorre i messaggi e stampa il corpo del messaggio. Viene quindi chiamata la CompleteMessage
funzione per completare il messaggio, rimuovendolo dalla coda.
I messaggi che superano i limiti di lunghezza, vengono inviati a una coda non valida o non vengono elaborati correttamente possono essere inviati alla coda dei messaggi non recapitabili. Per inviare messaggi alla coda dei messaggi non recapitabili, aggiungere la SendDeadLetterMessage
funzione al main.go
file.
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage
accetta un azservicebus.Client
oggetto e un azservicebus.ReceivedMessage
oggetto . Invia quindi il messaggio alla coda dei messaggi non recapitabili. La funzione accetta due parametri: un contesto e un azservicebus.DeadLetterOptions
oggetto . La Receiver.DeadLetterMessage
funzione restituisce un errore se il messaggio non riesce a essere inviato alla coda dei messaggi non recapitabili.
Per ricevere messaggi dalla coda dei messaggi non recapitabili, aggiungere la ReceiveDeadLetterMessage
funzione al main.go
file.
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage
accetta un azservicebus.Client
oggetto e crea un nuovo azservicebus.Receiver
oggetto con opzioni per la coda dei messaggi non recapitabili. Riceve quindi i messaggi dalla coda dei messaggi non recapitabili. La funzione riceve quindi un messaggio dalla coda dei messaggi non recapitabili. Stampa quindi il motivo e la descrizione dei messaggi non recapitabili per tale messaggio.
Codice di esempio
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
Eseguire il codice
Prima di eseguire il codice, creare una variabile di ambiente denominata AZURE_SERVICEBUS_HOSTNAME
. Impostare il valore della variabile di ambiente sullo spazio dei nomi del bus di servizio.
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
Eseguire quindi il comando seguente go run
per eseguire l'app:
go run main.go
Passaggi successivi
Per altre informazioni, vedere i collegamenti seguenti: