Spring Cloud Azure 支持 for Spring Cloud Stream

本文适用于: ✔️版本 4.14.0 ✔️ 版本 5.8.0

Spring Cloud Stream 是一个框架,用于构建与共享消息系统连接的高度可缩放的事件驱动微服务。

该框架提供了一个灵活的编程模型,基于已建立和熟悉的 Spring 习惯和最佳做法。 这些最佳做法包括对持久性发布/子语义、使用者组和有状态分区的支持。

当前绑定器实现包括:

适用于 Azure 事件中心 的 Spring Cloud Stream Binder

关键概念

适用于 Azure 事件中心 的 Spring Cloud Stream Binder 为 Spring Cloud Stream 框架提供绑定实现。 此实现在其基础上使用 Spring Integration 事件中心通道适配器。 从设计的角度来看,事件中心与 Kafka 类似。 此外,可以通过 Kafka API 访问事件中心。 如果项目依赖于 Kafka API,则可以使用 Kafka API 示例尝试 事件中心

使用者组

事件中心提供与 Apache Kafka 类似的使用者组支持,但逻辑略有不同。 当 Kafka 将所有已提交的偏移量存储在中转站中时,必须存储正在手动处理的事件中心消息的偏移量。 事件中心 SDK 提供用于在Azure 存储中存储此类偏移量的函数。

分区支持

事件中心提供与 Kafka 类似的物理分区概念。 但是,与 Kafka 在使用者和分区之间自动重新平衡不同,事件中心提供了一种抢占模式。 存储帐户充当租约,以确定哪个使用者拥有哪个分区。 当新的使用者启动时,它会尝试从加载最重的使用者中窃取某些分区,以实现工作负荷均衡。

若要指定负载均衡策略,请提供其 spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.load-balancing.* 属性。 有关详细信息,请参阅 “使用者属性 ”部分。

Batch 使用者支持

Spring Cloud Azure Stream 事件中心绑定器支持 Spring Cloud Stream Batch 使用者功能

若要使用批处理使用者模式,请将 spring.cloud.stream.bindings.<binding-name>.consumer.batch-mode 属性设置为 true. 启用后,将收到包含批处理事件列表有效负载的消息,并将其传递给 Consumer 函数。 每个消息标头也转换为列表,其中内容是从每个事件分析的关联标头值。 分区 ID、检查pointer 和最后一个排队属性的公共标头显示为单个值,因为整个事件批次共享相同的值。 有关详细信息,请参阅 Spring Integration 的 Spring Cloud Azure 支持的事件中心消息标头部分。

注意

仅当使用 检查point 模式时MANUAL,检查point 标头才存在。

批处理使用者的检查点支持两种模式: BATCHMANUALBATCH模式是自动检查点模式,用于在绑定器接收事件后检查将整批事件一起指向一起。 MANUAL模式是检查用户指向事件。 使用时,消息Checkpointer将传递到消息标头中,用户可以使用它执行检查点。

可以通过设置max-size前缀为spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.batch.的和max-wait-time属性来指定批大小。 该 max-size 属性是必需的,属性 max-wait-time 是可选的。 有关详细信息,请参阅 “使用者属性 ”部分。

依赖项设置

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
</dependency>

或者,也可以使用 Spring Cloud Azure Stream 事件中心初学者,如以下 Maven 示例所示:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-eventhubs</artifactId>
</dependency>

配置

绑定器提供以下三部分的配置选项:

连接配置属性

本部分包含用于连接到Azure 事件中心的配置选项。

注意

如果选择使用安全主体通过 Microsoft Entra ID 进行身份验证和授权来访问 Azure 资源,请参阅 “使用 Microsoft Entra ID 授权访问”,以确保安全主体已获得访问 Azure 资源的足够权限。

连接 spring-cloud-azure-stream-binder-eventhubs 的可配置属性:

properties 类型​​ 描述
spring.cloud.azure.eventhubs.enabled boolean 是否启用Azure 事件中心。
spring.cloud.azure.eventhubs.connection-string 字符串 事件中心命名空间连接字符串值。
spring.cloud.azure.eventhubs.namespace 字符串 事件中心命名空间值,它是 FQDN 的前缀。 FQDN 应由 NamespaceName.DomainName 组成
spring.cloud.azure.eventhubs.domain-name 字符串 Azure 事件中心命名空间值的域名。
spring.cloud.azure.eventhubs.custom-endpoint-address 字符串 自定义终结点地址。

提示

常见的 Azure 服务 SDK 配置选项也可以针对 Spring Cloud Azure Stream 事件中心绑定器进行配置。 支持的配置选项在 Spring Cloud Azure 配置引入,可以使用统一前缀spring.cloud.azure.或前缀进行spring.cloud.azure.eventhubs.配置。

默认情况下,绑定器还支持 Spring Could Azure 资源管理器。 若要了解如何使用未被授予Data相关角色的安全主体检索连接字符串,请参阅 Spring Could Azure 资源管理器的基本使用部分。

检查点配置属性

本部分包含用于保存分区所有权和检查点信息的 存储 Blob 服务的配置选项。

注意

从版本 4.0.0 起,spring.cloud.azure.eventhubs.processor.检查point-store.create-container-if-not-exists 未手动启用,不会使用 spring.cloud.stream.bindings.binding-name.destination 的名称自动创建存储容器。

检查点 spring-cloud-azure-stream-binder-eventhubs 的可配置属性:

properties 类型​​ 描述
spring.cloud.azure.eventhubs.processor。检查point-store.create-container-if-not-exists 布尔 是否允许创建容器(如果不存在)。
spring.cloud.azure.eventhubs.processor。检查point-store.account-name 字符串 存储帐户的名称。
spring.cloud.azure.eventhubs.processor。检查point-store.account-key 字符串 存储帐户访问密钥。
spring.cloud.azure.eventhubs.processor。检查point-store.container-name 字符串 存储容器名称。

提示

常见的 Azure 服务 SDK 配置选项也可以配置为存储 Blob 检查point 存储。 支持的配置选项在 Spring Cloud Azure 配置引入,可以使用统一前缀spring.cloud.azure.或前缀进行spring.cloud.azure.eventhubs.processor.checkpoint-store配置。

Azure 事件中心绑定配置属性

以下选项分为四个部分:使用者属性、高级使用者配置、生成者属性和高级生成者配置。

使用者属性

这些属性通过 EventHubsConsumerProperties.

spring-cloud-azure-stream-binder-eventhubs 的使用者可配置属性:

properties 类型​​ 描述
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.检查point.mode CheckpointMode 使用者决定如何检查point 消息时使用的检查点模式
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.检查point.count Integer 确定每个分区的消息量以执行一个检查点。 仅在使用检查点模式时PARTITION_COUNT生效。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.检查point.interval 持续时间 确定执行一个检查点的时间间隔。 仅在使用检查点模式时TIME生效。
spring.cloud.stream.eventhubs.bindings。<binding-name.consumer.batch.max-size Integer 批处理中的最大事件数。 批处理使用者模式是必需的。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.batch.max-wait-time 持续时间 批处理使用的最大持续时间。 只有在启用批处理使用者模式并且是可选的时才生效。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.update-interval 持续时间 更新的时间间隔持续时间。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.strategy LoadBalancingStrategy 负载均衡策略。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.load-balancing.partition-ownership-expiration-interval 持续时间 分区所有权过期的持续时间。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.track-last-enqueued-event-properties 布尔 事件处理程序是否应请求有关其关联分区上最后排队事件的信息,并跟踪接收事件时的信息。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.prefetch-count Integer 使用者用来控制事件中心使用者在本地主动接收和排队的事件数的计数。
spring.cloud.stream.eventhubs.bindings.binding-name.consumer.initial-partition-event-position 将键映射为分区 ID 和值 StartPositionProperties 如果检查point 存储中不存在分区的检查点,则包含要用于每个分区的事件位置的映射。 此映射从分区 ID 中键键。

注意

配置 initial-partition-event-position 接受一个 map 指定每个事件中心的初始位置。 因此,其键是分区 ID,其值 StartPositionProperties 包括偏移量、序列号、排队日期时间和是否非独占的属性。 例如,可以将它设置为

spring:
  cloud:
    stream:
      eventhubs:
        bindings:
          <binding-name>:
            consumer:
              initial-partition-event-position:
                0:
                  offset: earliest
                1:
                  sequence-number: 100
                2:
                  enqueued-date-time: 2022-01-12T13:32:47.650005Z
                4:
                  inclusive: false
高级使用者配置

上述连接、检查点常见的 Azure SDK 客户端配置支持为每个绑定器使用者自定义,可以使用前缀spring.cloud.stream.eventhubs.bindings.<binding-name>.consumer.对其进行配置。

