Compartir a través de


biblioteca cliente de Azure Service Bus para Java: versión 7.14.4

Microsoft Azure Service Bus es un agente de mensajes de integración empresarial completamente administrado. Service Bus puede desacoplar aplicaciones y servicios. Service Bus ofrece una plataforma confiable y segura para la transferencia asincrónica de datos y estado. Los datos se transfieren entre distintas aplicaciones y servicios mediante mensajes. Si desea obtener más información sobre Azure Service Bus, puede que desee revisar: ¿Qué es Service Bus?

La biblioteca cliente de Azure Service Bus permite enviar y recibir mensajes de Azure Service Bus y puede usarse para:

  • Transferir datos empresariales, como pedidos de ventas o compras, diarios o movimientos de inventario.
  • Desacopla las aplicaciones para mejorar la confiabilidad y escalabilidad de las aplicaciones y los servicios. Los clientes y servicios no tienen que estar en línea al mismo tiempo.
  • Habilite 1:n relaciones entre publicadores y suscriptores.
  • Implemente flujos de trabajo que requieran la ordenación de mensajes o el aplazamiento de mensajes.

Código | fuenteDocumentación | de referencia de APIDocumentación | del productoMuestras | Paquete (Maven)

Introducción

Requisitos previos

Para crear rápidamente los recursos de Service Bus necesarios en Azure y recibir una cadena de conexión para ellos, puede implementar nuestra plantilla de ejemplo haciendo clic en:

Inclusión del paquete

Inclusión del archivo BOM

Incluya azure-sdk-bom en el proyecto para depender de la versión de disponibilidad general (GA) de la biblioteca. En el fragmento de código siguiente, reemplace el marcador de posición {bom_version_to_target} por el número de versión. Para más información sobre la lista de materiales, consulte EL ARCHIVO LÉAME BOM del SDK de AZURE.

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-sdk-bom</artifactId>
            <version>{bom_version_to_target}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

y, luego, incluya la dependencia directa en la sección de dependencias sin la etiqueta de versión.

<dependencies>
  <dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
  </dependency>
</dependencies>

Inclusión de dependencias directas

Si quiere depender de una versión determinada de la biblioteca que no está presente en la lista de materiales, agregue la dependencia directa al proyecto como se indica a continuación.

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-messaging-servicebus</artifactId>
    <version>7.14.4</version>
</dependency>

Autenticar el cliente

Para que la biblioteca cliente de Service Bus interactúe con Service Bus, deberá comprender cómo conectarse y autorizarla.

Creación de clientes de Service Bus mediante una cadena de conexión

El medio más sencillo para la autenticación es usar una cadena de conexión, que se crea automáticamente al crear un espacio de nombres de Service Bus. Si no está familiarizado con las directivas de acceso compartido en Azure, puede seguir la guía paso a paso para obtener una cadena de conexión de Service Bus.

Se crean instancias de los clientes remitentes y receptores de Service Bus asincrónicos y sincrónicos mediante ServiceBusClientBuilder. Los fragmentos de código siguientes crean un remitente de Service Bus sincrónico y un receptor asincrónico, respectivamente.

ServiceBusSenderClient sender = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .sender()
    .queueName("<< QUEUE NAME >>")
    .buildClient();
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .receiver()
    .topicName("<< TOPIC NAME >>")
    .subscriptionName("<< SUBSCRIPTION NAME >>")
    .buildAsyncClient();

Creación de un cliente de Service Bus mediante la Plataforma de identidad de Microsoft (anteriormente Azure Active Directory)

El SDK de Azure para Java admite el paquete azure Identity, lo que facilita la obtención de credenciales de la Plataforma de identidad de Microsoft. En primer lugar, agregue el paquete:

<dependency>
    <groupId>com.azure</groupId>
    <artifactId>azure-identity</artifactId>
    <version>1.10.1</version>
</dependency>
  • Problema conocido: el archivo pom.xml debe enumerar azure-messaging-servicebus antes azure-identity de las bibliotecas cliente. Este problema se resuelve con azure-identity:1.2.1. Obtenga más información en este artículo.

Las formas implementadas de solicitar una credencial están en el com.azure.identity.credential paquete. En el ejemplo siguiente se muestra cómo usar un secreto de cliente de aplicación de Azure Active Directory (AAD) para autorizar con Azure Service Bus.

Autorización con DefaultAzureCredential

La autorización es más fácil mediante DefaultAzureCredential. Encuentra la mejor credencial para usar en su entorno en ejecución. Para más información sobre el uso de la autorización de Azure Active Directory con Service Bus, consulte la documentación asociada.

