Envío de mensajes a las colas de Azure Service Bus (Java) y recepción desde ellas
En este inicio rápido creará aplicaciones de Java para enviar mensajes a una cola de Azure Service Bus y recibir mensajes desde allí.
Nota
En este inicio rápido se proporcionan instrucciones paso a paso para un escenario sencillo de envío de mensajes a una cola de Service Bus y la recepción de estos. Puede encontrar ejemplos pregenerados de Java para Azure Service Bus en el repositorio de Azure SDK para Java en GitHub.
Sugerencia
Si trabaja con recursos de Azure Service Bus en una aplicación de Spring, se recomienda considerar el uso de Spring Cloud Azure como alternativa. Spring Cloud Azure es un proyecto de código abierto que proporciona una integración perfecta de Spring con los servicios de Azure. Para más información sobre Spring Cloud Azure y para ver un ejemplo con Service Bus, consulte Spring Cloud Stream con Azure Service Bus.
Requisitos previos
- Suscripción a Azure. Para completar este tutorial, deberá tener una cuenta de Azure. Puede activar sus ventajas de suscriptor a MSDN o registrarse para obtener una cuenta gratuita.
- Si no tiene una cola con la que trabajar, siga los pasos del artículo Uso de Azure Portal para crear una cola de Service Bus para crear una. Anote la cadena de conexión para el espacio de nombres de Service Bus y el nombre de la cola que creó.
- Instale el SDK de Azure para Java. Si usa Eclipse, puede instalar Azure Toolkit for Eclipse, que incluye el SDK de Azure para Java. Después puede agregar las Bibliotecas de Microsoft Azure para Java al proyecto. Si utiliza IntelliJ, consulte Instalación de Azure Toolkit for IntelliJ.
mensajes a una cola
En esta sección creará un proyecto de consola de Java y agregará código para enviar mensajes a la cola que creó anteriormente.
Creación de un proyecto de consola de Java
Cree un proyecto de Java mediante Eclipse o la herramienta que prefiera.
Configuración de la aplicación para usar Service Bus
Agregue referencias a las bibliotecas de Azure Core y Azure Service Bus.
Si usa Eclipse y ha creado una aplicación de consola de Java, convierta el proyecto de Java a Maven: haga clic con el botón derecho en el proyecto en la ventana del Explorador de paquetes, seleccione Configurar ->Convert to Maven project (Convertir en proyecto de Maven). Luego, agregue dependencias a estas dos bibliotecas, tal como se muestra en el ejemplo siguiente.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.myorg.sbusquickstarts</groupId>
<artifactId>sbustopicqs</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<release>15</release>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-servicebus</artifactId>
<version>7.7.0</version>
</dependency>
</dependencies>
</project>
Incorporación de código para enviar mensajes a la cola
Agregue las siguientes instrucciones
import
al principio del archivo Java.import com.azure.messaging.servicebus.*; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.Arrays; import java.util.List;
En la clase, defina las variables para almacenar la cadena de conexión y el nombre de la cola, como se muestra a continuación:
static String connectionString = "<NAMESPACE CONNECTION STRING>"; static String queueName = "<QUEUE NAME>";
Reemplace
<NAMESPACE CONNECTION STRING>
por la cadena de conexión del espacio de nombres de Service Bus. Reemplace<QUEUE NAME>
por el nombre de la cola.Agregue un método denominado
sendMessage
a la clase para enviar un mensaje a la cola.static void sendMessage() { // create a Service Bus Sender client for the queue ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .connectionString(connectionString) .sender() .queueName(queueName) .buildClient(); // send one message to the queue senderClient.sendMessage(new ServiceBusMessage("Hello, World!")); System.out.println("Sent a single message to the queue: " + queueName); }
Agregue un método denominado
createMessages
a la clase para crear una lista de mensajes. Normalmente, estos mensajes se obtienen de distintas partes de la aplicación. En este caso crearemos una lista de mensajes de ejemplo.static List<ServiceBusMessage> createMessages() { // create a list of messages and return it to the caller ServiceBusMessage[] messages = { new ServiceBusMessage("First message"), new ServiceBusMessage("Second message"), new ServiceBusMessage("Third message") }; return Arrays.asList(messages); }
Agregue un método denominado
sendMessageBatch
para enviar mensajes a la cola que ha creado. Este método crea unServiceBusSenderClient
para la cola, invoca el métodocreateMessages
para obtener la lista de mensajes, prepara uno o varios lotes y envía los lotes a la cola.static void sendMessageBatch() { // create a Service Bus Sender client for the queue ServiceBusSenderClient senderClient = new ServiceBusClientBuilder() .connectionString(connectionString) .sender() .queueName(queueName) .buildClient(); // Creates an ServiceBusMessageBatch where the ServiceBus. ServiceBusMessageBatch messageBatch = senderClient.createMessageBatch(); // create a list of messages List<ServiceBusMessage> listOfMessages = createMessages(); // We try to add as many messages as a batch can fit based on the maximum size and send to Service Bus when // the batch can hold no more messages. Create a new batch for next set of messages and repeat until all // messages are sent. for (ServiceBusMessage message : listOfMessages) { if (messageBatch.tryAddMessage(message)) { continue; } // The batch is full, so we create a new batch and send the batch. senderClient.sendMessages(messageBatch); System.out.println("Sent a batch of messages to the queue: " + queueName); // create a new batch messageBatch = senderClient.createMessageBatch(); // Add that message that we couldn't before. if (!messageBatch.tryAddMessage(message)) { System.err.printf("Message is too large for an empty batch. Skipping. Max size: %s.", messageBatch.getMaxSizeInBytes()); } } if (messageBatch.getCount() > 0) { senderClient.sendMessages(messageBatch); System.out.println("Sent a batch of messages to the queue: " + queueName); } //close the client senderClient.close(); }
mensajes de una cola
En esta sección agregará código para recuperar mensajes de la cola.
Agregue un método denominado
receiveMessages
para recibir mensajes de la cola. Este método crea unServiceBusProcessorClient
para la cola al especificar un controlador para procesar los mensajes y otro, para los errores. A continuación, inicia el procesador, espera unos segundos, imprime los mensajes que se reciben, y detiene y cierra el procesador.Importante
Reemplace
QueueTest
enQueueTest::processMessage
en el código por el nombre de la clase.// handles received messages static void receiveMessages() throws InterruptedException { CountDownLatch countdownLatch = new CountDownLatch(1); // Create an instance of the processor through the ServiceBusClientBuilder ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder() .connectionString(connectionString) .processor() .queueName(queueName) .processMessage(QueueTest::processMessage) .processError(context -> processError(context, countdownLatch)) .buildProcessorClient(); System.out.println("Starting the processor"); processorClient.start(); TimeUnit.SECONDS.sleep(10); System.out.println("Stopping and closing the processor"); processorClient.close(); }
Agregue el método
processMessage
para procesar un mensaje recibido de la suscripción de Service Bus.private static void processMessage(ServiceBusReceivedMessageContext context) { ServiceBusReceivedMessage message = context.getMessage(); System.out.printf("Processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(), message.getSequenceNumber(), message.getBody()); }
Agregue el método
processError
para controlar los mensajes de error.private static void processError(ServiceBusErrorContext context, CountDownLatch countdownLatch) { System.out.printf("Error when receiving messages from namespace: '%s'. Entity: '%s'%n", context.getFullyQualifiedNamespace(), context.getEntityPath()); if (!(context.getException() instanceof ServiceBusException)) { System.out.printf("Non-ServiceBusException occurred: %s%n", context.getException()); return; } ServiceBusException exception = (ServiceBusException) context.getException(); ServiceBusFailureReason reason = exception.getReason(); if (reason == ServiceBusFailureReason.MESSAGING_ENTITY_DISABLED || reason == ServiceBusFailureReason.MESSAGING_ENTITY_NOT_FOUND || reason == ServiceBusFailureReason.UNAUTHORIZED) { System.out.printf("An unrecoverable error occurred. Stopping processing with reason %s: %s%n", reason, exception.getMessage()); countdownLatch.countDown(); } else if (reason == ServiceBusFailureReason.MESSAGE_LOCK_LOST) { System.out.printf("Message lock lost for message: %s%n", context.getException()); } else if (reason == ServiceBusFailureReason.SERVICE_BUSY) { try { // Choosing an arbitrary amount of time to wait until trying again. TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { System.err.println("Unable to sleep for period of time"); } } else { System.out.printf("Error source %s, reason %s, message: %s%n", context.getErrorSource(), reason, context.getException()); } }
Actualice el método
main
para invocar los métodossendMessage
,sendMessageBatch
yreceiveMessages
, y para iniciarInterruptedException
.public static void main(String[] args) throws InterruptedException { sendMessage(); sendMessageBatch(); receiveMessages(); }
Ejecutar la aplicación
Al ejecutar la aplicación, verá los siguientes mensajes en la ventana de la consola.
Sent a single message to the queue: myqueue
Sent a batch of messages to the queue: myqueue
Starting the processor
Processing message. Session: 88d961dd801f449e9c3e0f8a5393a527, Sequence #: 1. Contents: Hello, World!
Processing message. Session: e90c8d9039ce403bbe1d0ec7038033a0, Sequence #: 2. Contents: First message
Processing message. Session: 311a216a560c47d184f9831984e6ac1d, Sequence #: 3. Contents: Second message
Processing message. Session: f9a871be07414baf9505f2c3d466c4ab, Sequence #: 4. Contents: Third message
Stopping and closing the processor
En la página Información general del espacio de nombres de Service Bus en Azure Portal, verá el recuento de mensajes entrantes y salientes. Es posible que tenga que esperar alrededor de un minuto y luego actualizar la página para ver los valores más recientes.
Seleccione la cola en esta página Información general para ir a la página Cola de Service Bus. En esta página también verá el recuento de mensajes entrantes y salientes. También verá otra información, como el tamaño actual de la cola, el tamaño máximo o el recuento de mensajes activos.
Pasos siguientes
Consulte la documentación y los ejemplos siguientes: