Spring Cloud Azure 支持 for Spring Integration

本文适用于: ✔️版本 4.14.0 ✔️ 版本 5.8.0

适用于 Azure 的 Spring Integration 扩展为 Azure SDK for Java 提供的各种服务的 Spring Integration 适配器。 我们为这些 Azure 服务提供 Spring Integration 支持:事件中心、服务总线、存储队列。 下面是支持的适配器列表:

Spring Integration 与 Azure 事件中心

关键概念

Azure 事件中心是大数据流式处理平台和事件引入服务。 它可以每秒接收和处理数百万个事件。 可以使用任何实时分析提供程序或批处理/存储适配器转换和存储发送到事件中心的数据。

Spring Integration 在基于 Spring 的应用程序中启用轻型消息传送,并支持通过声明性适配器与外部系统集成。 这些适配器针对 Spring 对远程处理、消息传递和计划的支持提供了更高级别的抽象。 事件中心扩展项目的 Spring Integration 为Azure 事件中心提供入站和出站通道适配器和网关。

注意

RxJava 支持 API 从版本 4.0.0 中删除。 有关详细信息,请参阅 Javadoc。

使用者组

事件中心提供与 Apache Kafka 类似的使用者组支持,但逻辑略有不同。 当 Kafka 将所有已提交的偏移量存储在中转站中时,必须存储正在手动处理的事件中心消息的偏移量。 事件中心 SDK 提供用于在Azure 存储中存储此类偏移量的函数。

分区支持

事件中心提供与 Kafka 类似的物理分区概念。 但是,与 Kafka 在使用者和分区之间自动重新平衡不同,事件中心提供了一种抢占模式。 存储帐户充当租约,以确定哪个分区由哪个使用者拥有。 当新的使用者启动时,它将尝试从大多数重负载的使用者中窃取某些分区,以实现工作负荷均衡。

若要指定负载均衡策略,开发人员可以使用 EventHubsContainerProperties 该配置。 有关如何配置EventHubsContainerProperties的示例,请参阅以下部分

Batch 使用者支持

支持 EventHubsInboundChannelAdapter 批处理使用模式。 若要启用它,用户可以在构造EventHubsInboundChannelAdapter实例时指定ListenerMode.BATCH侦听器模式。 启用后,将接收有效负载是批处理事件列表的消息,并将其传递到下游通道。 每个消息标头也会转换为列表,其中内容是从每个事件分析的关联标头值。 对于分区 ID 的公共标头,检查pointer 和最后一个排队属性,它们显示为整个事件批次的单个值共享同一个。 有关详细信息,请参阅 事件中心消息标头 部分。

注意

仅当使用 MANUAL 检查point 模式时,检查point 标头才存在。

批处理使用者的检查点支持两种模式: BATCHMANUALBATCHmode 是一种自动检查点模式,用于在收到事件后检查将整批事件一起指向一起。 MANUAL模式是检查用户指向事件。 使用时,检查点器将传递到消息标头中,用户可以使用该检查点执行检查点。

批处理使用策略可以由属性max-sizemax-wait-time指定,并且,这是max-size可选的必需属性max-wait-time。 若要指定批处理使用策略,开发人员可以使用 EventHubsContainerProperties 该策略进行配置。 有关如何配置EventHubsContainerProperties的示例,请参阅以下部分

依赖项设置

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
</dependency>

配置

此初学者提供以下 3 部分的配置选项:

连接ion 配置属性

本部分包含用于连接到Azure 事件中心的配置选项。

注意

如果选择使用安全主体通过 Microsoft Entra ID 进行身份验证和授权来访问 Azure 资源,请参阅 “使用 Microsoft Entra ID 授权访问”,以确保安全主体已获得访问 Azure 资源的足够权限。

连接 spring-cloud-azure-starter-integration-eventhubs 的可配置属性:

properties 类型​​ 描述
spring.cloud.azure.eventhubs.enabled boolean 是否启用Azure 事件中心。
spring.cloud.azure.eventhubs.connection-string 字符串 事件中心命名空间连接字符串值。
spring.cloud.azure.eventhubs.namespace 字符串 事件中心命名空间值,它是 FQDN 的前缀。 FQDN 应由 NamespaceName.DomainName 组成
spring.cloud.azure.eventhubs.domain-name 字符串 Azure 事件中心命名空间值的域名。
spring.cloud.azure.eventhubs.custom-endpoint-address 字符串 自定义终结点地址。
spring.cloud.azure.eventhubs.shared-connection 布尔 基础 EventProcessorClient 和 EventHubProducerAsyncClient 是否使用相同的连接。 默认情况下,为创建的每个事件中心客户端构造并使用新连接。

