具有 Azure 事件中心 的 Spring Cloud Stream

本教程演示如何在 Spring Boot 应用程序中使用 Azure 事件中心 和 Spring Cloud Stream Binder Eventhubs 发送和接收消息。

先决条件

  • Azure 订阅 - 免费创建订阅

  • Java 开发工具包 (JDK) 版本 8 或更高版本。

  • Apache Maven 版本 3.2 或更高版本。

  • 用来测试功能的 cURL 或类似的 HTTP 实用工具。

  • Azure 事件中心。 如果没有事件中心,请使用Azure 门户创建事件中心。

  • 事件中心的Azure 存储帐户检查点。 如果你还没有存储帐户,请创建一个存储帐户

  • Spring Boot 应用程序。 如果没有,请使用 Spring Initializr 创建一个 Maven 项目。 请务必选择 Maven 项目,并在“依赖项添加 Spring WebAzure 支持依赖项,然后选择 Java 版本 8 或更高版本。

注意

若要授予帐户对资源的访问权限,请在Azure 事件中心中将帐户和Azure Event Hubs Data Sender角色分配给Azure Event Hubs Data Receiver当前正在使用的 Microsoft Entra 帐户。 然后,在Azure 存储帐户中,将Storage Blob Data Contributor角色分配给当前正在使用的 Microsoft Entra 帐户。 有关授予访问权限角色的详细信息,请参阅使用 microsoft Entra ID 使用 Azure 门户 分配 Azure 角色并授权访问事件中心资源。

重要

完成本教程中的步骤需要 Spring Boot 2.5 或更高版本。

发送和接收来自Azure 事件中心的消息

使用 Azure 存储 帐户和 Azure 事件中心,可以使用 Spring Cloud Azure Stream Binder 事件中心发送和接收消息。

若要安装 Spring Cloud Azure Stream Binder 事件中心模块,请将以下依赖项添加到 pom.xml 文件:

  • Spring Cloud Azure 材料清单(BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.azure.spring</groupId>
          <artifactId>spring-cloud-azure-dependencies</artifactId>
          <version>5.11.0</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.17.0。 此材料清单(BOM)应在pom.xml文件的部分中进行配置<dependencyManagement>。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅 我应使用哪个版本的 Spring Cloud Azure。

  • Spring Cloud Azure Stream Binder 事件中心项目:

    <dependency>
       <groupId>com.azure.spring</groupId>
       <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
    </dependency>
    

编写应用程序代码

使用以下步骤将应用程序配置为使用Azure 事件中心生成和使用消息。

  1. 通过将以下属性添加到 application.properties 文件来配置事件中心凭据。

     spring.cloud.azure.eventhubs.namespace=${AZURE_EVENTHUBS_NAMESPACE}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=${AZURE_STORAGE_ACCOUNT_NAME}
     spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=${AZURE_STORAGE_CONTAINER_NAME}
     spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.bindings.consume-in-0.group=${AZURE_EVENTHUB_CONSUMER_GROUP}
     spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_EVENTHUB_NAME}
     spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
     spring.cloud.function.definition=consume;supply;
     spring.cloud.stream.poller.initial-delay=0
     spring.cloud.stream.poller.fixed-delay=1000
    

    下表描述了配置中的字段:

    字段 说明
    spring.cloud.azure.eventhubs.namespace 指定在事件中心从Azure 门户获取的命名空间。
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name 指定在本教程中创建的存储帐户。
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name 指定存储帐户的容器。
    spring.cloud.stream.bindings.consume-in-0.destination 指定本教程中使用的事件中心。
    spring.cloud.stream.bindings.consume-in-0.group 指定事件中心实例中的使用者组。
    spring.cloud.stream.bindings.supply-out-0.destination 指定本教程中使用的相同事件中心。
    spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode 指定 MANUAL
    spring.cloud.function.definition 指定要将哪个功能 bean 绑定到由绑定公开的外部目标。
    spring.cloud.stream.poller.initial-delay 指定定期触发器的初始延迟。 默认值为 0
    spring.cloud.stream.poller.fixed-delay 为默认轮询程序指定固定延迟(以毫秒为单位)。 默认值为 1000 L
  2. 编辑启动类文件以显示以下内容。

    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import com.azure.spring.messaging.eventhubs.support.EventHubsHeaders;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Sinks;
    import java.util.function.Consumer;
    import java.util.function.Supplier;
    import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER;
    
    @SpringBootApplication
    public class EventHubBinderApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubBinderApplication.class);
        private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer();
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubBinderApplication.class, args);
        }
    
        @Bean
        public Supplier<Flux<Message<String>>> supply() {
            return ()->many.asFlux()
                           .doOnNext(m->LOGGER.info("Manually sending message {}", m))
                           .doOnError(t->LOGGER.error("Error encountered", t));
        }
    
        @Bean
        public Consumer<Message<String>> consume() {
            return message->{
                Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER);
                LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued "
                        +"time: {}",
                    message.getPayload(),
                    message.getHeaders().get(EventHubsHeaders.PARTITION_KEY),
                    message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER),
                    message.getHeaders().get(EventHubsHeaders.OFFSET),
                    message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)
                );
                checkpointer.success()
                            .doOnSuccess(success->LOGGER.info("Message '{}' successfully checkpointed",
                                message.getPayload()))
                            .doOnError(error->LOGGER.error("Exception found", error))
                            .block();
            };
        }
    
        @Override
        public void run(String... args) {
            LOGGER.info("Going to add message {} to sendMessage.", "Hello World");
            many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST);
        }
    
    }
    

    提示

    在本教程中,配置或代码中没有身份验证操作。 但是,连接到 Azure 服务需要身份验证。 要完成身份验证,需要使用 Azure 标识。 Spring Cloud Azure 使用 DefaultAzureCredentialAzure 标识库提供的帮助获取凭据,而无需进行任何代码更改。

    DefaultAzureCredential 支持多种身份验证方法,并确定应在运行时使用哪种方法。 此方法使应用能够在不同环境(如本地环境和生产环境)中使用不同的身份验证方法,而无需实现特定于环境的代码。 有关详细信息,请参阅 DefaultAzureCredential

    若要在本地开发环境中完成身份验证,可以使用 Azure CLI、Visual Studio Code、PowerShell 或其他方法。 有关详细信息,请参阅 Java 开发环境中的 Azure 身份验证。 若要在 Azure 托管环境中完成身份验证,建议使用用户分配的托管标识。 有关详细信息,请参阅什么是 Azure 资源的托管标识?

  3. 启动应用程序。 此类消息将发布到应用程序日志中,如以下示例输出中所示:

    New message received: 'Hello World', partition key: 107207233, sequence number: 458, offset: 94256, enqueued time: 2023-02-17T08:27:59.641Z
    Message 'Hello World!' successfully checkpointed
    

部署到 Azure Spring Apps

现在,你已在本地运行 Spring Boot 应用程序,现在可以将其移动到生产环境。 使用 Azure Spring Apps 可以轻松地将 Spring Boot 应用程序部署到 Azure,而无需进行任何代码更改。 该服务管理 Spring 应用程序的基础结构,让开发人员可以专注于代码。 Azure Spring Apps 可以通过以下方法提供生命周期管理:综合性监视和诊断、配置管理、服务发现、CI/CD 集成、蓝绿部署等。 若要将应用程序部署到 Azure Spring Apps,请参阅将 第一个应用程序部署到 Azure Spring Apps

后续步骤