Partilhar via


Suporte do Spring Cloud Azure para Spring Integration

Este artigo aplica-se a: ✔️ Versão 4.14.0 Versão 5.8.0 ✔️

O Spring Integration Extension for Azure fornece adaptadores Spring Integration para os vários serviços fornecidos pelo SDK do Azure para Java. Fornecemos suporte ao Spring Integration para estes serviços do Azure: Hubs de Eventos, Service Bus, Fila de Armazenamento. Segue-se uma lista de adaptadores suportados:

Integração do Spring com Hubs de Eventos do Azure

Conceitos-chave

Os Hubs de Eventos do Azure são uma plataforma de streaming de big data e um serviço de ingestão de eventos. Pode receber e processar milhões de eventos por segundo. Os dados enviados para um hub de eventos podem ser transformados e armazenados em qualquer fornecedor de análise em tempo real ou adaptadores de armazenamento/criação de batches.

O Spring Integration permite mensagens leves dentro de aplicativos baseados em Spring e suporta a integração com sistemas externos por meio de adaptadores declarativos. Esses adaptadores fornecem um nível mais alto de abstração em relação ao suporte do Spring para comunicação remota, mensagens e agendamento. O projeto de extensão Spring Integration for Event Hubs fornece adaptadores de canal de entrada e saída e gateways para Hubs de Eventos do Azure.

Nota

As APIs de suporte do RxJava são descartadas da versão 4.0.0. Consulte Javadoc para obter detalhes.

Grupo de consumidores

Os Hubs de Eventos fornecem suporte semelhante ao grupo de consumidores do Apache Kafka, mas com uma lógica ligeiramente diferente. Enquanto Kafka armazena todos os deslocamentos confirmados no corretor, você precisa armazenar deslocamentos de mensagens de Hubs de Eventos sendo processadas manualmente. O SDK dos Hubs de Eventos fornece a função para armazenar esses deslocamentos dentro do Armazenamento do Azure.

Suporte de particionamento

Os Hubs de Eventos fornecem um conceito de partição física semelhante ao Kafka. Mas, ao contrário do reequilíbrio automático entre consumidores e partições de Kafka, os Hubs de Eventos fornecem uma espécie de modo preventivo. A conta de armazenamento atua como uma concessão para determinar qual partição é de propriedade de qual consumidor. Quando um novo consumidor começa, ele tentará roubar algumas partições da maioria dos consumidores pesados para alcançar o balanceamento da carga de trabalho.

Para especificar a estratégia de balanceamento de carga, os desenvolvedores podem usar EventHubsContainerProperties para a configuração. Consulte a seção a seguir para obter um exemplo de como configurar EventHubsContainerPropertieso .

Suporte ao consumidor em lote

O EventHubsInboundChannelAdapter suporta o modo de consumo em lote. Para habilitá-lo, os usuários podem especificar o modo de ouvinte como ListenerMode.BATCH ao construir uma EventHubsInboundChannelAdapter instância. Quando habilitada, uma mensagem da qual a carga útil é uma lista de eventos em lote será recebida e passada para o canal downstream. Cada cabeçalho de mensagem também é convertido como uma lista, cujo conteúdo é o valor de cabeçalho associado analisado de cada evento. Para os cabeçalhos comuns de ID de partição, ponteiro de verificação e últimas propriedades enfileiradas, eles são apresentados como um único valor para todo o lote de eventos compartilha o mesmo. Para obter mais informações, consulte a seção Cabeçalhos de mensagem dos Hubs de Eventos.

Nota

O cabeçalho do ponto de verificação só existe quando o modo de ponto de verificação MANUAL é usado.

O ponto de verificação do consumidor em lote suporta dois modos: BATCH e MANUAL. BATCH modo é um modo de ponto de verificação automático para verificar todo o lote de eventos juntos uma vez que eles são recebidos. MANUAL modo é verificar os eventos pelos usuários. Quando usado, o Checkpointer será passado para o cabeçalho da mensagem e os usuários poderão usá-lo para fazer checkpointing.

A política de consumo em lote pode ser especificada pelas propriedades de max-size e , onde max-size é uma propriedade necessária enquanto max-wait-time é max-wait-timeopcional. Para especificar a estratégia de consumo em lote, os desenvolvedores podem usar EventHubsContainerProperties para a configuração. Consulte a seção a seguir para obter um exemplo de como configurar EventHubsContainerPropertieso .

Configuração de dependência

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

Configuração

Este acionador de partida fornece as seguintes 3 partes das opções de configuração:

Propriedades de configuração da conexão

Esta seção contém as opções de configuração usadas para se conectar aos Hubs de Eventos do Azure.

Nota

Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar acesso com a ID do Microsoft Entra para garantir que a entidade de segurança tenha recebido a permissão suficiente para acessar o recurso do Azure.

Propriedades configuráveis de conexão de spring-cloud-azure-starter-integration-eventhubs:

Propriedade Type Description
spring.cloud.azure.eventhubs.enabled boolean Se um Hubs de Eventos do Azure está habilitado.
spring.cloud.azure.eventhubs.connection-string String Valor da cadeia de conexão de namespace de Hubs de Eventos.
spring.cloud.azure.eventhubs.namespace String Valor de namespace de Hubs de Eventos, que é o prefixo do FQDN. Um FQDN deve ser composto por NamespaceName.DomainName
spring.cloud.azure.eventhubs.nome-de-domínio String Nome de domínio de um valor de Namespace dos Hubs de Eventos do Azure.
spring.cloud.azure.eventhubs.custom-endpoint-address String Endereço de ponto final personalizado.
spring.cloud.azure.eventhubs.shared-connection Boolean Se o subjacente EventProcessorClient e EventHubProducerAsyncClient usam a mesma conexão. Por padrão, uma nova conexão é construída e usada para cada cliente do Hub de Eventos criado.

Propriedades de configuração do ponto de verificação

Esta seção contém as opções de configuração para o serviço de Blobs de Armazenamento, que é usado para manter a propriedade da partição e as informações do ponto de verificação.

Nota

A partir da versão 4.0.0, quando a propriedade de spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists não estiver habilitada manualmente, nenhum contêiner de armazenamento será criado automaticamente.

Propriedades configuráveis de ponto de verificação de spring-cloud-azure-starter-integration-eventhubs:

Propriedade Type Description
spring.cloud.azure.eventhubs.processor.checkpoint-store.create-container-if-not-exists Boolean Se a criação de contêineres não existe, deve ser permitida.
spring.cloud.azure.eventhubs.processor.checkpoint-store.nome-da-conta String Nome da conta de armazenamento.
spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key String Chave de acesso da conta de armazenamento.
spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name String Nome do recipiente de armazenamento.

As opções comuns de configuração do SDK do Serviço do Azure também são configuráveis para o armazenamento de pontos de verificação de Blob de Armazenamento. As opções de configuração com suporte são introduzidas na configuração do Spring Cloud Azure e podem ser configuradas com o prefixo unificado ou o prefixo spring.cloud.azure. de spring.cloud.azure.eventhubs.processor.checkpoint-store.

Propriedades de configuração do processador do Hub de Eventos

O EventHubsInboundChannelAdapter usa o EventProcessorClient para consumir mensagens de um hub de eventos, para configurar as propriedades gerais de um EventProcessorClient, os desenvolvedores podem usar EventHubsContainerProperties para a configuração. Consulte a secção seguinte sobre como trabalhar com EventHubsInboundChannelAdaptero .

Utilização básica

Enviar mensagens para Hubs de Eventos do Azure

  1. Preencha as opções de configuração de credenciais.

    • Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml :

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • Para credenciais como identidades gerenciadas, configure as seguintes propriedades no arquivo application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • Para credenciais como entidade de serviço, configure as seguintes propriedades no arquivo application.yml :

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

Nota

Os valores permitidos sãotenant-id: common, , consumersorganizations, ou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado o ponto de extremidade errado (contas pessoais e da organização) de AADSTS50020 de erro - A conta de usuário do provedor de identidade não existe no locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

  1. Crie DefaultMessageHandler com o EventHubsTemplate bean para enviar mensagens para Hubs de Eventos.

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. Crie uma ligação de gateway de mensagens com o manipulador de mensagens acima por meio de um canal de mensagem.

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. Envie mensagens usando o gateway.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Receber mensagens dos Hubs de Eventos do Azure

  1. Preencha as opções de configuração de credenciais.

  2. Crie um bean de canal de mensagem como o canal de entrada.

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Crie EventHubsInboundChannelAdapter com o EventHubsMessageListenerContainer bean para receber mensagens de Hubs de Eventos.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. Crie uma associação de recetor de mensagem com EventHubsInboundChannelAdapter por meio do canal de mensagem criado anteriormente.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Configurar o EventHubsMessageConverter para personalizar o objectMapper

EventHubsMessageConverter é feito como um bean configurável para permitir que os usuários personalizem o ObjectMapper.

Suporte ao consumidor em lote

Para consumir mensagens de Hubs de Eventos em lotes é semelhante ao exemplo acima, além disso, os usuários devem definir as opções de configuração relacionadas ao consumo em lote para EventHubsInboundChannelAdapter.

Ao criar EventHubsInboundChannelAdapter, o modo de ouvinte deve ser definido como BATCH. Ao criar o bean do , defina o modo de ponto de verificação como um ou MANUALBATCH, e as opções de lote podem ser configuradas EventHubsMessageListenerContainerconforme necessário.

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

Cabeçalhos de mensagem dos Hubs de Eventos

A tabela a seguir ilustra como as propriedades de mensagem dos Hubs de Eventos são mapeadas para cabeçalhos de mensagem Spring. Para Hubs de Eventos do Azure, a mensagem é chamada como event.

Mapeamento entre Hubs de Eventos, Propriedades de Mensagem/Evento e Cabeçalhos de Mensagem Spring no Modo de Escuta de Gravação:

Propriedades de eventos de Hubs de Eventos Constantes de cabeçalho de mensagem de mola Type Description
Tempo enfileirado EventHubsHeaders#ENQUEUED_TIME Instantâneo O instante, em UTC, de quando o evento foi enfileirado na partição do Hub de Eventos.
Desvio EventHubsHeaders#DESLOCAMENTO Longo O deslocamento do evento quando ele foi recebido da partição do Hub de Eventos associada.
Chave de partição AzureHeaders#PARTITION_KEY String A chave de hash da partição, se tiver sido definida durante a publicação original do evento.
ID da Partição AzureHeaders#RAW_PARTITION_ID String A ID da partição do Hub de Eventos.
Número de sequência EventHubsHeaders#SEQUENCE_NUMBER Longo O número de sequência atribuído ao evento quando ele foi enfileirado na partição do Hub de Eventos associada.
Propriedades do último evento enfileirado EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties As propriedades do último evento enfileirado nesta partição.
ND AzureHeaders#CHECKPOINTER Ponteiro de verificação O cabeçalho do ponto de verificação da mensagem específica.

Os usuários podem analisar os cabeçalhos das mensagens para obter as informações relacionadas de cada evento. Para definir um cabeçalho de mensagem para o evento, todos os cabeçalhos personalizados serão colocados como uma propriedade de aplicativo de um evento, onde o cabeçalho é definido como a chave de propriedade. Quando os eventos são recebidos dos Hubs de Eventos, todas as propriedades do aplicativo serão convertidas no cabeçalho da mensagem.

Nota

Não há suporte para cabeçalhos de mensagem de chave de partição, tempo enfileirado, deslocamento e número de sequência para serem definidos manualmente.

Quando o modo batch-consumer está habilitado, os cabeçalhos específicos de mensagens em lote são listados da seguinte forma, que contém uma lista de valores de cada evento de Hubs de Eventos único.

Mapeamento entre Hubs de Eventos, Propriedades de Mensagem/Evento e Cabeçalhos de Mensagem primavera no Modo de Escuta em Lote:

Propriedades de eventos de Hubs de Eventos Constantes de cabeçalho de mensagem de lote de mola Type Description
Tempo enfileirado EventHubsHeaders#ENQUEUED_TIME Lista de Instant Lista do instante, em UTC, de quando cada evento foi enfileirado na partição do Hub de Eventos.
Desvio EventHubsHeaders#DESLOCAMENTO Lista de Longa Lista do deslocamento de cada evento quando ele foi recebido da partição do Hub de Eventos associada.
Chave de partição AzureHeaders#PARTITION_KEY Lista de String Lista da chave de hash da partição, se ela foi definida ao publicar originalmente cada evento.
Número de sequência EventHubsHeaders#SEQUENCE_NUMBER Lista de Longa Lista do número de sequência atribuído a cada evento quando ele foi enfileirado na partição do Hub de Eventos associada.
Propriedades do sistema EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES Lista de Mapa Lista das propriedades do sistema de cada evento.
Propriedades da aplicação EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES Lista de Mapa Lista das propriedades do aplicativo de cada evento, onde todos os cabeçalhos de mensagem personalizados ou propriedades do evento são colocados.

Nota

Ao publicar mensagens, todos os cabeçalhos de lote acima serão removidos das mensagens, se existirem.

Exemplos

Para obter mais informações, consulte o repositório azure-spring-boot-samples no GitHub.

Integração do Spring com o Azure Service Bus

Conceitos-chave

O Spring Integration permite mensagens leves dentro de aplicativos baseados em Spring e suporta a integração com sistemas externos por meio de adaptadores declarativos.

O projeto de extensão Spring Integration for Azure Service Bus fornece adaptadores de canal de entrada e saída para o Azure Service Bus.

Nota

As APIs de suporte do CompletableFuture foram preteridas da versão 2.10.0 e substituídas pelo Reator Core da versão 4.0.0. Consulte Javadoc para obter detalhes.

Configuração de dependência

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

Configuração

Este acionador de partida fornece as seguintes 2 partes das opções de configuração:

Propriedades de configuração de conexão

Esta seção contém as opções de configuração usadas para se conectar ao Barramento de Serviço do Azure.

Nota

Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar acesso com a ID do Microsoft Entra para garantir que a entidade de segurança tenha recebido a permissão suficiente para acessar o recurso do Azure.

Propriedades configuráveis de conexão do spring-cloud-azure-starter-integration-servicebus:

Propriedade Type Description
spring.cloud.azure.servicebus.enabled boolean Se um Barramento de Serviço do Azure está habilitado.
spring.cloud.azure.servicebus.connection-string String Valor da cadeia de conexão do Namespace do Service Bus.
spring.cloud.azure.servicebus.namespace String Valor de namespace do Service Bus, que é o prefixo do FQDN. Um FQDN deve ser composto por NamespaceName.DomainName
spring.cloud.azure.servicebus.nome-de-domínio String Nome de domínio de um valor de Namespace do Barramento de Serviço do Azure.

Propriedades de configuração do processador do Service Bus

O ServiceBusInboundChannelAdapter usa o ServiceBusProcessorClient para consumir mensagens, para configurar as propriedades gerais de um ServiceBusProcessorClient, os desenvolvedores podem usar ServiceBusContainerProperties para a configuração. Consulte a secção seguinte sobre como trabalhar com ServiceBusInboundChannelAdaptero .

Utilização básica

Enviar mensagens para o Barramento de Serviço do Azure

  1. Preencha as opções de configuração de credenciais.

    • Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml :

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Para credenciais como identidades gerenciadas, configure as seguintes propriedades no arquivo application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Nota

Os valores permitidos sãotenant-id: common, , consumersorganizations, ou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado o ponto de extremidade errado (contas pessoais e da organização) de AADSTS50020 de erro - A conta de usuário do provedor de identidade não existe no locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

  • Para credenciais como entidade de serviço, configure as seguintes propriedades no arquivo application.yml :

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Nota

Os valores permitidos sãotenant-id: common, , consumersorganizations, ou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado o ponto de extremidade errado (contas pessoais e da organização) de AADSTS50020 de erro - A conta de usuário do provedor de identidade não existe no locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

  1. Crie DefaultMessageHandler com o bean para enviar mensagens para o Service Bus, defina o tipo de entidade para o ServiceBusTemplate ServiceBusTemplate. Este exemplo usa a Fila do Barramento de Serviço como exemplo.

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. Crie uma ligação de gateway de mensagens com o manipulador de mensagens acima por meio de um canal de mensagem.

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Envie mensagens usando o gateway.

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

Receber mensagens do Barramento de Serviço do Azure

  1. Preencha as opções de configuração de credenciais.

  2. Crie um bean de canal de mensagem como o canal de entrada.

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Crie ServiceBusInboundChannelAdapter com o ServiceBusMessageListenerContainer bean para receber mensagens no Service Bus. Este exemplo usa a Fila do Barramento de Serviço como exemplo.

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. Crie uma ligação ServiceBusInboundChannelAdapter com o recetor de mensagens através do canal de mensagens que criamos anteriormente.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

Configurar ServiceBusMessageConverter para personalizar objectMapper

ServiceBusMessageConverter é feito como um bean configurável para permitir que os usuários personalizem ObjectMappero .

Cabeçalhos de mensagens do Barramento de Serviço

Para alguns cabeçalhos do Service Bus que podem ser mapeados para várias constantes de cabeçalho Spring, a prioridade de cabeçalhos Spring diferentes é listada.

Mapeamento entre cabeçalhos do Service Bus e cabeçalhos de mola:

Cabeçalhos e propriedades de mensagens do Barramento de Serviço Constantes de cabeçalho de mensagem de mola Type Configurável Description
Tipo de conteúdo MessageHeaders#CONTENT_TYPE Cadeia (de carateres) Sim O descritor RFC2045 Content-Type da mensagem.
ID de Correlação ServiceBusMessageHeaders#CORRELATION_ID Cadeia (de carateres) Sim O ID de correlação da mensagem
ID da mensagem ServiceBusMessageHeaders#MESSAGE_ID Cadeia (de carateres) Sim O ID da mensagem da mensagem, este cabeçalho tem prioridade maior do que MessageHeaders#ID.
ID da mensagem MessageHeaders#ID UUID Sim O ID da mensagem da mensagem, este cabeçalho tem prioridade menor do que ServiceBusMessageHeaders#MESSAGE_ID.
Chave de partição ServiceBusMessageHeaders#PARTITION_KEY Cadeia (de carateres) Sim A chave de partição para enviar a mensagem para uma entidade particionada.
Responder a MessageHeaders#REPLY_CHANNEL Cadeia (de carateres) Sim O endereço de uma entidade para a qual enviar respostas.
Responder ao ID da sessão ServiceBusMessageHeaders#REPLY_TO_SESSION_ID Cadeia (de carateres) Sim O valor da propriedade ReplyToGroupId da mensagem.
Tempo de enfila agendado utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime Sim A data/hora em que a mensagem deve ser enfileirada no Service Bus, esse cabeçalho tem prioridade maior do que AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE.
Tempo de enfila agendado utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Número inteiro Sim A data/hora em que a mensagem deve ser enfileirada no Service Bus, esse cabeçalho tem prioridade menor do que ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME.
ID da Sessão ServiceBusMessageHeaders#SESSION_ID Cadeia (de carateres) Sim O IDentifier de sessão para uma entidade com reconhecimento de sessão.
Time to live ServiceBusMessageHeaders#TIME_TO_LIVE Duration Sim A duração do tempo antes que esta mensagem expire.
To ServiceBusMessageHeaders#TO Cadeia (de carateres) Sim O endereço "para" da mensagem, reservado para uso futuro em cenários de roteamento e atualmente ignorado pelo próprio corretor.
Subject ServiceBusMessageHeaders#SUBJECT Cadeia (de carateres) Sim O assunto da mensagem.
Descrição do erro de letra morta ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION String Não A descrição de uma mensagem com letra morta.
Razão letra morta ServiceBusMessageHeaders#DEAD_LETTER_REASON String Não A razão pela qual uma mensagem estava morta.
Fonte letra morta ServiceBusMessageHeaders#DEAD_LETTER_SOURCE String Não A entidade na qual a mensagem foi escrita sem letra.
Contagem de entregas ServiceBusMessageHeaders#DELIVERY_COUNT long Não O número de vezes que esta mensagem foi entregue aos clientes.
Número de sequência enfileirado ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER long Não O número de sequência enfileirado atribuído a uma mensagem pelo Service Bus.
Tempo enfileirado ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime Não A data/hora em que esta mensagem foi enfileirada no Service Bus.
Expira em ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime Não A data/hora em que esta mensagem expirará.
Token de bloqueio ServiceBusMessageHeaders#LOCK_TOKEN String Não O token de bloqueio para a mensagem atual.
Bloqueado até ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime Não A data/hora em que o bloqueio desta mensagem expira.
Número de sequência ServiceBusMessageHeaders#SEQUENCE_NUMBER long Não O número exclusivo atribuído a uma mensagem pelo Service Bus.
Estado ServiceBusMessageHeaders#STATE ServiceBusMessageState Não O estado da mensagem, que pode ser Ativa, Adiada ou Agendada.

Suporte de chave de partição

Este acionador de partida suporta particionamento do Service Bus, permitindo definir a chave de partição e o ID da sessão no cabeçalho da mensagem. Esta seção apresenta como definir a chave de partição para mensagens.

Recomendado: Use ServiceBusMessageHeaders.PARTITION_KEY como a chave do cabeçalho.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Não recomendado, mas atualmente suportado:AzureHeaders.PARTITION_KEY como a chave do cabeçalho.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nota

Quando ambos ServiceBusMessageHeaders.PARTITION_KEY e AzureHeaders.PARTITION_KEY são definidos nos cabeçalhos da mensagem, ServiceBusMessageHeaders.PARTITION_KEY é preferível.

Suporte de sessão

Este exemplo demonstra como definir manualmente a ID de sessão de uma mensagem no aplicativo.

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

Nota

Quando o ServiceBusMessageHeaders.SESSION_ID é definido nos cabeçalhos da mensagem, e um cabeçalho diferente ServiceBusMessageHeaders.PARTITION_KEY também é definido, o valor do ID da sessão será eventualmente usado para substituir o valor da chave de partição.

Exemplos

Para obter mais informações, consulte o repositório azure-spring-boot-samples no GitHub.

Integração do Spring com a fila de armazenamento do Azure

Conceitos-chave

O Armazenamento de Filas do Azure é um serviço para alojar grandes quantidades de mensagens. Você acessa mensagens de qualquer lugar do mundo por meio de chamadas autenticadas usando HTTP ou HTTPS. Uma mensagem de fila pode ter até 64 KB de tamanho. Uma fila pode conter milhões de mensagens, até o limite de capacidade total de uma conta de armazenamento. As filas são comumente usadas para criar uma lista de pendências de trabalho para processar de forma assíncrona.

Configuração de dependência

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

Configuração

Este acionador de partida fornece as seguintes opções de configuração:

Propriedades de configuração de conexão

Esta seção contém as opções de configuração usadas para se conectar à Fila de Armazenamento do Azure.

Nota

Se você optar por usar uma entidade de segurança para autenticar e autorizar com a ID do Microsoft Entra para acessar um recurso do Azure, consulte Autorizar acesso com a ID do Microsoft Entra para garantir que a entidade de segurança tenha recebido a permissão suficiente para acessar o recurso do Azure.

Propriedades configuráveis de conexão do spring-cloud-azure-starter-integration-storage-queue:

Propriedade Type Description
spring.cloud.azure.storage.queue.enabled boolean Se uma Fila de Armazenamento do Azure está habilitada.
spring.cloud.azure.storage.queue.connection-string String Valor da cadeia de conexão do namespace da fila de armazenamento.
spring.cloud.azure.storage.queue.accountName String Nome da conta da fila de armazenamento.
spring.cloud.azure.storage.queue.accountKey String Chave de conta da fila de armazenamento.
spring.cloud.azure.storage.queue.endpoint String Ponto de extremidade do serviço de fila de armazenamento.
spring.cloud.azure.storage.queue.sasToken String Credencial de token Sas
spring.cloud.azure.storage.queue.serviceVersão QueueServiceVersion QueueServiceVersion que é usado ao fazer solicitações de API.
spring.cloud.azure.storage.queue.messageEncoding String Codificação de mensagens em fila.

Utilização básica

Enviar mensagens para a Fila de Armazenamento do Azure

  1. Preencha as opções de configuração de credenciais.

    • Para credenciais como cadeia de conexão, configure as seguintes propriedades no arquivo application.yml :

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • Para credenciais como identidades gerenciadas, configure as seguintes propriedades no arquivo application.yml :

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

Nota

Os valores permitidos sãotenant-id: common, , consumersorganizations, ou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado o ponto de extremidade errado (contas pessoais e da organização) de AADSTS50020 de erro - A conta de usuário do provedor de identidade não existe no locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

  • Para credenciais como entidade de serviço, configure as seguintes propriedades no arquivo application.yml :

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

Nota

Os valores permitidos sãotenant-id: common, , consumersorganizations, ou o ID do locatário. Para obter mais informações sobre esses valores, consulte a seção Usado o ponto de extremidade errado (contas pessoais e da organização) de AADSTS50020 de erro - A conta de usuário do provedor de identidade não existe no locatário. Para obter informações sobre como converter seu aplicativo de locatário único, consulte Converter aplicativo de locatário único em multilocatário no Microsoft Entra ID.

  1. Crie DefaultMessageHandler com o bean para enviar mensagens para a StorageQueueTemplate Fila de Armazenamento.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. Crie uma ligação de gateway de mensagem com o manipulador de mensagens acima por meio de um canal de mensagem.

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. Envie mensagens usando o gateway.

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

Receber mensagens da Fila de Armazenamento do Azure

  1. Preencha as opções de configuração de credenciais.

  2. Crie um bean de canal de mensagem como o canal de entrada.

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. Crie StorageQueueMessageSource com o StorageQueueTemplate bean para receber mensagens na Fila de Armazenamento.

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. Crie uma associação de recetor de mensagem com StorageQueueMessageSource criada na última etapa por meio do canal de mensagem que criamos antes.

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

Exemplos

Para obter mais informações, consulte o repositório azure-spring-boot-samples no GitHub.