生成者属性

这些属性通过 EventHubsProducerProperties.

spring-cloud-azure-stream-binder-eventhubs 的生成者可配置属性:

properties 类型​​ 描述
spring.cloud.stream.eventhubs.bindings.binding-name.producer.sync boolean 生成者的同步的开关标志。 如果为 true,生成者将在发送操作后等待响应。
spring.cloud.stream.eventhubs.bindings.binding-name.producer.send-timeout long 发送操作后等待响应的时间量。 仅当启用同步生成者时才会生效。
高级生成者配置

上述 连接常见的 Azure SDK 客户端 配置支持对每个绑定器生成者进行自定义,可以使用前缀 spring.cloud.stream.eventhubs.bindings.<binding-name>.producer.对其进行配置。

基本用法

从/向事件中心发送和接收消息

  1. 使用凭据信息填充配置选项。

    • 对于连接字符串凭据,请在 application.yml 文件中配置以下属性:

      spring:
        cloud:
          azure:
            eventhubs:
              connection-string: ${EVENTHUB_NAMESPACE_CONNECTION_STRING}
              processor:
                checkpoint-store:
                  container-name: ${CHECKPOINT_CONTAINER}
                  account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                  account-key: ${CHECKPOINT_ACCESS_KEY}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      
    • 对于凭据即服务主体,请在 application.yml 文件中配置以下属性:

      spring:
        cloud:
          azure:
            credential:
              client-id: ${AZURE_CLIENT_ID}
              client-secret: ${AZURE_CLIENT_SECRET}
            profile:
              tenant-id: <tenant>
            eventhubs:
              namespace: ${EVENTHUB_NAMESPACE}
              processor:
                checkpoint-store:
                  container-name: ${CONTAINER_NAME}
                  account-name: ${ACCOUNT_NAME}
          function:
            definition: consume;supply
          stream:
            bindings:
              consume-in-0:
                destination: ${EVENTHUB_NAME}
                group: ${CONSUMER_GROUP}
              supply-out-0:
                destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
            eventhubs:
              bindings:
                consume-in-0:
                  consumer:
                    checkpoint:
                      mode: MANUAL
      

注意

允许 tenant-id 的值包括: commonorganizationsconsumers或租户 ID。 有关这些值的详细信息,请参阅 错误AADSTS50020的错误终结点(个人和组织帐户) 部分 - 来自标识提供者的用户帐户不存在于租户中。 有关转换单租户应用的信息,请参阅 Microsoft Entra ID 上的“将单租户应用转换为多租户”。

  • 对于作为托管标识的凭据,请在 application.yml 文件中配置以下属性:

    spring:
      cloud:
        azure:
          credential:
            managed-identity-enabled: true
            client-id: ${AZURE_MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
          eventhubs:
            namespace: ${EVENTHUB_NAMESPACE}
            processor:
              checkpoint-store:
                container-name: ${CONTAINER_NAME}
                account-name: ${ACCOUNT_NAME}
        function:
          definition: consume;supply
        stream:
          bindings:
            consume-in-0:
              destination: ${EVENTHUB_NAME}
              group: ${CONSUMER_GROUP}
            supply-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_AS_ABOVE}
    
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
    
  1. 定义供应商和使用者。

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
            );
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

分区支持

创建一个 PartitionSupplier 包含用户提供的分区信息,用于配置要发送的消息的分区信息。 以下流程图显示了获取分区 ID 和键的不同优先级的过程:

Diagram showing a flowchart of the partitioning support process.

Batch 使用者支持

  1. 提供批处理配置选项,如以下示例所示:

    spring:
      cloud:
        function:
          definition: consume
        stream:
          bindings:
            consume-in-0:
              destination: ${AZURE_EVENTHUB_NAME}
              group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
              consumer:
                batch-mode: true
          eventhubs:
            bindings:
              consume-in-0:
                consumer:
                  batch:
                    max-batch-size: 10 # Required for batch-consumer mode
                    max-wait-time: 1m # Optional, the default value is null
                  checkpoint:
                    mode: BATCH # or MANUAL as needed
    
  2. 定义供应商和使用者。

    对于检查指向模式BATCH,可以使用以下代码发送消息并在批处理中使用。

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                        message.getPayload().get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                        ((List<Object>) message.getHeaders().get(EventHubsHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

    对于检查点模式MANUAL,可以使用以下代码以批处理方式发送消息和使用/检查点。

    @Bean
    public Consumer<Message<List<String>>> consume() {
        return message -> {
            for (int i = 0; i < message.getPayload().size(); i++) {
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
                    message.getPayload().get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_PARTITION_KEY)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_SEQUENCE_NUMBER)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_OFFSET)).get(i),
                    ((List<Object>) message.getHeaders().get(EventHubHeaders.BATCH_CONVERTED_ENQUEUED_TIME)).get(i));
            }
    
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            checkpointer.success()
                        .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                        .doOnError(error -> LOGGER.error("Exception found", error))
                        .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
        };
    }
    