检查点配置属性

本部分包含用于保存分区所有权和检查点信息的 存储 Blob 服务的配置选项。

注意

从版本 4.0.0 起,spring.cloud.azure.eventhubs.processor.检查point-store.create-container-if-not-exists 未手动启用,不会自动创建存储容器。

检查点 spring-cloud-azure-starter-integration-eventhubs 的可配置属性:

properties 类型​​ 描述
spring.cloud.azure.eventhubs.processor。检查point-store.create-container-if-not-exists 布尔 是否允许创建容器(如果不存在)。
spring.cloud.azure.eventhubs.processor。检查point-store.account-name 字符串 存储帐户的名称。
spring.cloud.azure.eventhubs.processor。检查point-store.account-key 字符串 存储帐户访问密钥。
spring.cloud.azure.eventhubs.processor。检查point-store.container-name 字符串 存储容器名称。

常见的 Azure 服务 SDK 配置选项也可以配置为存储 Blob 检查point 存储。 支持的配置选项在 Spring Cloud Azure 配置引入,可以使用统一前缀spring.cloud.azure.或前缀进行spring.cloud.azure.eventhubs.processor.checkpoint-store配置。

事件中心处理器配置属性

使用该EventHubsInboundChannelAdapterEventProcessorClient事件中心的消息来配置开发人员可用于EventHubsContainerProperties配置的整体属性EventProcessorClient。 请参阅 以下部分 ,了解如何使用 EventHubsInboundChannelAdapter

基本用法

将消息发送到Azure 事件中心

  1. 填写凭据配置选项。

    • 对于连接字符串凭据,请在 application.yml 文件中配置以下属性:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT-CONTAINER}
                  account-name: ${CHECKPOINT-STORAGE-ACCOUNT}
                  account-key: ${CHECKPOINT-ACCESS-KEY}
      
    • 对于作为托管标识的凭据,请在 application.yml 文件中配置以下属性:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      
    • 对于凭据即服务主体,请在 application.yml 文件中配置以下属性:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
      

注意

允许 tenant-id 的值包括: commonorganizationsconsumers或租户 ID。 有关这些值的详细信息,请参阅 错误AADSTS50020的错误终结点(个人和组织帐户) 部分 - 来自标识提供者的用户帐户不存在于租户中。 有关转换单租户应用的信息,请参阅 Microsoft Entra ID 上的“将单租户应用转换为多租户”。

  1. 使用 EventHubsTemplate bean 创建DefaultMessageHandler以将消息发送到事件中心。

    class Demo {
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENTHUB_NAME = "eh1";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
            return handler;
        }
    }
    
  2. 通过消息通道使用上述消息处理程序创建消息网关绑定。

    class Demo {
        @Autowired
        EventHubOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    }
    
  3. 使用网关发送消息。

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

从Azure 事件中心接收消息

  1. 填写凭据配置选项。

  2. 创建消息通道作为输入通道的 bean。

    @Configuration
    class Demo {
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. 使用 EventHubsMessageListenerContainer bean 创建EventHubsInboundChannelAdapter以从事件中心接收消息。

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENTHUB_NAME = "eh1";
        private static final String CONSUMER_GROUP = "$Default";
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(
                @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENTHUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    }
    
  4. 通过之前创建的消息通道,使用 EventHubsInboundChannelAdapter 创建消息接收器绑定。

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

配置 EventHubsMessageConverter 以自定义 objectMapper

EventHubsMessageConverter 是作为可配置的豆,允许用户自定义 ObjectMapper。

Batch 使用者支持

若要批量使用来自事件中心的消息与上述示例类似,除了用户应设置批处理消耗的相关配置选项之外 EventHubsInboundChannelAdapter

创建 EventHubsInboundChannelAdapter时,侦听器模式应设置为 BATCH。 创建 bean 时EventHubsMessageListenerContainer,请将检查point 模式设置为任MANUALBATCH一模式,并且可以根据需要配置批处理选项。

@Configuration
class Demo {
    private static final String INPUT_CHANNEL = "input";
    private static final String EVENTHUB_NAME = "eh1";
    private static final String CONSUMER_GROUP = "$Default";

    @Bean
    public EventHubsInboundChannelAdapter messageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            EventHubsMessageListenerContainer listenerContainer) {
        EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(processorContainer, ListenerMode.BATCH);
        adapter.setOutputChannel(inputChannel);
        return adapter;
    }

    @Bean
    public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
        EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        containerProperties.setEventHubName(EVENTHUB_NAME);
        containerProperties.setConsumerGroup(CONSUMER_GROUP);
        containerProperties.getBatch().setMaxSize(100);
        containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
        return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
    }
}

