Envío y recepción de mensajes con destino y origen en colas de Azure Service Bus (Go)
En este tutorial, se muestra cómo enviar y recibir mensajes de colas de Azure Service Bus mediante el lenguaje de programación Go.
Azure Service Bus es un agente de mensajes empresarial totalmente administrado que incluye colas de mensajes y capacidades de publicación/suscripción. Service Bus se usa para desacoplar aplicaciones y servicios entre sí, proporcionando un transporte de mensajes distribuido, confiable y de alto rendimiento.
El paquete azservicebus de Azure SDK para Go permite enviar y recibir mensajes de Azure Service Bus y usar el lenguaje de programación Go.
Al final de este tutorial, podrá enviar un único mensaje o un lote de mensajes a una cola y recibir mensajes y mensajes fallidos que no se procesan.
Requisitos previos
- Suscripción a Azure. Puede activar sus beneficios de suscriptor de Visual Studio o MSDN o registrarse para obtener una cuenta gratuita.
- Si no tiene una cola con la que trabajar, siga los pasos del artículo Uso de Azure Portal para crear una cola de Service Bus para crear una.
- Go, versión 1.18 o posterior.
Crear la aplicación de ejemplo
Para empezar, cree un nuevo módulo de Go.
Cree un directorio para el módulo denominado
service-bus-go-how-to-use-queues
.En el directorio
azservicebus
, inicialice el módulo e instale los paquetes necesarios.go mod init service-bus-go-how-to-use-queues go get github.com/Azure/azure-sdk-for-go/sdk/azidentity go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
Cree un nuevo archivo llamado
main.go
.
Autenticación y creación de un cliente
En el archivo main.go
, cree una nueva función denominada GetClient
y agregue el código siguiente:
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
La función GetClient
devuelve un nuevo objeto azservicebus.Client
que se crea mediante un espacio de nombres y una credencial de Azure Service Bus. La variable de entorno AZURE_SERVICEBUS_HOSTNAME
proporciona el espacio de nombres. Y la credencial se crea mediante la función azidentity.NewDefaultAzureCredential
.
Para el desarrollo local, DefaultAzureCredential
usó el token de acceso de la CLI de Azure, que se puede crear mediante la ejecución del comando az login
para autenticarse en Azure.
Sugerencia
Para autenticarse con una cadena de conexión, use la función NewClientFromConnectionString.
mensajes a una cola
En el archivo main.go
, cree una nueva función denominada SendMessage
y agregue el código siguiente:
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
SendMessage
toma dos parámetros: una cadena de mensaje y un objeto azservicebus.Client
. A continuación, crea un nuevo objeto azservicebus.Sender
y envía el mensaje a la cola. Para enviar mensajes de forma masiva, agregue la función SendMessageBatch
al archivo main.go
.
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
if err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil); err != nil {
panic(err)
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
SendMessageBatch
toma dos parámetros: un segmento de mensajes y un objeto azservicebus.Client
. A continuación, crea un nuevo objeto azservicebus.Sender
y envía los mensajes a la cola.
mensajes de una cola
Después de enviar mensajes a la cola, puede recibirlos con el tipo azservicebus.Receiver
. Para recibir mensajes de una cola, agregue la función GetMessage
al archivo main.go
.
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil) //Change myqueue to env var
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetMessage
toma un objeto azservicebus.Client
y crea un nuevo objeto azservicebus.Receiver
. A continuación, recibe los mensajes de la cola. La función Receiver.ReceiveMessages
toma dos parámetros: un contexto y el número de mensajes que se van a recibir. La función Receiver.ReceiveMessages
devuelve un segmento de objetos azservicebus.ReceivedMessage
.
A continuación, un bucle for
recorre en iteración los mensajes e imprime el cuerpo del mensaje. Luego se llama a la función CompleteMessage
para completar el mensaje, quitándolo de la cola.
Los mensajes que superan los límites de longitud se envían a una cola no válida, o si no se procesan correctamente, se pueden enviar a la cola de mensajes fallidos. Para enviar mensajes a la cola de mensajes fallidos, agregue la función SendDeadLetterMessage
al archivo main.go
.
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
DeadLetterMessage
toma un objeto azservicebus.Client
y un objeto azservicebus.ReceivedMessage
. A continuación, envía el mensaje a la cola de mensajes fallidos. La función toma dos parámetros: un contexto y un objeto azservicebus.DeadLetterOptions
. La función Receiver.DeadLetterMessage
devuelve un error si el mensaje no se puede enviar a la cola de mensajes fallidos.
Para recibir mensajes de la cola de mensajes fallidos, agregue la función ReceiveDeadLetterMessage
al archivo main.go
.
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription) //change to struct an unmarshal into it
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
GetDeadLetterMessage
toma un objeto azservicebus.Client
y crea un nuevo objeto azservicebus.Receiver
con opciones para la cola de mensajes fallidos. A continuación, recibe los mensajes de la cola de mensajes fallidos. La función recibe luego un mensaje de la cola de mensajes fallidos. A continuación, imprime el motivo y la descripción de los mensajes fallidos para ese mensaje.
Código de ejemplo
package main
import (
"context"
"errors"
"fmt"
"os"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)
func GetClient() *azservicebus.Client {
namespace, ok := os.LookupEnv("AZURE_SERVICEBUS_HOSTNAME") //ex: myservicebus.servicebus.windows.net
if !ok {
panic("AZURE_SERVICEBUS_HOSTNAME environment variable not found")
}
cred, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
panic(err)
}
client, err := azservicebus.NewClient(namespace, cred, nil)
if err != nil {
panic(err)
}
return client
}
func SendMessage(message string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
sbMessage := &azservicebus.Message{
Body: []byte(message),
}
err = sender.SendMessage(context.TODO(), sbMessage, nil)
if err != nil {
panic(err)
}
}
func SendMessageBatch(messages []string, client *azservicebus.Client) {
sender, err := client.NewSender("myqueue", nil)
if err != nil {
panic(err)
}
defer sender.Close(context.TODO())
batch, err := sender.NewMessageBatch(context.TODO(), nil)
if err != nil {
panic(err)
}
for _, message := range messages {
err := batch.AddMessage(&azservicebus.Message{Body: []byte(message)}, nil)
if errors.Is(err, azservicebus.ErrMessageTooLarge) {
fmt.Printf("Message batch is full. We should send it and create a new one.\n")
}
}
if err := sender.SendMessageBatch(context.TODO(), batch, nil); err != nil {
panic(err)
}
}
func GetMessage(count int, client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), count, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
body := message.Body
fmt.Printf("%s\n", string(body))
err = receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func DeadLetterMessage(client *azservicebus.Client) {
deadLetterOptions := &azservicebus.DeadLetterOptions{
ErrorDescription: to.Ptr("exampleErrorDescription"),
Reason: to.Ptr("exampleReason"),
}
receiver, err := client.NewReceiverForQueue("myqueue", nil)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
if len(messages) == 1 {
err := receiver.DeadLetterMessage(context.TODO(), messages[0], deadLetterOptions)
if err != nil {
panic(err)
}
}
}
func GetDeadLetterMessage(client *azservicebus.Client) {
receiver, err := client.NewReceiverForQueue(
"myqueue",
&azservicebus.ReceiverOptions{
SubQueue: azservicebus.SubQueueDeadLetter,
},
)
if err != nil {
panic(err)
}
defer receiver.Close(context.TODO())
messages, err := receiver.ReceiveMessages(context.TODO(), 1, nil)
if err != nil {
panic(err)
}
for _, message := range messages {
fmt.Printf("DeadLetter Reason: %s\nDeadLetter Description: %s\n", *message.DeadLetterReason, *message.DeadLetterErrorDescription)
err := receiver.CompleteMessage(context.TODO(), message, nil)
if err != nil {
panic(err)
}
}
}
func main() {
client := GetClient()
fmt.Println("send a single message...")
SendMessage("firstMessage", client)
fmt.Println("send two messages as a batch...")
messages := [2]string{"secondMessage", "thirdMessage"}
SendMessageBatch(messages[:], client)
fmt.Println("\nget all three messages:")
GetMessage(3, client)
fmt.Println("\nsend a message to the Dead Letter Queue:")
SendMessage("Send message to Dead Letter", client)
DeadLetterMessage(client)
GetDeadLetterMessage(client)
}
Ejecución del código
Antes de ejecutar el código, cree una variable de entorno llamada AZURE_SERVICEBUS_HOSTNAME
. Establezca el valor de la variable de entorno en el espacio de nombres del Service Bus.
export AZURE_SERVICEBUS_HOSTNAME=<YourServiceBusHostName>
A continuación, ejecute el siguiente comando go run
para ejecutar la aplicación:
go run main.go
Pasos siguientes
Para más información, consulte los siguientes vínculos: