Compartir vía


Spring Cloud Soporte técnico de Azure for Spring Integration

Este artículo se aplica a: ✔️ Versión 4.14.0 ✔️ versión 5.8.0

La extensión de integración de Spring para Azure proporciona adaptadores de integración de Spring para los distintos servicios proporcionados por el SDK de Azure para Java. Proporcionamos compatibilidad con Spring Integration para estos servicios de Azure: Event Hubs, Service Bus, Storage Queue. A continuación se muestra una lista de adaptadores admitidos:

Integración de Spring con Azure Event Hubs

Conceptos clave

Azure Event Hubs es una plataforma de streaming de macrodatos y un servicio de ingesta de eventos. Puede recibir y procesar millones de eventos por segundo. Los datos enviados a un centro de eventos se pueden transformar y almacenar con cualquier proveedor de análisis en tiempo real o adaptadores de procesamiento por lotes y almacenamiento.

Spring Integration permite la mensajería ligera dentro de las aplicaciones basadas en Spring y admite la integración con sistemas externos a través de adaptadores declarativos. Esos adaptadores proporcionan un nivel superior de abstracción a través de la compatibilidad de Spring con la comunicación remota, la mensajería y la programación. El proyecto de extensión Spring Integration for Event Hubs proporciona adaptadores y puertas de enlace de canal entrantes y salientes para Azure Event Hubs.

Nota:

Las API de soporte técnico de RxJava se quitan de la versión 4.0.0. Consulte Javadoc para obtener más información.

Grupo de consumidores

Event Hubs proporciona compatibilidad similar del grupo de consumidores como Apache Kafka, pero con una lógica ligeramente diferente. Aunque Kafka almacena todos los desplazamientos confirmados en el agente, debe almacenar los desplazamientos de los mensajes de Event Hubs que se procesan manualmente. El SDK de Event Hubs proporciona la función para almacenar estos desplazamientos dentro de Azure Storage.

Compatibilidad con particiones

Event Hubs proporciona un concepto similar de partición física como Kafka. Pero a diferencia del reequilibrio automático de Kafka entre consumidores y particiones, Event Hubs proporciona un tipo de modo preferente. La cuenta de almacenamiento actúa como concesión para determinar qué partición es propiedad del consumidor. Cuando se inicia un nuevo consumidor, intentará robar algunas particiones de los consumidores más cargados para lograr el equilibrio de carga de trabajo.

Para especificar la estrategia de equilibrio de carga, los desarrolladores pueden usar EventHubsContainerProperties para la configuración. Consulte la sección siguiente para obtener un ejemplo de cómo configurar EventHubsContainerProperties.

Compatibilidad con consumidores de Batch

EventHubsInboundChannelAdapter admite el modo de consumo por lotes. Para habilitarlo, los usuarios pueden especificar el modo de escucha como ListenerMode.BATCH al construir una EventHubsInboundChannelAdapter instancia. Cuando está habilitada, se recibirá un mensaje que indica que la carga es una lista de eventos por lotes y se pasará al canal de bajada. Cada encabezado de mensaje también se convierte como una lista, de la que el contenido es el valor de encabezado asociado analizado de cada evento. Para los encabezados comunes del identificador de partición, el punto de control y las últimas propiedades en cola, se presentan como un valor único para todo el lote de eventos que comparten el mismo. Para obtener más información, consulte la sección Encabezados de mensaje de Event Hubs .

Nota:

El encabezado de punto de control solo existe cuando se usa el modo de punto de control MANUAL .

El punto de comprobación del consumidor por lotes admite dos modos: BATCH y MANUAL. BATCH mode es un modo de control automático para controlar todo el lote de eventos juntos una vez que se reciben. MANUAL el modo consiste en controlar los eventos por parte de los usuarios. Cuando se usa, el controlador de puntos de control se pasará al encabezado del mensaje y los usuarios podrían usarlo para realizar puntos de control.

La directiva de consumo por lotes se puede especificar mediante propiedades de max-size y , donde max-size es una propiedad necesaria mientras max-wait-time es max-wait-timeopcional. Para especificar la estrategia de consumo por lotes, los desarrolladores pueden usar EventHubsContainerProperties para la configuración. Consulte la sección siguiente para obtener un ejemplo de cómo configurar EventHubsContainerProperties.

Configuración de dependencias

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Configuración

Este inicio proporciona las siguientes tres partes de las opciones de configuración:

Propiedades de configuración de Conectar ion

Esta sección contiene las opciones de configuración que se usan para conectarse a Azure Event Hubs.

Nota:

Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorización del acceso con el identificador de Microsoft Entra para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.

Conectar ion propiedades configurables de spring-cloud-azure-starter-integration-eventhubs:

Propiedad Tipo Descripción
spring.cloud.azure.eventhubs.enabled boolean Si una instancia de Azure Event Hubs está habilitada.
spring.cloud.azure.eventhubs.connection-string Cadena Espacio de nombres de Event Hubs cadena de conexión valor.
spring.cloud.azure.eventhubs.namespace Cadena Valor del espacio de nombres de Event Hubs, que es el prefijo del FQDN. Un FQDN debe estar compuesto por NamespaceName.DomainName
spring.cloud.azure.eventhubs.domain-name Cadena Nombre de dominio de un valor de espacio de nombres de Azure Event Hubs.
spring.cloud.azure.eventhubs.custom-endpoint-address Cadena Dirección del punto de conexión personalizado.
spring.cloud.azure.eventhubs.shared-connection Booleano Si EventProcessorClient y EventHubProducerAsyncClient subyacente usan la misma conexión. De forma predeterminada, se crea una nueva conexión y se usa para cada cliente del centro de eventos creado.

Propiedades de configuración del punto de control

Esta sección contiene las opciones de configuración del servicio Storage Blobs, que se usa para conservar la propiedad de la partición y la información del punto de control.

Nota:

Desde la versión 4.0.0, cuando la propiedad de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists no está habilitada manualmente, no se creará automáticamente ningún contenedor de almacenamiento.

Propiedades configurables de punto de control de spring-cloud-azure-starter-integration-eventhubs:

Propiedad Tipo Descripción
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Booleano Si quiere permitir la creación de contenedores si no existe.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name Cadena Nombre de la cuenta de almacenamiento.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key Cadena Clave de acceso de la cuenta de almacenamiento.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name Cadena Nombre del contenedor de almacenamiento.

Las opciones comunes de configuración del SDK de Azure Service también se pueden configurar para el almacén de puntos de control de Blob de Storage. Las opciones de configuración admitidas se presentan en la configuración de Azure de Spring Cloud y se pueden configurar con el prefijo spring.cloud.azure. unificado o el prefijo de spring.cloud.azure.eventhubs.processor.checkpoint-store.

Propiedades de configuración del procesador del centro de eventos

EventHubsInboundChannelAdapter usa EventProcessorClient para consumir mensajes de un centro de eventos para configurar las propiedades generales de un EventProcessorClient, los desarrolladores pueden usar EventHubsContainerProperties para la configuración. Consulte la sección siguiente sobre cómo trabajar con EventHubsInboundChannelAdapter.

Uso básico

Envío de mensajes a Azure Event Hubs

  1. Rellene las opciones de configuración de credenciales.

    • Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml :

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Nota:

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre la conversión de la aplicación de un solo inquilino, consulte Conversión de una aplicación de inquilino único en varios inquilinos en microsoft Entra ID.

  1. Cree DefaultMessageHandler con el EventHubsTemplate bean para enviar mensajes a Event Hubs.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Cree un enlace de puerta de enlace de mensajes con el controlador de mensajes anterior a través de un canal de mensajes.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Enviar mensajes mediante la puerta de enlace.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Recepción de mensajes de Azure Event Hubs

  1. Rellene las opciones de configuración de credenciales.

  2. Cree un bean del canal de mensajes como canal de entrada.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Cree EventHubsInboundChannelAdapter con el EventHubsMessageListenerContainer bean para recibir mensajes de Event Hubs.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Cree un enlace de receptor de mensajes con EventHubsInboundChannelAdapter a través del canal de mensajes creado antes.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Configuración de EventHubsMessageConverter para personalizar objectMapper

EventHubsMessageConverter se realiza como un bean configurable para permitir a los usuarios personalizar ObjectMapper.

Compatibilidad con consumidores de Batch

Para consumir mensajes de Event Hubs en lotes es similar al ejemplo anterior, además de que los usuarios deben establecer las opciones de configuración relacionadas que consumen por lotes para EventHubsInboundChannelAdapter.

Al crear EventHubsInboundChannelAdapter, el modo de agente de escucha debe establecerse como BATCH. Cuando cree bean de , establezca el modo de punto de EventHubsMessageListenerContainercontrol como MANUAL o BATCHy las opciones por lotes se pueden configurar según sea necesario.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Encabezados de mensaje de Event Hubs

En la tabla siguiente se muestra cómo se asignan las propiedades del mensaje de Event Hubs a los encabezados de mensaje de Spring. Para Azure Event Hubs, se llama al mensaje como event.

Asignación entre las propiedades de evento o mensaje de Event Hubs y los encabezados de mensaje de Spring en modo de agente de escucha de registros:

Propiedades del evento de Event Hubs Constantes de encabezado de mensaje de Spring Tipo Descripción
Hora puesta en cola EventHubsHeaders#ENQUEUED_TIME Instantáneos El instante, en UTC, de cuándo se puso en cola el evento en la partición del centro de eventos.
Desplazamiento EventHubsHeaders#OFFSET Largo Desplazamiento del evento cuando se recibió de la partición del centro de eventos asociada.
Clave de partición AzureHeaders#PARTITION_KEY Cadena Clave hash de partición si se estableció al publicar originalmente el evento.
Id. de partición AzureHeaders#RAW_PARTITION_ID Cadena Identificador de partición del centro de eventos.
Número de secuencia EventHubsHeaders#SEQUENCE_NUMBER Largo Número de secuencia asignado al evento cuando se puso en cola en la partición del centro de eventos asociado.
Últimas propiedades de evento en cola EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties Propiedades del último evento en cola en esta partición.
N/D AzureHeaders#CHECKPOINTER Checkpointer Encabezado del punto de control del mensaje específico.

Los usuarios pueden analizar los encabezados de mensaje para obtener la información relacionada de cada evento. Para establecer un encabezado de mensaje para el evento, todos los encabezados personalizados se colocarán como una propiedad de aplicación de un evento, donde el encabezado se establece como clave de propiedad. Cuando se reciben eventos de Event Hubs, todas las propiedades de la aplicación se convertirán en el encabezado del mensaje.

Nota:

No se admiten los encabezados de mensaje de clave de partición, tiempo en cola, desplazamiento y número de secuencia para establecerse manualmente.

Cuando el modo de consumidor por lotes está habilitado, los encabezados específicos de los mensajes por lotes se enumeran a continuación, que contiene una lista de valores de cada evento de Event Hubs único.

Asignación entre event Hubs Message/Event Properties y Spring Message Headers en el modo de agente de escucha por lotes:

Propiedades del evento de Event Hubs Constantes de encabezado de mensaje de Spring Batch Tipo Descripción
Hora puesta en cola EventHubsHeaders#ENQUEUED_TIME Lista de instantáneas Lista del instante, en UTC, de cuándo se puso en cola cada evento en la partición del centro de eventos.
Desplazamiento EventHubsHeaders#OFFSET Lista de long Lista del desplazamiento de cada evento cuando se recibió de la partición del centro de eventos asociada.
Clave de partición AzureHeaders#PARTITION_KEY Lista de cadenas Lista de la clave de hash de partición si se estableció al publicar originalmente cada evento.
Número de secuencia EventHubsHeaders#SEQUENCE_NUMBER Lista de long Lista del número de secuencia asignado a cada evento cuando se puso en cola en la partición del centro de eventos asociada.
Propiedades del sistema EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Lista de mapa Lista de las propiedades del sistema de cada evento.
Propiedades de la aplicación EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Lista de mapa Lista de las propiedades de la aplicación de cada evento, donde se colocan todos los encabezados de mensaje personalizados o propiedades de evento.

Nota:

Al publicar mensajes, todos los encabezados por lotes anteriores se quitarán de los mensajes si existen.

Ejemplos

Para más información, consulte el repositorio azure-spring-boot-samples en GitHub.

Integración de Spring con Azure Service Bus

Conceptos clave

Spring Integration permite la mensajería ligera dentro de las aplicaciones basadas en Spring y admite la integración con sistemas externos a través de adaptadores declarativos.

El proyecto de extensión Spring Integration for Azure Service Bus proporciona adaptadores de canal entrantes y salientes para Azure Service Bus.

Nota:

Las API de soporte técnico completableFuture han quedado en desuso de la versión 2.10.0 y se reemplazan por Reactor Core de la versión 4.0.0. Consulte Javadoc para obtener más información.

Configuración de dependencias

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Configuración

Este inicio proporciona las siguientes 2 partes de las opciones de configuración:

propiedades de configuración de Conectar ion

Esta sección contiene las opciones de configuración que se usan para conectarse a Azure Service Bus.

Nota:

Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorización del acceso con el identificador de Microsoft Entra para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.

Conectar las propiedades configurables de spring-cloud-azure-starter-integration-servicebus:

Propiedad Tipo Descripción
spring.cloud.azure.servicebus.enabled boolean Indica si una instancia de Azure Service Bus está habilitada.
spring.cloud.azure.servicebus.connection-string Cadena Espacio de nombres de Service Bus cadena de conexión valor.
espacio de nombres spring.cloud.azure.servicebus.namespace Cadena Valor del espacio de nombres de Service Bus, que es el prefijo del FQDN. Un FQDN debe estar compuesto por NamespaceName.DomainName
spring.cloud.azure.servicebus.domain-name Cadena Nombre de dominio de un valor de espacio de nombres de Azure Service Bus.

Propiedades de configuración del procesador de Service Bus

ServiceBusInboundChannelAdapter usa ServiceBusProcessorClient para consumir mensajes, para configurar las propiedades generales de un ServiceBusProcessorClient, los desarrolladores pueden usar ServiceBusContainerProperties para la configuración. Consulte la sección siguiente sobre cómo trabajar con ServiceBusInboundChannelAdapter.

Uso básico

Envío de mensajes a Azure Service Bus

  1. Rellene las opciones de configuración de credenciales.

    • Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Nota:

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre la conversión de la aplicación de un solo inquilino, consulte Conversión de una aplicación de inquilino único en varios inquilinos en microsoft Entra ID.

  • Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml :

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Nota:

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre la conversión de la aplicación de un solo inquilino, consulte Conversión de una aplicación de inquilino único en varios inquilinos en microsoft Entra ID.

  1. Cree DefaultMessageHandler con el ServiceBusTemplate bean para enviar mensajes a Service Bus y establezca el tipo de entidad para ServiceBusTemplate. En este ejemplo se toma la cola de Service Bus como ejemplo.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Cree un enlace de puerta de enlace de mensajes con el controlador de mensajes anterior a través de un canal de mensajes.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Enviar mensajes mediante la puerta de enlace.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Recepción de mensajes de Azure Service Bus

  1. Rellene las opciones de configuración de credenciales.

  2. Cree un bean del canal de mensajes como canal de entrada.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Cree ServiceBusInboundChannelAdapter con el ServiceBusMessageListenerContainer bean para recibir mensajes en Service Bus. En este ejemplo se toma la cola de Service Bus como ejemplo.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Cree un enlace de receptor de mensajes con ServiceBusInboundChannelAdapter a través del canal de mensajes que creamos antes.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Configuración de ServiceBusMessageConverter para personalizar objectMapper

ServiceBusMessageConverter se realiza como un bean configurable para permitir a los usuarios personalizar ObjectMapper.

Encabezados de mensaje de Service Bus

Para algunos encabezados de Service Bus que se pueden asignar a varias constantes de encabezado de Spring, se muestra la prioridad de distintos encabezados de Spring.

Asignación entre encabezados de Service Bus y encabezados de Spring:

Encabezados y propiedades del mensaje de Service Bus Constantes de encabezado de mensaje spring Tipo Configurable Descripción
Tipo de contenido MessageHeaders#CONTENT_TYPE Cadena Descriptor de tipo de contenido RFC2045 del mensaje.
Identificador de correlación ServiceBusMessageHeaders#CORRELATION_ID Cadena Identificador de correlación del mensaje
Id. del mensaje ServiceBusMessageHeaders#MESSAGE_ID Cadena El identificador de mensaje del mensaje, este encabezado tiene mayor prioridad que MessageHeaders#ID.
Id. del mensaje MessageHeaders#ID UUID El identificador de mensaje del mensaje, este encabezado tiene una prioridad menor que ServiceBusMessageHeaders#MESSAGE_ID.
Clave de partición ServiceBusMessageHeaders#PARTITION_KEY Cadena Clave de partición para enviar el mensaje a una entidad con particiones.
Responder a MessageHeaders#REPLY_CHANNEL Cadena Dirección de una entidad a la que se van a enviar respuestas.
Responder al identificador de sesión ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Cadena Valor de la propiedad ReplyToGroupId del mensaje.
Hora de puesta en cola programada utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Fecha y hora en la que se debe poner el mensaje en cola en Service Bus, este encabezado tiene una prioridad más alta que AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Hora de puesta en cola programada utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Entero Fecha y hora en la que se debe poner en cola el mensaje en Service Bus, este encabezado tiene una prioridad menor que ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
Id. sesión ServiceBusMessageHeaders#SESSION_ID Cadena Identificador de sesión para una entidad compatible con sesión.
Período de vida ServiceBusMessageHeaders#TIME_TO_LIVE Duration Duración del tiempo antes de que expire este mensaje.
En ServiceBusMessageHeaders#TO Cadena La dirección "to" del mensaje, reservada para su uso futuro en escenarios de enrutamiento y actualmente ignorada por el propio agente.
Asunto ServiceBusMessageHeaders#SUBJECT Cadena Asunto del mensaje.
Descripción del error de mensajes fallidos ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION Cadena No La descripción de un mensaje que se ha escrito con mensajes fallidos.
Motivo de mensajes fallidos ServiceBusMessageHeaders#DEAD_LETTER_REASON Cadena No La razón por la que se ha producido un mensaje fallido.
Origen de mensajes fallidos ServiceBusMessageHeaders#DEAD_LETTER_SOURCE Cadena No La entidad en la que el mensaje estaba en mensajes fallidos.
Recuento de entregas ServiceBusMessageHeaders#DELIVERY_COUNT long No Número de veces que este mensaje se entregó a los clientes.
Número de secuencia en cola ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER long No Número de secuencia en cola asignado a un mensaje por Service Bus.
Hora puesta en cola ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime No Fecha y hora en la que se puso en cola este mensaje en Service Bus.
Expira a las ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime No Fecha y hora en la que expirará este mensaje.
Token de bloqueo ServiceBusMessageHeaders#LOCK_TOKEN Cadena No Token de bloqueo del mensaje actual.
Bloqueado hasta ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime No Fecha y hora en la que expira el bloqueo de este mensaje.
Número de secuencia ServiceBusMessageHeaders#SEQUENCE_NUMBER long No Número único asignado a un mensaje por Service Bus.
Estado ServiceBusMessageHeaders#STATE ServiceBusMessageState No Estado del mensaje, que puede ser Activo, Diferido o Programado.

Compatibilidad con claves de partición

Este inicio admite la creación de particiones de Service Bus al permitir establecer la clave de partición y el identificador de sesión en el encabezado del mensaje. En esta sección se presenta cómo establecer la clave de partición para los mensajes.

Recomendado: use ServiceBusMessageHeaders.PARTITION_KEY como clave del encabezado.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

No se recomienda, pero actualmente se admite:AzureHeaders.PARTITION_KEY como clave del encabezado.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nota:

Cuando y ServiceBusMessageHeaders.PARTITION_KEYAzureHeaders.PARTITION_KEY se establecen en los encabezados de mensaje, ServiceBusMessageHeaders.PARTITION_KEY se prefiere.

Compatibilidad con sesiones

En este ejemplo se muestra cómo establecer manualmente el identificador de sesión de un mensaje en la aplicación.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nota:

ServiceBusMessageHeaders.SESSION_ID Cuando se establece en los encabezados del mensaje y también se establece otro ServiceBusMessageHeaders.PARTITION_KEY encabezado, el valor del identificador de sesión se usará finalmente para sobrescribir el valor de la clave de partición.

Ejemplos

Para más información, consulte el repositorio azure-spring-boot-samples en GitHub.

Integración de Spring con Azure Queue Storage

Conceptos clave

Azure Queue Storage es un servicio para almacenar grandes cantidades de mensajes, a los que se puede acceder desde cualquier lugar del mundo a través de llamadas autenticadas mediante HTTP o HTTPS. Un mensaje de la cola puede llegar a tener hasta 64 KB. Una cola puede contener millones de mensajes, hasta el límite de capacidad total de una cuenta de almacenamiento. Las colas se utilizan normalmente para crear un trabajo pendiente del trabajo que se va a procesar de forma asincrónica.

Configuración de dependencias

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Configuración

Este inicio proporciona las siguientes opciones de configuración:

propiedades de configuración de Conectar ion

Esta sección contiene las opciones de configuración que se usan para conectarse a la cola de Azure Storage.

Nota:

Si decide usar una entidad de seguridad para autenticar y autorizar con el identificador de Entra de Microsoft para acceder a un recurso de Azure, consulte Autorización del acceso con el identificador de Microsoft Entra para asegurarse de que a la entidad de seguridad se le ha concedido el permiso suficiente para acceder al recurso de Azure.

Conectar las propiedades configurables de spring-cloud-azure-starter-integration-storage-queue:

Propiedad Tipo Descripción
spring.cloud.azure.storage.queue.enabled boolean Si una cola de Azure Storage está habilitada.
spring.cloud.azure.storage.queue.connection-string Cadena Espacio de nombres de cola de Storage cadena de conexión valor.
spring.cloud.azure.storage.queue.accountName Cadena Nombre de la cuenta de cola de Storage.
spring.cloud.azure.storage.queue.accountKey Cadena Clave de cuenta de cola de Storage.
spring.cloud.azure.storage.queue.endpoint Cadena Punto de conexión de Servicio de cola de Storage.
spring.cloud.azure.storage.queue.sasToken Cadena Credencial del token de Sas
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion QueueServiceVersion que se usa al realizar solicitudes de API.
spring.cloud.azure.storage.queue.messageEncoding Cadena Codificación de mensajes de cola.

Uso básico

Envío de mensajes a la cola de Azure Storage

  1. Rellene las opciones de configuración de credenciales.

    • Para las credenciales como cadena de conexión, configure las siguientes propiedades en el archivo application.yml:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Para las credenciales como identidades administradas, configure las siguientes propiedades en el archivo application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Nota:

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre la conversión de la aplicación de un solo inquilino, consulte Conversión de una aplicación de inquilino único en varios inquilinos en microsoft Entra ID.

  • Para las credenciales como entidad de servicio, configure las siguientes propiedades en el archivo application.yml :

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Nota:

Los valores permitidos para tenant-id son: common, organizations, consumerso el identificador de inquilino. Para obtener más información sobre estos valores, consulte la sección Uso del punto de conexión incorrecto (cuentas personales y de organización) de Error AADSTS50020: la cuenta de usuario del proveedor de identidades no existe en el inquilino. Para obtener información sobre la conversión de la aplicación de un solo inquilino, consulte Conversión de una aplicación de inquilino único en varios inquilinos en microsoft Entra ID.

  1. Cree DefaultMessageHandler con el StorageQueueTemplate bean para enviar mensajes a la cola de Storage.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Cree un enlace de puerta de enlace de mensajes con el controlador de mensajes anterior a través de un canal de mensajes.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Enviar mensajes mediante la puerta de enlace.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Recepción de mensajes de la cola de Azure Storage

  1. Rellene las opciones de configuración de credenciales.

  2. Cree un bean del canal de mensajes como canal de entrada.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Cree StorageQueueMessageSource con el StorageQueueTemplate bean para recibir mensajes en la cola de Storage.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Cree un enlace de receptor de mensajes con StorageQueueMessageSource creado en el último paso a través del canal de mensajes que creamos antes.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Ejemplos

Para más información, consulte el repositorio azure-spring-boot-samples en GitHub.