事件中心消息标头

下表说明了事件中心消息属性如何映射到 Spring 消息标头。 对于Azure 事件中心,消息称为 event

在记录侦听器模式下的事件中心消息/事件属性和 Spring Message 标头之间映射:

事件中心事件属性 Spring Message 标头常量 类型 描述
排队时间 EventHubsHeaders#ENQUEUED_TIME 即时 事件在事件中心分区中排队时的即时(UTC)。
偏移量 EventHubsHeaders#OFF标准版T Long 从关联的事件中心分区接收事件的偏移量。
分区键 AzureHeaders#PARTITION_KEY 字符串 如果最初发布事件时设置了分区哈希键,则为分区哈希键。
分区 ID AzureHeaders#RAW_PARTITION_ID 字符串 事件中心的分区 ID。
序列号 EventHubsHeaders#标准版QUENCE_NUMBER Long 在关联事件中心分区中排队时分配给事件的序列号。
最后排队事件属性 EventHubsHeaders#LAST_ENQUEUED_EVENT_PROPERTIES LastEnqueuedEventProperties 此分区中最后一个排队事件的属性。
NA AzureHeaders#CHECKPOINTER 检查点器 检查指向特定消息的标头。

用户可以分析每个事件相关信息的消息标头。 若要为事件设置消息标头,所有自定义标头都将作为事件的应用程序属性放置,其中标头设置为属性键。 从事件中心接收事件时,所有应用程序属性都将转换为消息标头。

注意

不支持手动设置分区键、排队时间、偏移量和序列号的消息标头。

启用批处理使用者模式后,将列出批处理消息的特定标头,其中包含每个事件中心事件中的值列表。

在批处理侦听器模式下,事件中心消息/事件属性和 Spring Message 标头之间的映射:

事件中心事件属性 Spring Batch 消息标头常量 类型 描述
排队时间 EventHubsHeaders#ENQUEUED_TIME 即时列表 事件中心分区中排队的每个事件的即时列表(UTC)。
偏移量 EventHubsHeaders#OFF标准版T Long 列表 从关联的事件中心分区接收每个事件的偏移量列表。
分区键 AzureHeaders#PARTITION_KEY 字符串列表 如果在最初发布每个事件时设置分区哈希键的列表。
序列号 EventHubsHeaders#标准版QUENCE_NUMBER Long 列表 在关联事件中心分区中排队时分配给每个事件的序列号的列表。
系统属性 EventHubsHeaders#BATCH_CONVERTED_SYSTEM_PROPERTIES 地图列表 每个事件的系统属性列表。
应用程序属性 EventHubsHeaders#BATCH_CONVERTED_APPLICATION_PROPERTIES 地图列表 每个事件的应用程序属性的列表,其中放置了所有自定义的消息标头或事件属性。

注意

发布消息时,上述所有批处理标头都将从消息中删除(如果存在)。

示例

有关详细信息,请参阅 GitHub 上的 azure-spring-boot-samples 存储库。

Spring 与 Azure 服务总线的集成

关键概念

Spring Integration 在基于 Spring 的应用程序中启用轻型消息传送,并支持通过声明性适配器与外部系统集成。

适用于 Azure 服务总线 扩展项目的 Spring Integration 为Azure 服务总线提供入站和出站通道适配器。

注意

CompletableFuture 支持 API 已从版本 2.10.0 弃用,由 4.0.0 版中的 Reactor Core 取代。 有关详细信息,请参阅 Javadoc。

依赖项设置

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-servicebus</artifactId>
</dependency>

配置

此初学者提供以下 2 部分的配置选项:

连接配置属性

本部分包含用于连接到Azure 服务总线的配置选项。

注意

如果选择使用安全主体通过 Microsoft Entra ID 进行身份验证和授权来访问 Azure 资源,请参阅 “使用 Microsoft Entra ID 授权访问”,以确保安全主体已获得访问 Azure 资源的足够权限。

连接 spring-cloud-azure-starter-integration-servicebus 的可配置属性:

properties 类型​​ 描述
spring.cloud.azure.servicebus.enabled boolean 是否启用Azure 服务总线。
spring.cloud.azure.servicebus.connection-string 字符串 服务总线命名空间连接字符串值。
spring.cloud.azure.servicebus.namespace 字符串 服务总线命名空间值,它是 FQDN 的前缀。 FQDN 应由 NamespaceName.DomainName 组成
spring.cloud.azure.servicebus.domain-name 字符串 Azure 服务总线命名空间值的域名。

服务总线处理器配置属性

使用ServiceBusInboundChannelAdapterServiceBusProcessorClient消息来配置整体属性,开发人员可以使用ServiceBusContainerProperties这些属性ServiceBusProcessorClient进行配置。 请参阅 以下部分 ,了解如何使用 ServiceBusInboundChannelAdapter

基本用法

将消息发送到Azure 服务总线

  1. 填写凭据配置选项。

    • 对于连接字符串凭据,请在 application.yml 文件中配置以下属性:

      spring:
        cloud:
          azure:
            servicebus:
              connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • 对于作为托管标识的凭据,请在 application.yml 文件中配置以下属性:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            servicebus:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

注意

允许 tenant-id 的值包括: commonorganizationsconsumers或租户 ID。 有关这些值的详细信息,请参阅 错误AADSTS50020的错误终结点(个人和组织帐户) 部分 - 来自标识提供者的用户帐户不存在于租户中。 有关转换单租户应用的信息,请参阅 Microsoft Entra ID 上的“将单租户应用转换为多租户”。

  • 对于凭据即服务主体,请在 application.yml 文件中配置以下属性:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          servicebus:
            namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

注意

允许 tenant-id 的值包括: commonorganizationsconsumers或租户 ID。 有关这些值的详细信息,请参阅 错误AADSTS50020的错误终结点(个人和组织帐户) 部分 - 来自标识提供者的用户帐户不存在于租户中。 有关转换单租户应用的信息,请参阅 Microsoft Entra ID 上的“将单租户应用转换为多租户”。

  1. 使用 DefaultMessageHandlerServiceBusTemplate bean 创建以将消息发送到服务总线,为 ServiceBusTemplate 设置实体类型。 此示例以服务总线队列为例。

    class Demo {
        private static final String OUTPUT_CHANNEL = "queue.output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
            serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
            DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
    
            return handler;
        }
    }
    
  2. 通过消息通道使用上述消息处理程序创建消息网关绑定。

    class Demo {
        @Autowired
        QueueOutboundGateway messagingGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface QueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. 使用网关发送消息。

    class Demo {
        public void demo() {
            this.messagingGateway.send(message);
        }
    }
    

接收来自Azure 服务总线的消息

  1. 填写凭据配置选项。

  2. 创建消息通道作为输入通道的 bean。

    @Configuration
    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. 使用 ServiceBusMessageListenerContainer bean 创建ServiceBusInboundChannelAdapter以接收消息以服务总线。 此示例以服务总线队列为例。

    @Configuration
    class Demo {
        private static final String QUEUE_NAME = "queue1";
    
        @Bean
        public ServiceBusMessageListenerContainer messageListenerContainer(ServiceBusProcessorFactory processorFactory) {
            ServiceBusContainerProperties containerProperties = new ServiceBusContainerProperties();
            containerProperties.setEntityName(QUEUE_NAME);
            containerProperties.setAutoComplete(false);
            return new ServiceBusMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
            @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
            ServiceBusMessageListenerContainer listenerContainer) {
            ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    }
    
  4. 通过之前创建的消息通道创建消息接收器绑定 ServiceBusInboundChannelAdapter

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        }
    }
    

配置 ServiceBusMessageConverter 以自定义 objectMapper

ServiceBusMessageConverter 是作为可配置的豆子,允许用户自定义 ObjectMapper

服务总线邮件头

对于可映射到多个 Spring 标头常量的某些服务总线标头,列出了不同 Spring 标头的优先级。

服务总线标头和 Spring Headers 之间的映射:

服务总线消息标头和属性 Spring message 标头常量 类型 可配置 说明
内容类型 MessageHeaders#CONTENT_TYPE 字符串 消息的RFC2045内容类型描述符。
相关 ID ServiceBusMessageHeaders#CORRELATION_ID 字符串 消息的相关 ID
消息 ID ServiceBusMessageHeaders#MESSAGE_ID 字符串 消息的消息 ID,此标头的优先级高于 MessageHeaders#ID
消息 ID MessageHeaders#ID UUID 消息的消息 ID,此标头的优先级低于 ServiceBusMessageHeaders#MESSAGE_ID
分区键 ServiceBusMessageHeaders#PARTITION_KEY 字符串 用于将消息发送到分区实体的分区键。
回复 MessageHeaders#REPLY_CHANNEL 字符串 要向其发送答复的实体的地址。
回复会话 ID ServiceBusMessageHeaders#REPLY_TO_SESSION_ID 字符串 消息的 ReplyToGroupId 属性值。
计划的排队时间 utc ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME OffsetDateTime 消息在服务总线中排队的日期/时间,此标头的优先级高于 AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE
计划的排队时间 utc AzureHeaders#SCHEDULED_ENQUEUE_MESSAGE Integer 邮件在服务总线中排队的日期/时间,此标头的优先级低于 ServiceBusMessageHeaders#SCHEDULED_ENQUEUE_TIME
会话 ID ServiceBusMessageHeaders#SESSION_ID 字符串 会话感知实体的会话 IDentifier。
生存时间 ServiceBusMessageHeaders#TIME_TO_LIVE 持续时间 此消息过期之前的持续时间。
若要 ServiceBusMessageHeaders#TO 字符串 消息的“收件人”地址,保留供将来在路由方案中使用,目前被中转站本身忽略。
主题 ServiceBusMessageHeaders#SUBJECT 字符串 邮件的主题。
死信错误说明 ServiceBusMessageHeaders#DEAD_LETTER_ERROR_DESCRIPTION 字符串 已死信的消息的说明。
死信原因 ServiceBusMessageHeaders#DEAD_LETTER_REASON 字符串 消息死信的原因。
死信源 ServiceBusMessageHeaders#DEAD_LETTER_SOURCE 字符串 消息为死信的实体。
传递计数 ServiceBusMessageHeaders#DELIVERY_COUNT long 此消息传递到客户端的次数。
排队序列号 ServiceBusMessageHeaders#ENQUEUED_SEQUENCE_NUMBER long 通过服务总线分配给消息的排队序列号。
排队时间 ServiceBusMessageHeaders#ENQUEUED_TIME OffsetDateTime 在服务总线中排队此消息的日期时间。
过期时间 ServiceBusMessageHeaders#EXPIRES_AT OffsetDateTime 此消息到期的日期/时间。
锁定令牌 ServiceBusMessageHeaders#LOCK_TOKEN 字符串 当前消息的锁定令牌。
锁定到 ServiceBusMessageHeaders#LOCKED_UNTIL OffsetDateTime 此消息的锁定过期的日期/时间。
序列号 ServiceBusMessageHeaders#SEQUENCE_NUMBER long 通过服务总线分配给消息的唯一编号。
状态 ServiceBusMessageHeaders#STATE ServiceBusMessageState 消息的状态,可以是“活动”、“延迟”或“计划”。

分区键支持

此初学者通过允许在消息标头中设置分区键和会话 ID,支持服务总线分区。 本部分介绍如何设置消息的分区键。

建议: 用作 ServiceBusMessageHeaders.PARTITION_KEY 标头的键。

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

不建议,但当前受支持:AzureHeaders.PARTITION_KEY 作为标头的键。

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(AzureHeaders.PARTITION_KEY, "Customize partition key")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

注意

ServiceBusMessageHeaders.PARTITION_KEY在消息标头中同时设置和AzureHeaders.PARTITION_KEY设置时,ServiceBusMessageHeaders.PARTITION_KEY首选。

会话支持

此示例演示如何在应用程序中手动设置消息的会话 ID。

public class SampleController {
    @PostMapping("/messages")
    public ResponseEntity<String> sendMessage(@RequestParam String message) {
        LOGGER.info("Going to add message {} to Sinks.Many.", message);
        many.emitNext(MessageBuilder.withPayload(message)
                                    .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
                                    .build(), Sinks.EmitFailureHandler.FAIL_FAST);
        return ResponseEntity.ok("Sent!");
    }
}

注意

ServiceBusMessageHeaders.SESSION_ID在消息标头中设置会话 ID 时,还会设置不同的ServiceBusMessageHeaders.PARTITION_KEY标头,会话 ID 的值最终将用于覆盖分区键的值。

示例

有关详细信息,请参阅 GitHub 上的 azure-spring-boot-samples 存储库。

Spring 与 Azure 存储队列的集成

关键概念

Azure 队列存储是一项可存储大量消息的服务。 可以使用 HTTP 或 HTTPS 通过经验证的调用从世界任何位置访问消息。 队列消息大小最大可为 64 KB。 一个队列可以包含数百万条消息,直至达到存储帐户的总容量限值。 队列通常用于创建要异步处理的积压工作 (backlog)。

依赖项设置

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
</dependency>

配置

此初学者提供以下配置选项:

连接配置属性

本部分包含用于连接到Azure 存储队列的配置选项。

注意

如果选择使用安全主体通过 Microsoft Entra ID 进行身份验证和授权来访问 Azure 资源,请参阅 “使用 Microsoft Entra ID 授权访问”,以确保安全主体已获得访问 Azure 资源的足够权限。

连接 spring-cloud-azure-starter-integration-storage-queue 的可配置属性:

properties 类型​​ 描述
spring.cloud.azure.storage.queue.enabled boolean 是否启用Azure 存储队列。
spring.cloud.azure.storage.queue.connection-string 字符串 存储队列命名空间连接字符串值。
spring.cloud.azure.storage.queue.accountName 字符串 存储队列帐户名称。
spring.cloud.azure.storage.queue.accountKey 字符串 存储队列帐户密钥。
spring.cloud.azure.storage.queue.endpoint 字符串 存储队列服务终结点。
spring.cloud.azure.storage.queue.sasToken 字符串 Sas 令牌凭据
spring.cloud.azure.storage.queue.serviceVersion QueueServiceVersion 发出 API 请求时使用的 QueueServiceVersion。
spring.cloud.azure.storage.queue.messageEncoding 字符串 队列消息编码。

基本用法

将消息发送到Azure 存储队列

  1. 填写凭据配置选项。

    • 对于连接字符串凭据,请在 application.yml 文件中配置以下属性:

      spring:
        cloud:
          azure:
            storage:
              queue:
                connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
      
    • 对于作为托管标识的凭据,请在 application.yml 文件中配置以下属性:

      spring:
        cloud:
          azure:
            credential:
              managed-identity-enabled: true
              client-id: ${AZURE_CLIENT_ID}
            profile:
              tenant-id: <tenant>
            storage:
              queue:
                namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
      

注意

允许 tenant-id 的值包括: commonorganizationsconsumers或租户 ID。 有关这些值的详细信息,请参阅 错误AADSTS50020的错误终结点(个人和组织帐户) 部分 - 来自标识提供者的用户帐户不存在于租户中。 有关转换单租户应用的信息,请参阅 Microsoft Entra ID 上的“将单租户应用转换为多租户”。

  • 对于凭据即服务主体,请在 application.yml 文件中配置以下属性:

    spring:
      cloud:
        azure:
          credential:
            client-id: ${AZURE_CLIENT_ID}
            client-secret: ${AZURE_CLIENT_SECRET}
          profile:
            tenant-id: <tenant>
          storage:
            queue:
              namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
    

注意

允许 tenant-id 的值包括: commonorganizationsconsumers或租户 ID。 有关这些值的详细信息,请参阅 错误AADSTS50020的错误终结点(个人和组织帐户) 部分 - 来自标识提供者的用户帐户不存在于租户中。 有关转换单租户应用的信息,请参阅 Microsoft Entra ID 上的“将单租户应用转换为多租户”。

  1. 使用 StorageQueueTemplate bean 创建DefaultMessageHandler,以将消息发送到存储队列。

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    }
    
  2. 通过消息通道创建包含上述消息处理程序的消息网关绑定。

    class Demo {
        @Autowired
        StorageQueueOutboundGateway storageQueueOutboundGateway;
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    }
    
  3. 使用网关发送消息。

    class Demo {
        public void demo() {
            this.storageQueueOutboundGateway.send(message);
        }
    }
    

从Azure 存储队列接收消息

  1. 填写凭据配置选项。

  2. 创建消息通道作为输入通道的 bean。

    class Demo {
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    }
    
  3. 使用 StorageQueueTemplate bean 创建StorageQueueMessageSource以接收消息以存储队列。

    class Demo {
        private static final String STORAGE_QUEUE_NAME = "example";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    }
    
  4. 创建一个消息接收器绑定,其中存储通过之前创建的消息通道在上一步中创建的QueueMessageSource。

    class Demo {
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("New message received: '{}'", message);
            checkpointer.success()
                .doOnError(Throwable::printStackTrace)
                .doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
                .block();
        }
    }
    

示例

有关详细信息,请参阅 GitHub 上的 azure-spring-boot-samples 存储库。