注意

在批处理使用模式下,Spring Cloud Stream 绑定器的默认内容类型为 application/json,因此请确保消息有效负载与内容类型保持一致。 例如,使用默认内容类型 application/json 接收包含 String 有效负载的消息时,应 JSON String用原始 String 文本的双引号括起有效负载。 虽然对于 text/plain 内容类型,它可以是直接 String 的对象。 有关详细信息,请参阅 Spring Cloud Stream 内容类型协商

处理错误消息

  • 处理出站绑定错误消息

    默认情况下,Spring Integration 将创建名为 errorChannel 的全局错误通道。 配置以下消息终结点以处理出站绑定错误消息:

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • 处理入站绑定错误消息

    Spring Cloud Stream 事件中心 Binder 支持两种解决方案来处理入站消息绑定的错误:自定义错误通道和处理程序。

    错误通道

    Spring Cloud Stream 为每个入站绑定提供错误通道。 将一个 ErrorMessage 发送到错误通道。 有关详细信息,请参阅 Spring Cloud Stream 文档中的“处理错误 ”。

    • 默认错误通道

      可以使用名为errorChannel > 的全局错误通道来使用所有入站绑定错误消息。 若要处理这些消息,请配置以下消息终结点:

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • 特定于绑定的错误通道

      可以使用特定的错误通道来使用优先级高于默认错误通道的特定入站绑定错误消息。 若要处理这些消息,请配置以下消息终结点:

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      注意

      特定于绑定的错误通道与其他提供的错误处理程序和通道互斥。

    错误处理程序

    Spring Cloud Stream 公开一种机制,用于通过添加 Consumer 接受 ErrorMessage 实例的处理程序来提供自定义错误处理程序。 有关详细信息,请参阅 Spring Cloud Stream 文档中的错误处理

    注意

    配置任何绑定错误处理程序后,它可以处理默认错误通道。

    • 绑定默认错误处理程序

      配置单个 Consumer bean 以使用所有入站绑定错误消息。 以下默认函数订阅每个入站绑定错误通道:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      还需要将 spring.cloud.stream.default.error-handler-definition 属性设置为函数名称。

    • 特定于绑定的错误处理程序

      Consumer将 bean 配置为使用特定的入站绑定错误消息。 以下函数订阅特定的入站绑定错误通道,优先级高于绑定默认错误处理程序:

      @Bean
      public Consumer<ErrorMessage> myErrorHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      还需要将 spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition 属性设置为函数名称。

事件中心消息标头

有关支持的基本消息标头,请参阅 Spring Integration 的 Spring Cloud Azure 支持的事件中心消息标头部分。

多个绑定器支持

使用多个绑定器也支持对多个事件中心命名空间连接。 此示例采用连接字符串作为示例。 还支持服务主体和托管标识的凭据。 可以在每个联编程序的环境设置中设置相关属性。

  1. 若要将多个绑定器用于事件中心,请在 application.yml 文件中配置以下属性

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${EVENTHUB_NAME_01}
              group: ${CONSUMER_GROUP_01}
            supply1-out-0:
              destination: ${THE_SAME_EVENTHUB_NAME_01_AS_ABOVE}
            consume2-in-0:
              binder: eventhub-2
              destination: ${EVENTHUB_NAME_02}
              group: ${CONSUMER_GROUP_02}
            supply2-out-0:
              binder: eventhub-2
              destination: ${THE_SAME_EVENTHUB_NAME_02_AS_ABOVE}
          binders:
            eventhub-1:
              type: eventhubs
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_01_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_01}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
            eventhub-2:
              type: eventhubs
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      eventhubs:
                        connection-string: ${EVENTHUB_NAMESPACE_02_CONNECTION_STRING}
                        processor:
                          checkpoint-store:
                            container-name: ${CHECKPOINT_CONTAINER_02}
                            account-name: ${CHECKPOINT_STORAGE_ACCOUNT}
                            account-key: ${CHECKPOINT_ACCESS_KEY}
          eventhubs:
            bindings:
              consume1-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
              consume2-in-0:
                consumer:
                  checkpoint:
                    mode: MANUAL
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    注意

    前面的应用程序文件演示如何为所有绑定配置应用程序的单个默认轮询器。 如果要为特定绑定配置轮询器,可以使用配置,例如 spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000

  2. 我们需要定义两个供应商和两个消费者:

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message1 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message2 '{}' successfully checkpointed", message))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    

资源预配

事件中心绑定器支持预配事件中心和使用者组,用户可以使用以下属性来启用预配。

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      eventhubs:
        resource:
          resource-group: ${AZURE_EVENTHUBS_RESOURECE_GROUP}

注意

允许 tenant-id 的值包括: commonorganizationsconsumers或租户 ID。 有关这些值的详细信息,请参阅 错误AADSTS50020的错误终结点(个人和组织帐户) 部分 - 来自标识提供者的用户帐户不存在于租户中。 有关转换单租户应用的信息,请参阅 Microsoft Entra ID 上的“将单租户应用转换为多租户”。

示例

有关详细信息,请参阅 GitHub 上的 azure-spring-boot-samples 存储库。

适用于 Azure 服务总线的 Spring Cloud Stream Binder

关键概念

适用于 Azure 服务总线的 Spring Cloud Stream Binder 为 Spring Cloud Stream Framework 提供绑定实现。 此实现在其基础上使用 Spring Integration 服务总线 通道适配器。

计划消息

此绑定器支持将消息提交到主题以供延迟处理。 用户可以发送具有标头 x-delay 的计划消息,以毫秒为单位表示消息的延迟时间。 消息将在毫秒后 x-delay 传递到相应的主题。

使用者组

服务总线主题提供与 Apache Kafka 类似的使用者组支持,但逻辑略有不同。 此绑定程序依赖于 Subscription 主题来充当使用者组。

依赖项设置

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId>
</dependency>

或者,也可以使用 Spring Cloud Azure Stream 服务总线 Starter,如以下 Maven 示例所示:

<dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-stream-servicebus</artifactId>
</dependency>

配置

绑定器提供以下两部分的配置选项:

连接配置属性

本部分包含用于连接到Azure 服务总线的配置选项。

注意

如果选择使用安全主体通过 Microsoft Entra ID 进行身份验证和授权来访问 Azure 资源,请参阅 “使用 Microsoft Entra ID 授权访问”,以确保安全主体已获得访问 Azure 资源的足够权限。

连接 spring-cloud-azure-stream-binder-servicebus 的可配置属性:

properties 类型​​ 描述
spring.cloud.azure.servicebus.enabled boolean 是否启用Azure 服务总线。
spring.cloud.azure.servicebus.connection-string 字符串 服务总线命名空间连接字符串值。
spring.cloud.azure.servicebus.namespace 字符串 服务总线命名空间值,它是 FQDN 的前缀。 FQDN 应由 NamespaceName.DomainName 组成
spring.cloud.azure.servicebus.domain-name 字符串 Azure 服务总线命名空间值的域名。

注意

常见的 Azure 服务 SDK 配置选项也可以针对 Spring Cloud Azure Stream 服务总线 binder 进行配置。 支持的配置选项在 Spring Cloud Azure 配置引入,可以使用统一前缀spring.cloud.azure.或前缀进行spring.cloud.azure.servicebus.配置。

默认情况下,绑定器还支持 Spring Could Azure 资源管理器。 若要了解如何使用未被授予Data相关角色的安全主体检索连接字符串,请参阅 Spring Could Azure 资源管理器的基本使用部分。

Azure 服务总线绑定配置属性

以下选项分为四个部分:使用者属性、高级使用者配置、生成者属性和高级生成者配置。

使用者属性

这些属性通过 ServiceBusConsumerProperties.

spring-cloud-azure-stream-binder-servicebus 的使用者可配置属性:

properties 类型 默认 说明
spring.cloud.stream.servicebus.bindings.binding-name.consumer.requeue-rejected boolean false 如果失败的消息路由到 DLQ。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-calls Integer 1 服务总线处理器客户端应处理的最大并发消息数。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-concurrent-sessions 整数 Null 要在任何给定时间处理的最大并发会话数。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.session-enabled 布尔 null 是否启用会话。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.prefetch-count 整数 0 服务总线处理器客户端的预提取计数。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.sub-queue SubQueue 要连接到的子队列的类型。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.max-auto-lock-renew-duration 持续时间 5m 继续自动续订锁的时间量。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.receive-mode ServiceBusReceiveMode peek_lock 服务总线处理器客户端的接收模式。
spring.cloud.stream.servicebus.bindings.binding-name.consumer.auto-complete 布尔 true 是否自动解决消息。 如果设置为 false,则会添加消息标头 Checkpointer ,使开发人员能够手动解决消息。
高级使用者配置

上述 连接常见的 Azure SDK 客户端 配置支持对每个绑定器使用者进行自定义,可以使用前缀 spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.进行配置。

生成者属性

这些属性通过 ServiceBusProducerProperties.

spring-cloud-azure-stream-binder-servicebus 的生成者可配置属性:

properties 类型 默认 说明
spring.cloud.stream.servicebus.bindings.binding-name.producer.sync boolean false 用于同步生成者的切换标志。
spring.cloud.stream.servicebus.bindings.binding-name.producer.send-timeout long 10000 发送生成者的超时值。
spring.cloud.stream.servicebus.bindings.binding-name.producer.entity-type ServiceBusEntityType Null 服务总线绑定生成者所需的生成者的实体类型。

重要

使用绑定生成者时,需要配置其属性 spring.cloud.stream.servicebus.bindings.<binding-name>.producer.entity-type

高级生成者配置

上述 连接常见的 Azure SDK 客户端 配置支持对每个绑定器生成者进行自定义,可以使用前缀 spring.cloud.stream.servicebus.bindings.<binding-name>.producer.对其进行配置。

基本用法

发送和接收来自/到服务总线的消息

  1. 使用凭据信息填充配置选项。

    • 对于连接字符串凭据,请在 application.yml 文件中配置以下属性:

          spring:
            cloud:
              azure:
                servicebus:
                  connection-string: ${SERVICEBUS_NAMESPACE_CONNECTION_STRING}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      
    • 对于凭据即服务主体,请在 application.yml 文件中配置以下属性:

          spring:
            cloud:
              azure:
                credential:
                  client-id: ${AZURE_CLIENT_ID}
                  client-secret: ${AZURE_CLIENT_SECRET}
                profile:
                  tenant-id: <tenant>
                servicebus:
                  namespace: ${SERVICEBUS_NAMESPACE}
              function:
                definition: consume;supply
              stream:
                bindings:
                  consume-in-0:
                    destination: ${SERVICEBUS_ENTITY_NAME}
                    # If you use Service Bus Topic, add the following configuration
                    # group: ${SUBSCRIPTION_NAME}
                  supply-out-0:
                    destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
                servicebus:
                  bindings:
                    consume-in-0:
                      consumer:
                        auto-complete: false
                    supply-out-0:
                      producer:
                        entity-type: queue # set as "topic" if you use Service Bus Topic
      

注意

允许 tenant-id 的值包括: commonorganizationsconsumers或租户 ID。 有关这些值的详细信息,请参阅 错误AADSTS50020的错误终结点(个人和组织帐户) 部分 - 来自标识提供者的用户帐户不存在于租户中。 有关转换单租户应用的信息,请参阅 Microsoft Entra ID 上的“将单租户应用转换为多租户”。

  • 对于作为托管标识的凭据,请在 application.yml 文件中配置以下属性:

        spring:
          cloud:
            azure:
              credential:
                managed-identity-enabled: true
                client-id: ${MANAGED_IDENTITY_CLIENT_ID} # Only needed when using a user-assigned managed identity
              servicebus:
                namespace: ${SERVICEBUS_NAMESPACE}
            function:
              definition: consume;supply
            stream:
              bindings:
                consume-in-0:
                  destination: ${SERVICEBUS_ENTITY_NAME}
                  # If you use Service Bus Topic, add the following configuration
                  # group: ${SUBSCRIPTION_NAME}
                supply-out-0:
                  destination: ${SERVICEBUS_ENTITY_NAME_SAME_AS_ABOVE}
              servicebus:
                bindings:
                  consume-in-0:
                    consumer:
                      auto-complete: false
                  supply-out-0:
                    producer:
                      entity-type: queue # set as "topic" if you use Service Bus Topic
    
  1. 定义供应商和使用者。

    @Bean
    public Consumer<Message<String>> consume() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message received: '{}'", message.getPayload());
    
            checkpointer.success()
                    .doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(error -> LOGGER.error("Exception found", error))
                    .block();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply() {
        return () -> {
            LOGGER.info("Sending message, sequence " + i);
            return MessageBuilder.withPayload("Hello world, " + i++).build();
        };
    }
    

分区键支持

绑定器通过允许在消息标头中设置分区键和会话 ID,支持服务总线分区。 本部分介绍如何设置消息的分区键。

Spring Cloud Stream 提供分区键 SpEL 表达式属性 spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression。 例如,将此属性设置为 "'partitionKey-' + headers[<message-header-key>]" 并添加一个名为 message-header-key 的标头。 Spring Cloud Stream 在计算表达式以分配分区键时使用此标头的值。 以下代码提供示例生成者:

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader("<message-header-key>", value.length() % 4)
            .build();
    };
}

会话支持

绑定器支持服务总线的消息会话。 可以通过消息标头设置消息的会话 ID。

@Bean
public Supplier<Message<String>> generate() {
    return () -> {
        String value = "random payload";
        return MessageBuilder.withPayload(value)
            .setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session ID")
            .build();
    };
}

注意

根据服务总线分区,会话 ID 的优先级高于分区键。 因此,当同时设置和ServiceBusMessageHeaders#PARTITION_KEY标头时ServiceBusMessageHeaders#SESSION_ID,会话 ID 的值最终用于覆盖分区键的值。

处理错误消息

  • 处理出站绑定错误消息

    默认情况下,Spring Integration 将创建名为 errorChannel 的全局错误通道。 配置以下消息终结点以处理出站绑定错误消息。

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handleError(ErrorMessage message) {
        LOGGER.error("Handling outbound binding error: " + message);
    }
    
  • 处理入站绑定错误消息

    Spring Cloud Stream 服务总线 Binder 支持三种处理入站消息绑定错误的解决方案:绑定器错误处理程序、自定义错误通道和处理程序。

    绑定器错误处理程序

    默认绑定程序错误处理程序处理入站绑定。 启用后 spring.cloud.stream.servicebus.bindings.<binding-name>.consumer.requeue-rejected ,使用此处理程序将失败的消息发送到死信队列。 否则,将放弃失败的消息。 除了配置特定于绑定的错误通道之外,绑定器错误处理程序始终生效,无论是否有其他自定义错误处理程序或通道。

    错误通道

    Spring Cloud Stream 为每个入站绑定提供错误通道。 将一个 ErrorMessage 发送到错误通道。 有关详细信息,请参阅 Spring Cloud Stream 文档中的“处理错误 ”。

    • 默认错误通道

      可以使用名为errorChannel > 的全局错误通道来使用所有入站绑定错误消息。 若要处理这些消息,请配置以下消息终结点:

      @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      
    • 特定于绑定的错误通道

      可以使用特定的错误通道来使用优先级高于默认错误通道的特定入站绑定错误消息。 若要处理这些消息,请配置以下消息终结点:

      // Replace destination with spring.cloud.stream.bindings.<input-binding-name>.destination
      // Replace group with spring.cloud.stream.bindings.<input-binding-name>.group
      @ServiceActivator(inputChannel = "{destination}.{group}.errors")
      public void handleError(ErrorMessage message) {
          LOGGER.error("Handling inbound binding error: " + message);
      }
      

      注意

      特定于绑定的错误通道与其他提供的错误处理程序和通道互斥。

    错误处理程序

    Spring Cloud Stream 公开一种机制,用于通过添加 Consumer 接受 ErrorMessage 实例的处理程序来提供自定义错误处理程序。 有关详细信息,请参阅 Spring Cloud Stream 文档中的错误处理

    注意

    配置任何绑定错误处理程序后,它可以处理默认错误通道和绑定器错误处理程序。

    • 绑定默认错误处理程序

      配置单个 Consumer bean 以使用所有入站绑定错误消息。 以下默认函数订阅每个入站绑定错误通道:

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      还需要将 spring.cloud.stream.default.error-handler-definition 属性设置为函数名称。

    • 特定于绑定的错误处理程序

      Consumer将 bean 配置为使用特定的入站绑定错误消息。 以下函数订阅优先级高于绑定默认错误处理程序的特定入站绑定错误通道。

      @Bean
      public Consumer<ErrorMessage> myDefaultHandler() {
          return message -> {
              // consume the error message
          };
      }
      

      还需要将 spring.cloud.stream.bindings.<input-binding-name>.error-handler-definition 属性设置为函数名称。

服务总线邮件头

有关支持的基本消息标头,请参阅 Spring Integration 的 Spring Cloud Azure 支持的 服务总线 消息标头部分。

注意

设置分区键时,消息标头的优先级高于 Spring Cloud Stream 属性。 因此 spring.cloud.stream.bindings.<binding-name>.producer.partition-key-expression ,仅在未 ServiceBusMessageHeaders#SESSION_ID 配置任何标头和 ServiceBusMessageHeaders#PARTITION_KEY 标头时才生效。

多个绑定器支持

使用多个绑定器也支持连接到多个服务总线命名空间。 此示例采用连接字符串作为示例。 还支持服务主体和托管标识的凭据,用户可以在每个绑定器的环境设置中设置相关属性。

  1. 若要使用 ServiceBus 的多个绑定器,请在 application.yml 文件中配置以下属性

    spring:
      cloud:
        function:
          definition: consume1;supply1;consume2;supply2
        stream:
          bindings:
            consume1-in-0:
              destination: ${SERVICEBUS_TOPIC_NAME}
              group: ${SUBSCRIPTION_NAME}
            supply1-out-0:
              destination: ${SERVICEBUS_TOPIC_NAME_SAME_AS_ABOVE}
            consume2-in-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME}
            supply2-out-0:
              binder: servicebus-2
              destination: ${SERVICEBUS_QUEUE_NAME_SAME_AS_ABOVE}
          binders:
            servicebus-1:
              type: servicebus
              default-candidate: true
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_01_CONNECTION_STRING}
            servicebus-2:
              type: servicebus
              default-candidate: false
              environment:
                spring:
                  cloud:
                    azure:
                      servicebus:
                        connection-string: ${SERVICEBUS_NAMESPACE_02_CONNECTION_STRING}
          servicebus:
            bindings:
              consume1-in-0:
                consumer:
                  auto-complete: false
              supply1-out-0:
                producer:
                  entity-type: topic
              consume2-in-0:
                consumer:
                  auto-complete: false
              supply2-out-0:
                producer:
                  entity-type: queue
          poller:
            initial-delay: 0
            fixed-delay: 1000
    

    注意

    前面的应用程序文件演示如何为所有绑定配置应用程序的单个默认轮询器。 如果要为特定绑定配置轮询器,可以使用配置,例如 spring.cloud.stream.bindings.<binding-name>.producer.poller.fixed-delay=3000

  2. 我们需要定义两个供应商和两个消费者

    @Bean
    public Supplier<Message<String>> supply1() {
        return () -> {
            LOGGER.info("Sending message1, sequence1 " + i);
            return MessageBuilder.withPayload("Hello world1, " + i++).build();
        };
    }
    
    @Bean
    public Supplier<Message<String>> supply2() {
        return () -> {
            LOGGER.info("Sending message2, sequence2 " + j);
            return MessageBuilder.withPayload("Hello world2, " + j++).build();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume1() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message1 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    }
    
    @Bean
    public Consumer<Message<String>> consume2() {
        return message -> {
            Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
            LOGGER.info("New message2 received: '{}'", message);
            checkpointer.success()
                    .doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
                    .doOnError(e -> LOGGER.error("Error found", e))
                    .block();
        };
    
    }
    

资源预配

服务总线绑定器支持预配队列、主题和订阅,用户可以使用以下属性来启用预配。

spring:
  cloud:
    azure:
      credential:
        tenant-id: <tenant>
      profile:
        subscription-id: ${AZURE_SUBSCRIPTION_ID}
      servicebus:
        resource:
          resource-group: ${AZURE_SERVICEBUS_RESOURECE_GROUP}
    stream:
      servicebus:
        bindings:
          <binding-name>:
            consumer:
              entity-type: ${SERVICEBUS_CONSUMER_ENTITY_TYPE}

注意

允许 tenant-id 的值包括: commonorganizationsconsumers或租户 ID。 有关这些值的详细信息,请参阅 错误AADSTS50020的错误终结点(个人和组织帐户) 部分 - 来自标识提供者的用户帐户不存在于租户中。 有关转换单租户应用的信息,请参阅 Microsoft Entra ID 上的“将单租户应用转换为多租户”。

示例

有关详细信息,请参阅 GitHub 上的 azure-spring-boot-samples 存储库。