Use la credencial de token devuelta para autenticar al cliente:

TokenCredential credential = new DefaultAzureCredentialBuilder()
    .build();
ServiceBusReceiverAsyncClient receiver = new ServiceBusClientBuilder()
    .credential("<<fully-qualified-namespace>>", credential)
    .receiver()
    .queueName("<<queue-name>>")
    .buildAsyncClient();

Conceptos clave

Puede interactuar con los tipos de recursos principales dentro de un espacio de nombres de Service Bus, de los cuales pueden existir varios y en qué transmisión real de mensajes tiene lugar. El espacio de nombres suele servir como contenedor de aplicaciones:

  • Una cola permite el envío y la recepción de mensajes, ordenados primero en salir. A menudo se usa para la comunicación de punto a punto.
  • Un tema es más adecuado para escenarios de publicador y suscriptor. Un tema publica mensajes en suscripciones, de los cuales pueden existir varios simultáneamente.
  • Una suscripción recibe mensajes de un tema. Cada suscripción es independiente y recibe una copia del mensaje enviado al tema.

Clientes de Service Bus

El generador ServiceBusClientBuilder se usa para crear todos los clientes de Service Bus.

Ejemplos

Envío de mensajes

Deberá crear un asincrónico o sincrónico ServiceBusSenderAsyncClientServiceBusSenderClient para enviar mensajes. Cada remitente puede enviar mensajes a una cola o a un tema.

El fragmento de código siguiente crea un sincrónico ServiceBusSenderClient para publicar un mensaje en una cola.

ServiceBusSenderClient sender = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .sender()
    .queueName("<< QUEUE NAME >>")
    .buildClient();
List<ServiceBusMessage> messages = Arrays.asList(
    new ServiceBusMessage("Hello world").setMessageId("1"),
    new ServiceBusMessage("Bonjour").setMessageId("2"));

sender.sendMessages(messages);

// When you are done using the sender, dispose of it.
sender.close();

Recepción de mensajes

Para recibir mensajes, deberá crear un ServiceBusProcessorClient con devoluciones de llamada para los mensajes entrantes y cualquier error que se produzca en el proceso. A continuación, puede iniciar y detener el cliente según sea necesario.

Al recibir un mensaje con el modo PeekLock , indica al agente que la lógica de la aplicación quiere liquidar (por ejemplo, completar, abandonar) los mensajes recibidos explícitamente.

// Sample code that processes a single message which is received in PeekLock mode.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
    final ServiceBusReceivedMessage message = context.getMessage();
    // Randomly complete or abandon each message. Ideally, in real-world scenarios, if the business logic
    // handling message reaches desired state such that it doesn't require Service Bus to redeliver
    // the same message, then context.complete() should be called otherwise context.abandon().
    final boolean success = Math.random() < 0.5;
    if (success) {
        try {
            context.complete();
        } catch (Exception completionError) {
            System.out.printf("Completion of the message %s failed\n", message.getMessageId());
            completionError.printStackTrace();
        }
    } else {
        try {
            context.abandon();
        } catch (Exception abandonError) {
            System.out.printf("Abandoning of the message %s failed\n", message.getMessageId());
            abandonError.printStackTrace();
        }
    }
};

// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
    System.err.println("Error occurred while receiving message: " + errorContext.getException());
};

// create the processor client via the builder and its sub-builder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
                                .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
                                .processor()
                                .queueName("<< QUEUE NAME >>")
                                .receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
                                .disableAutoComplete() // Make sure to explicitly opt in to manual settlement (e.g. complete, abandon).
                                .processMessage(processMessage)
                                .processError(processError)
                                .disableAutoComplete()
                                .buildProcessorClient();

// Starts the processor in the background and returns immediately
processorClient.start();

Al recibir un mensaje con el modo ReceiveAndDelete , indica al agente que considere todos los mensajes que envía al cliente receptor como se liquida cuando se envía.

// Sample code that processes a single message which is received in ReceiveAndDelete mode.
Consumer<ServiceBusReceivedMessageContext> processMessage = context -> {
    final ServiceBusReceivedMessage message = context.getMessage();
    System.out.printf("handler processing message. Session: %s, Sequence #: %s. Contents: %s%n", message.getMessageId(),
        message.getSequenceNumber(), message.getBody());
};

// Sample code that gets called if there's an error
Consumer<ServiceBusErrorContext> processError = errorContext -> {
    System.err.println("Error occurred while receiving message: " + errorContext.getException());
};

// create the processor client via the builder and its sub-builder
ServiceBusProcessorClient processorClient = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .processor()
    .queueName("<< QUEUE NAME >>")
    .receiveMode(ServiceBusReceiveMode.RECEIVE_AND_DELETE)
    .processMessage(processMessage)
    .processError(processError)
    .disableAutoComplete()
    .buildProcessorClient();

// Starts the processor in the background and returns immediately
processorClient.start();

Hay cuatro maneras de establecer mensajes mediante los métodos en el contexto del mensaje pasado a la devolución de llamada.

  • Completado: hace que el mensaje se elimine de la cola o tema.
  • Abandonar: libera el bloqueo del receptor en el mensaje que permite que otros receptores reciban el mensaje.
  • Aplazar: aplaza el mensaje de recibirse por medios normales. Para recibir mensajes diferidos, es necesario conservar el número de secuencia del mensaje.
  • Mensajes fallidos: mueve el mensaje a la cola de mensajes fallidos. Esto impedirá que se vuelva a recibir el mensaje. Para recibir mensajes de la cola de mensajes fallidos, se necesita un receptor con ámbito en la cola de mensajes fallidos.

Envío y recepción de colas o temas habilitados para la sesión

El uso de sesiones requiere que cree una cola o una suscripción habilitadas para la sesión. Puede obtener más información sobre cómo configurarlo en "Sesiones de mensajes".

Las sesiones de Azure Service Bus permiten la administración ordenada y conjunta de secuencias sin enlace de mensajes relacionados. Se pueden usar sesiones en patrones FIFO (primero en entrar, primero en salir) y de solicitud-respuesta. Cualquier remitente puede crear una sesión al enviar mensajes a un tema o una cola estableciendo la ServiceBusMessage.setSessionId(String) propiedad en algún identificador definido por la aplicación que sea único para la sesión.

A diferencia de las colas o suscripciones no habilitadas para sesión, solo un receptor puede leer desde una sesión en cualquier momento. Cuando un receptor captura una sesión, Service Bus bloquea la sesión de ese receptor y tiene acceso exclusivo a los mensajes de esa sesión.

Enviar un mensaje a una sesión

Cree un ServiceBusSenderClient para una suscripción de tema o cola habilitada para sesión. Al establecer ServiceBusMessage.setSessionId(String) en , ServiceBusMessage se publicará el mensaje en esa sesión. Si la sesión no existe, se crea.

// Setting sessionId publishes that message to a specific session, in this case, "greeting".
ServiceBusMessage message = new ServiceBusMessage("Hello world")
    .setSessionId("greetings");

sender.sendMessage(message);

Recepción de mensajes de una sesión

Recibir mensajes de sesiones es similar a recibir mensajes de una cola o suscripción no habilitada para sesión. La diferencia está en el generador y en la clase que se usa.

En caso de no sesión, usaría el sub builder processor(). En el caso de las sesiones, usaría el sub builder sessionProcessor(). Ambos sub builders crearán una instancia de ServiceBusProcessorClient configurada para trabajar en una sesión o en una entidad de Service Bus que no sea de sesión. En el caso del procesador de sesión, también puede pasar el número máximo de sesiones que desea que el procesador procese simultáneamente.

Creación de un receptor de cola de mensajes fallidos

Azure Service Bus colas y suscripciones de temas proporcionan una subcola secundaria, denominada cola de mensajes fallidos (DLQ). La cola de mensajes fallidos no se necesita crear explícitamente y no se puede eliminar ni administrar independientemente de la entidad principal. En el caso de las suscripciones de temas o colas habilitadas para la sesión o que no son de sesión, el receptor de mensajes fallidos se puede crear de la misma manera que se muestra a continuación. Obtenga más información sobre la cola de mensajes fallidos aquí.

ServiceBusReceiverClient receiver = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>")
    .receiver() // Use this for session or non-session enabled queue or topic/subscriptions
    .topicName("<< TOPIC NAME >>")
    .subscriptionName("<< SUBSCRIPTION NAME >>")
    .subQueue(SubQueue.DEAD_LETTER_QUEUE)
    .buildClient();

Uso compartido de la conexión entre clientes

La creación de una conexión física a Service Bus requiere recursos. Una aplicación debe compartir la conexión
entre los clientes que se pueden lograr compartiendo el generador de nivel superior, como se muestra a continuación.

// Create shared builder.
ServiceBusClientBuilder sharedConnectionBuilder = new ServiceBusClientBuilder()
    .connectionString("<< CONNECTION STRING FOR THE SERVICE BUS NAMESPACE >>");
// Create receiver and sender which will share the connection.
ServiceBusReceiverClient receiver = sharedConnectionBuilder
    .receiver()
    .queueName("<< QUEUE NAME >>")
    .buildClient();
ServiceBusSenderClient sender = sharedConnectionBuilder
    .sender()
    .queueName("<< QUEUE NAME >>")
    .buildClient();

Cuándo usar "ServiceBusProcessorClient".

¿Cuándo usar "ServiceBusProcessorClient", "ServiceBusReceiverClient" o ServiceBusReceiverAsyncClient? El procesador se crea con "ServiceBusReceiverAsyncClient", proporciona una forma cómoda de recibir mensajes con el autocompletar predeterminado y la renovación automática de bloqueos de mensajes en modo "PEEK_LOCK". El procesador es adecuado cuando las aplicaciones no han realizado el traslado completo al cliente receptor asincrónico y quieren procesar el mensaje en modo sincrónico. El procesador recibe mensajes para siempre porque se recupera de los errores de red internamente. Se realiza una llamada de función "ServiceBusProcessorClient:processMessage()" para cada mensaje. Como alternativa, también puede usar "ServiceBusReceiverClient", es un cliente de nivel inferior y proporciona una gama más amplia de API. Si el procesamiento asincrónico es
adecuado para la aplicación, puede usar "ServiceBusReceiverAsyncClient".

Solución de problemas

Habilitación del registro de cliente

El SDK de Azure para Java ofrece un artículo de registro coherente para ayudar a solucionar errores de aplicación y acelerar su resolución. Los registros generados capturarán el flujo de una aplicación antes de alcanzar el estado terminal para ayudar a encontrar el problema raíz. Consulte la wiki de registro para obtener instrucciones sobre cómo habilitar el registro.

Habilitación del registro de transporte de AMQP

Si habilitar el registro de cliente no es suficiente para diagnosticar los problemas. Puede habilitar el registro en un archivo en la biblioteca AMQP subyacente, Qpid Proton-J. Qpid Proton-J usa java.util.logging. Para habilitar el registro, cree un archivo de configuración con el contenido siguiente. O bien, establezca proton.trace.level=ALL y las opciones de configuración que desee para la java.util.logging.Handler implementación. Las clases de implementación y sus opciones se pueden encontrar en javadoc del SDK de Java 8.

Para realizar un seguimiento de los marcos de transporte de AMQP, establezca la variable de entorno: PN_TRACE_FRM=1.

Archivo "logging.properties" de ejemplo

El archivo de configuración siguiente registra la salida de seguimiento de proton-j al archivo "proton-trace.log".

handlers=java.util.logging.FileHandler
.level=OFF
proton.trace.level=ALL
java.util.logging.FileHandler.level=ALL
java.util.logging.FileHandler.pattern=proton-trace.log
java.util.logging.FileHandler.formatter=java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format=[%1$tF %1$tr] %3$s %4$s: %5$s %n

Excepciones comunes

Excepción de AMQP

Se trata de una excepción general para los errores relacionados con AMQP, que incluye los errores de AMQP como ErrorCondition y el contexto que provocó esta excepción como AmqpErrorContext. isTransient es un valor booleano que indica si la excepción es un error transitorio o no. Si se produce una excepción de AMQP transitoria, la biblioteca cliente reintenta la operación tantas veces como permite AmqpRetryOptions . Afterwords, se produce un error en la operación y se propaga una excepción al usuario.

AmqpErrorCondition contiene condiciones de error comunes al protocolo AMQP y que usan los servicios de Azure. Cuando se produce una excepción AMQP, examinar el campo de condición de error puede informar a los desarrolladores de por qué se produjo la excepción AMQP y, si es posible, cómo mitigar esta excepción. Puede encontrar una lista de todas las excepciones de AMQP en OASIS AMQP Versión 1.0 Errores de transporte.

La manera recomendada de resolver la excepción específica que representa la excepción amQP es seguir las instrucciones de excepciones de mensajería de Service Bus .

Descripción del comportamiento de las API

El documento aquí proporciona información sobre el comportamiento esperado de la API sincrónica receiveMessages cuando se usa para obtener más de un mensaje (a.k.a. captura previa implícita).

Pasos siguientes

Más allá de lo descrito, la biblioteca cliente de Azure Service Bus ofrece compatibilidad con muchos escenarios adicionales para ayudar a aprovechar el conjunto de características completo del servicio Azure Service Bus. Para ayudar a explorar algunos de estos escenarios, el siguiente conjunto de ejemplos está disponible aquí.

Contribuciones

Si desea convertirse en colaborador activo de este proyecto, consulte nuestras Directrices de contribución para obtener más información.

Impresiones