在 Spring 应用程序中使用事件中心

本文介绍如何在使用 Spring 框架生成的 Java 应用程序中使用 Azure 事件中心。

Azure 事件中心是大数据流式处理平台和事件引入服务。 它可以每秒接收和处理数百万个事件。 可以使用任何实时分析提供程序或批处理/存储适配器转换和存储发送到事件中心的数据。

Spring Cloud Azure 提供了各种模块,用于使用 Spring 框架向事件中心发送消息以及从事件中心接收消息。

可以单独使用以下模块,也可以将它们组合用于不同的用例:

先决条件

  • Azure 事件中心实例。 有关详细信息,请参阅快速入门:使用 Azure 门户创建事件中心

  • 用于事件中心检查点的 Azure 存储帐户。 有关详细信息,请参阅创建存储帐户

  • Spring Boot 应用程序。 如果没有,请使用 Spring Initializr 创建一个 Maven 项目。 请记住选择 Maven 项目,并在依赖项下添加 Spring Web 依赖项,然后选择 Java 版本 8 或更高版本。

注意

若要授予帐户对 Azure 事件中心中资源的访问权限,请将 Azure Event Hubs Data ReceiverAzure Event Hubs Data Sender 角色分配给当前使用的 Microsoft Entra 帐户。 然后,在 Azure 存储帐户中,将 Storage Blob Data Contributor 角色分配给当前正在使用的 Microsoft Entra 帐户。 有关授予访问权限角色的详细信息,请参阅使用 Azure 门户分配 Azure 角色使用 Microsoft Entra ID 授权访问事件中心资源

重要

要完成本教程中的步骤,需要 Spring Boot 版本 2.5 或更高版本。

准备本地环境

在本教程中,配置和代码没有任何身份验证操作。 但连接到 Azure 服务需要进行身份验证。 要完成身份验证,需要使用 Azure 标识客户端库。 Spring Cloud Azure 使用 Azure 标识库提供的 DefaultAzureCredential 来帮助获取凭据,而无需更改任何代码。

DefaultAzureCredential 支持多种身份验证方法,并确定应在运行时使用哪种方法。 通过这种方法,你的应用可在不同环境(例如本地或生产环境)中使用不同的身份验证方法,而无需实现特定于环境的代码。 有关详细信息,请参阅对 Azure 托管的 Java 应用程序进行身份验证DefaultAzureCredential 部分。

若要使用 Azure CLI、IntelliJ 或其他方法在本地开发环境中完成身份验证,请参阅 Java 开发环境中的 Azure 身份验证。 若要在 Azure 托管环境中完成身份验证,建议使用托管标识。 有关详细信息,请参阅什么是 Azure 资源的托管标识?

使用 Spring Cloud Azure Event Hubs Starter

Spring Cloud Azure Event Hubs Starter 模块使用 Spring Boot 框架导入事件中心 Java 客户端库。 可以在非互斥模式中使用 Spring Cloud Azure 和 Azure SDK。 因此,可以在 Spring 应用程序中继续使用事件中心 Java 客户端 API。

添加依赖项

要安装 Spring Cloud Azure Event Hubs Starter 模块,请将以下依赖项添加到 pom.xml 文件:

  • Spring Cloud Azure 物料清单 (BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.22.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.20.0。 此物料清单 (BOM) 应在 <dependencyManagement> pom.xml 文件的 部分进行配置。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅应使用哪个版本的 Spring Cloud Azure

  • Spring Cloud Azure 事件中心项目:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter-eventhubs</artifactId>
    </dependency>
    

编写应用程序代码以发送和接收消息

本指南介绍如何在 Spring 应用程序的上下文中使用事件中心 Java 客户端。 下面介绍以下两个选项:

  • 使用 Spring Boot 自动配置,并从 Spring 上下文中使用现成的客户端(推荐)。
  • 以编程方式生成客户端。

从 Spring IoC 容器自动连接客户端 bean 的方式有以下优点,这可以在使用事件中心客户端开发时为你提供更灵活、更高效的体验:

  • 它应用外部化配置,这样你就可以在不同的环境中使用相同的应用程序代码。
  • 可以将学习生成器模式并将此客户端注册到应用程序上下文的过程委托给 Spring Boot 框架。 通过此委托,你可以专注于如何根据自己的业务需求使用客户。
  • 可以使用运行状况指示器以一种简单的方式来检查应用程序和内部组件的状态和运行状况。

以下部分提供了代码示例,演示如何使用 EventProcessorClientEventHubProducerClient 结合使用这两种替代方法。

注意

适用于 Azure 事件中心的 Azure Java SDK 提供了多个客户端与事件中心交互。 入门程序还为所有 Event Hubs 客户端以及客户端构建器提供自动配置。 本文仅使用 EventProcessorClientEventHubProducerClient 作为示例。

使用 Spring Boot 自动配置

若要向事件中心发送消息并从中接收消息,请使用以下步骤配置应用程序:

  1. 使用以下属性设置配置事件中心命名空间和事件中心名称:

    spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace>
    spring.cloud.azure.eventhubs.event-hub-name=<your-event-hub-name>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
    spring.cloud.azure.eventhubs.processor.consumer-group=$Default
    
  2. 创建一个新 EventHubProcessorClientConfiguration Java 类,如以下示例所示。 此类用于注册 EventProcessorClient 的消息和错误处理程序。

    import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsErrorHandler;
    import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsRecordMessageListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class EventHubProcessorClientConfiguration {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubProcessorClientConfiguration.class);
    
        @Bean
        EventHubsRecordMessageListener processEvent() {
            return eventContext->LOGGER.info("Processing event from partition {} with sequence number {} with body: {}",
                eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(),
                eventContext.getEventData().getBodyAsString());
        }
    
        @Bean
        EventHubsErrorHandler processError() {
            return errorContext->LOGGER.info("Error occurred in partition processor for partition {}, {}",
                errorContext.getPartitionContext().getPartitionId(),
                errorContext.getThrowable());
        }
    
    }
    
  3. 在 Spring 应用程序中注入 EventProcessorClientEventHubProducerClient,并调用相关 API 以发送和接收消息,如以下示例所示:

    import com.azure.messaging.eventhubs.EventData;
    import com.azure.messaging.eventhubs.EventHubProducerClient;
    import com.azure.messaging.eventhubs.EventProcessorClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    import java.util.Collections;
    import java.util.concurrent.TimeUnit;
    
    @SpringBootApplication
    public class EventHubClientApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientApplication.class);
        private final EventHubProducerClient eventHubProducerClient;
        private final EventProcessorClient eventProcessorClient;
    
        public EventHubClientApplication(EventHubProducerClient eventHubProducerClient,
                                         EventProcessorClient eventProcessorClient) {
            this.eventHubProducerClient = eventHubProducerClient;
            this.eventProcessorClient = eventProcessorClient;
        }
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubClientApplication.class, args);
        }
    
        @Override
        public void run(String... args) throws Exception {
            eventProcessorClient.start();
            // Wait for the processor client to be ready
            TimeUnit.SECONDS.sleep(10);
    
            eventHubProducerClient.send(Collections.singletonList(new EventData("Hello World")));
            LOGGER.info("Successfully sent a message to Event Hubs.");
            eventHubProducerClient.close();
            LOGGER.info("Skip stopping and closing the processor since the processor may not complete the receiving process yet.");
        }
    
    }
    
  4. 启动应用程序。 你将看到与以下示例类似的日志:

    Successfully sent a message to Event Hubs.
    ...
    Processing event from partition 0 with sequence number 0 with body: Hello World
    ...
    Stopping and closing the processor.
    

以编程方式生成客户端

你可以自行生成客户端 bean,但该过程很复杂。 在 Spring Boot 应用程序中,必须管理属性、了解生成器模式,并将客户端注册到 Spring 应用程序上下文。 以下步骤演示如何做到这一点:

  1. 创建一个新 EventHubClientConfiguration Java 类,如以下示例所示。 此类用于声明 EventProcessorClientEventHubProducerClient bean。 请务必用实际值替换 <your event-hubs-namespace><your-event-hub-name><your-storage-account-name><your-storage-account-container-name> 占位符。

    import com.azure.identity.DefaultAzureCredentialBuilder;
    import com.azure.messaging.eventhubs.EventHubClientBuilder;
    import com.azure.messaging.eventhubs.EventHubProducerClient;
    import com.azure.messaging.eventhubs.EventProcessorClient;
    import com.azure.messaging.eventhubs.EventProcessorClientBuilder;
    import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
    import com.azure.messaging.eventhubs.models.ErrorContext;
    import com.azure.messaging.eventhubs.models.EventContext;
    import com.azure.storage.blob.BlobContainerAsyncClient;
    import com.azure.storage.blob.BlobContainerClientBuilder;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class EventHubClientConfiguration {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientConfiguration.class);
        private static final String EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "<your event-hubs-namespace>.servicebus.windows.net";
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
        private static final String CONSUMER_GROUP = "$Default";
        private static final String STORAGE_ACCOUNT_ENDPOINT = "https://<your-storage-account-name>.blob.core.windows.net";
        private static final String STORAGE_CONTAINER_NAME = "<your-storage-account-container-name>";
    
        @Bean
        EventHubClientBuilder eventHubClientBuilder() {
            return new EventHubClientBuilder().credential(EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, EVENT_HUB_NAME,
                new DefaultAzureCredentialBuilder()
                    .build());
        }
    
        @Bean
        BlobContainerClientBuilder blobContainerClientBuilder() {
            return new BlobContainerClientBuilder().credential(new DefaultAzureCredentialBuilder()
                                                       .build())
                                                   .endpoint(STORAGE_ACCOUNT_ENDPOINT)
                                                   .containerName(STORAGE_CONTAINER_NAME);
        }
    
        @Bean
        BlobContainerAsyncClient blobContainerAsyncClient(BlobContainerClientBuilder blobContainerClientBuilder) {
            return blobContainerClientBuilder.buildAsyncClient();
        }
    
        @Bean
        EventProcessorClientBuilder eventProcessorClientBuilder(BlobContainerAsyncClient blobContainerAsyncClient) {
            return new EventProcessorClientBuilder().credential(EVENT_HUB_FULLY_QUALIFIED_NAMESPACE, EVENT_HUB_NAME,
                                                        new DefaultAzureCredentialBuilder()
                                                            .build())
                                                    .consumerGroup(CONSUMER_GROUP)
                                                    .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
                                                    .processEvent(EventHubClientConfiguration::processEvent)
                                                    .processError(EventHubClientConfiguration::processError);
        }
    
        @Bean
        EventHubProducerClient eventHubProducerClient(EventHubClientBuilder eventHubClientBuilder) {
            return eventHubClientBuilder.buildProducerClient();
    
        }
    
        @Bean
        EventProcessorClient eventProcessorClient(EventProcessorClientBuilder eventProcessorClientBuilder) {
            return eventProcessorClientBuilder.buildEventProcessorClient();
        }
    
        public static void processEvent(EventContext eventContext) {
            LOGGER.info("Processing event from partition {} with sequence number {} with body: {}",
                eventContext.getPartitionContext().getPartitionId(), eventContext.getEventData().getSequenceNumber(),
                eventContext.getEventData().getBodyAsString());
        }
    
        public static void processError(ErrorContext errorContext) {
            LOGGER.info("Error occurred in partition processor for partition {}, {}",
                errorContext.getPartitionContext().getPartitionId(),
                errorContext.getThrowable());
        }
    
    }
    
  2. 在 Spring 应用程序中注入 EventProcessorClientEventHubProducerClient,如以下示例所示:

    import com.azure.messaging.eventhubs.EventData;
    import com.azure.messaging.eventhubs.EventHubProducerClient;
    import com.azure.messaging.eventhubs.EventProcessorClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    import java.util.Collections;
    import java.util.concurrent.TimeUnit;
    
    @SpringBootApplication
    public class EventHubClientApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubClientApplication.class);
        private final EventHubProducerClient eventHubProducerClient;
        private final EventProcessorClient eventProcessorClient;
    
        public EventHubClientApplication(EventHubProducerClient eventHubProducerClient,
                                         EventProcessorClient eventProcessorClient) {
            this.eventHubProducerClient = eventHubProducerClient;
            this.eventProcessorClient = eventProcessorClient;
        }
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubClientApplication.class, args);
        }
    
        @Override
        public void run(String... args) throws Exception {
            eventProcessorClient.start();
            // Wait for the processor client to be ready
            TimeUnit.SECONDS.sleep(10);
    
            eventHubProducerClient.send(Collections.singletonList(new EventData("Hello World")));
            LOGGER.info("Successfully sent a message to Event Hubs.");
            eventHubProducerClient.close();
            LOGGER.info("Stopping and closing the processor");
            eventProcessorClient.stop();
        }
    
    }
    
  3. 启动应用程序。 你将看到与以下示例类似的日志:

    Successfully sent a message to Event Hubs.
    ...
    Processing event from partition 0 with sequence number 0 with body: Hello World
    ...
    Stopping and closing the processor.
    

以下列表显示了此代码不够灵活或优雅的原因:

  • 事件中心命名空间和事件中心名称是硬编码的。
  • 如果使用 @Value 从 Spring 环境获取配置,则 application.properties 文件中不能有 IDE 提示。
  • 如果有微服务方案,则必须复制每个项目中的代码,这很容易出错,也很难保持一致。

幸运的是,使用 Spring Cloud Azure 不需要自行构建客户端 bean。 相反,可以直接注入它们,并使用已熟悉的配置属性来配置存储队列。 有关详细信息,请参阅 Spring Cloud Azure 配置

Spring Cloud Azure 还为不同方案提供以下全局配置。 有关详细信息,请参阅 Spring Cloud Azure 配置Azure 服务 SDK 的全局配置部分。

  • 代理选项。
  • 重试选项。
  • AMQP 传输客户端选项。

还可以连接到不同的 Azure 云。 有关详细信息,请参阅连接到不同的 Azure 云

使用 Spring Messaging Azure 事件中心

Spring Messaging Azure 事件中心模块通过事件中心为 Spring Messaging 框架提供支持。

如果使用 Spring Messaging Azure 事件中心,则可以使用以下功能:

  • EventHubsTemplate:以异步和同步方式将消息发送到事件中心。
  • @EventHubsListener:将方法标记为目的地上的事件中心消息监听器的目标。

本指南介绍如何使用 Spring Messaging Azure 事件中心向事件中心发送消息以及从事件中心接收消息。

添加依赖项

要安装 Spring Messaging Azure 事件中心模块,请将以下依赖项添加到 pom.xml 文件:

  • Spring Cloud Azure 物料清单 (BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.22.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.20.0。 此物料清单 (BOM) 应在 <dependencyManagement> pom.xml 文件的 部分进行配置。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅应使用哪个版本的 Spring Cloud Azure

  • Spring Cloud Azure Starter、Spring Messaging 事件中心和 Azure 事件中心检查点存储项目:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-messaging-azure-eventhubs</artifactId>
    </dependency>
    <dependency>
      <groupId>com.azure</groupId>
      <artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
    </dependency>
    

编写应用程序代码以发送和接收消息

若要向事件中心发送消息并从中接收消息,请使用以下步骤配置应用程序:

  1. 使用以下属性设置配置事件中心命名空间和存储 Blob:

    spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
    
  2. 创建一个新 ConsumerService Java 类,如以下示例所示。 此类用于定义消息接收方。 请务必用实际值替代 <your-event-hub-name> 占位符。

    import com.azure.spring.messaging.eventhubs.implementation.core.annotation.EventHubsListener;
    import org.springframework.stereotype.Service;
    
    @Service
    public class ConsumerService {
    
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
        private static final String CONSUMER_GROUP = "$DEFAULT";
    
        @EventHubsListener(destination = EVENT_HUB_NAME, group = CONSUMER_GROUP)
        public void handleMessageFromEventHub(String message) {
            System.out.printf("New message received: %s%n", message);
        }
    
    }
    
  3. 将发送方和接收方连接起来,使用 Spring 发送和接收消息,如以下示例所示。 请务必用实际值替代 <your-event-hub-name> 占位符。

    import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate;
    import com.azure.spring.messaging.implementation.annotation.EnableAzureMessaging;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.messaging.support.MessageBuilder;
    
    @SpringBootApplication
    @EnableAzureMessaging
    public class EventHubMessagingApplication {
    
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubMessagingApplication.class);
    
        public static void main(String[] args) {
            ConfigurableApplicationContext applicationContext = SpringApplication.run(EventHubMessagingApplication.class);
            EventHubsTemplate eventHubsTemplate = applicationContext.getBean(EventHubsTemplate.class);
            LOGGER.info("Sending a message to the Event Hubs.");
            eventHubsTemplate.sendAsync(EVENT_HUB_NAME, MessageBuilder.withPayload("Hello world").build()).subscribe();
        }
    
    }
    

    提示

    请记住添加 @EnableAzureMessaging 注释,这会触发发现用 @EventHubsListener 注释的方法,从而在幕后创建消息侦听器容器。

  4. 启动应用程序。 你将看到与以下示例类似的日志:

    Sending a message to the Event Hubs.
    New message received: Hello world
    

使用 Spring 集成 Azure 事件中心

Spring 集成 Azure 事件中心模块通过事件中心为 Spring 集成框架提供支持。

如果 Spring 应用程序使用 Spring 集成消息通道,则可以使用通道适配器在消息通道和事件中心之间路由消息。

入站通道适配器将消息从事件中心转发到消息通道。 出站通道适配器将消息从消息通道发布到事件中心。

本指南介绍如何使用 Spring 集成 Azure 事件中心向事件中心发送消息以及从事件中心接收消息。

添加依赖项

要安装 Spring Cloud Azure Event Hubs Integration Starter 模块,请将以下依赖项添加到 pom.xml 文件:

  • Spring Cloud Azure 物料清单 (BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.22.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.20.0。 此物料清单 (BOM) 应在 <dependencyManagement> pom.xml 文件的 部分进行配置。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅应使用哪个版本的 Spring Cloud Azure

  • Spring Cloud Azure Event Hubs Integration 项目:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter-integration-eventhubs</artifactId>
    </dependency>
    

编写应用程序代码以发送和接收消息

若要向事件中心发送消息并从中接收消息,请使用以下步骤配置应用程序:

  1. 使用以下属性设置配置事件中心命名空间和存储 Blob:

    spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
    
  2. 创建一个新 MessageReceiveConfiguration Java 类,如以下示例所示。 此类用于定义消息接收方。 请务必用实际值替代 <your-event-hub-name> 占位符。

    import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
    import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory;
    import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig;
    import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode;
    import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
    import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.messaging.MessageChannel;
    
    @Configuration
    public class MessageReceiveConfiguration {
    
        private static final String INPUT_CHANNEL = "input";
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
        private static final String CONSUMER_GROUP = "$Default";
        private static final Logger LOGGER = LoggerFactory.getLogger(MessageReceiveConfiguration.class);
    
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload) {
            String message = new String(payload);
            LOGGER.info("New message received: {}", message);
        }
    
        @Bean
        public EventHubsMessageListenerContainer messageListenerContainer(EventHubsProcessorFactory processorFactory) {
            EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
            containerProperties.setEventHubName(EVENT_HUB_NAME);
            containerProperties.setConsumerGroup(CONSUMER_GROUP);
            containerProperties.setCheckpointConfig(new CheckpointConfig(CheckpointMode.MANUAL));
            return new EventHubsMessageListenerContainer(processorFactory, containerProperties);
        }
    
        @Bean
        public EventHubsInboundChannelAdapter messageChannelAdapter(@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
                                                                    EventHubsMessageListenerContainer listenerContainer) {
            EventHubsInboundChannelAdapter adapter = new EventHubsInboundChannelAdapter(listenerContainer);
            adapter.setOutputChannel(inputChannel);
            return adapter;
        }
    
        @Bean
        public MessageChannel input() {
            return new DirectChannel();
        }
    
    }
    
  3. 创建一个新 MessageSendConfiguration Java 类,如以下示例所示。 此类用于定义消息发送方。 请务必用实际值替代 <your-event-hub-name> 占位符。

    import com.azure.spring.integration.core.handler.DefaultMessageHandler;
    import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    
    @Configuration
    public class MessageSendConfiguration {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(MessageSendConfiguration.class);
        private static final String OUTPUT_CHANNEL = "output";
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(EventHubsTemplate eventHubsTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(EVENT_HUB_NAME, eventHubsTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("There was an error sending the message.", ex);
                }
            });
    
            return handler;
        }
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface EventHubOutboundGateway {
            void send(String text);
        }
    
    }
    
  4. 将发送方和接收方连接起来,使用 Spring 发送和接收消息,如以下示例所示:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.config.EnableIntegration;
    
    @SpringBootApplication
    @EnableIntegration
    @Configuration(proxyBeanMethods = false)
    public class EventHubIntegrationApplication {
    
        public static void main(String[] args) {
            ConfigurableApplicationContext applicationContext = SpringApplication.run(EventHubIntegrationApplication.class, args);
            MessageSendConfiguration.EventHubOutboundGateway outboundGateway = applicationContext.getBean(MessageSendConfiguration.EventHubOutboundGateway.class);
            outboundGateway.send("Hello World");
        }
    }
    

    提示

    请记住添加 @EnableIntegration 批注,这将启用 Spring 集成基础结构。

  5. 启动应用程序。 你将看到与以下示例类似的日志:

    Message was sent successfully.
    New message received: Hello World
    

使用 Spring Cloud Azure Stream Event Hubs Binder

若要在 Spring Cloud Stream 应用程序中调用事件中心 API,请使用 Spring Cloud Azure Event Hubs Stream Binder 模块。

本指南介绍如何使用 Spring Cloud Stream Event Hubs Binder 向事件中心发送消息并从事件中心接收消息。

添加依赖项

要安装 Spring Cloud Azure Event Hubs Stream Binder 模块,请将以下依赖项添加到 pom.xml 文件:

  • Spring Cloud Azure 物料清单 (BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.22.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.20.0。 此物料清单 (BOM) 应在 <dependencyManagement> pom.xml 文件的 部分进行配置。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅应使用哪个版本的 Spring Cloud Azure

  • Spring Cloud Azure Event Hubs Stream Binder 项目:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
    </dependency>
    

编写应用程序代码以发送和接收消息

若要向事件中心发送消息并从中接收消息,请使用以下步骤配置应用程序:

  1. 使用以下属性设置配置事件中心命名空间和存储 Blob:

    spring.cloud.azure.eventhubs.namespace=<your event-hubs-namespace>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name=<your-storage-account-name>
    spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name=<your-storage-account-container-name>
    
  2. 创建消息接收方。

    若要将应用程序用作事件接收器,请通过完成以下任务来配置输入绑定器:

    • 声明一个定义消息处理逻辑的 Consumer bean。 例如,以下 Consumer bean 被命名为 consume

      @Bean
      public Consumer<Message<String>> consume() {
           return message -> {
               System.out.printf("New message received: %s%n", message.getPayload());
           };
      }
      
    • 添加以下配置以指定要使用的 Event Hub 名称。 请务必用实际值替代 <your-event-hub-name> 占位符。

      # name for the above `Consumer` bean
      spring.cloud.stream.function.definition=consume
      spring.cloud.stream.bindings.consume-in-0.destination=<your-event-hub-name>
      spring.cloud.stream.bindings.consume-in-0.group=$Default
      spring.cloud.stream.eventhubs.bindings.consume-in-0.consumer.checkpoint.mode=MANUAL
      
  3. 创建消息发送方。

    若要将应用程序用作事件源,请通过完成以下任务来配置输出绑定器:

    • 定义一个 Supplier bean,用于定义消息在应用程序中的来源,如以下示例所示:

      @Bean
      public Supplier<Message<String>> supply() {
          return () -> {
              System.out.println("Sending a message.");
              return MessageBuilder.withPayload("Hello world").build();
          };
      }
      
    • 添加以下配置以指定要发送的 Event Hub 名称。 请务必用实际值替代 <your-event-hub-name> 占位符。

      # "consume" is added from the above step
      spring.cloud.stream.function.definition=consume;supply
      spring.cloud.stream.bindings.supply-out-0.destination=<your-event-hub-name>
      
  4. 启动应用程序。 你将看到与以下示例类似的日志:

    Sending a message.
    New message received: Hello world.
    

将 Spring Kafka 与 Azure 事件中心结合使用

事件中心提供了一个 Kafka 终结点,你现有的基于 Kafka 的应用程序可以使用它。 此方法提供了一个替代方案,无需运行自己的 Kafka 群集。 事件中心可与许多现有 Kafka 应用程序配合使用。 有关详细信息,请参阅适用于 Apache Kafka 的事件中心

本指南介绍如何使用事件中心和 Spring Kafka 向事件中心发送消息以及从事件中心接收消息。

添加依赖项

要安装 Spring Cloud Azure Starter 和 Spring Kafka 模块,请将以下依赖项添加到 pom.xml 文件:

  • Spring Cloud Azure 物料清单 (BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.22.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.20.0。 此物料清单 (BOM) 应在 <dependencyManagement> pom.xml 文件的 部分进行配置。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅应使用哪个版本的 Spring Cloud Azure

  • Spring Cloud Azure 启动器和 Spring Kafka 组件:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>
    

编写应用程序代码以发送和接收消息

若要向事件中心发送消息并从中接收消息,请使用以下步骤配置应用程序:

  1. 使用以下属性设置配置事件中心命名空间:

    spring.kafka.bootstrap-servers=<your event-hubs-namespace>.servicebus.windows.net:9093
    
  2. 使用 KafkaTemplate 发送消息,使用 @KafkaListener 接收消息,如下例所示。 请务必用实际值替代 <your-event-hub-name> 占位符。

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    
    @SpringBootApplication
    public class EventHubKafkaApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(EventHubKafkaApplication.class);
        private static final String EVENT_HUB_NAME = "<your-event-hub-name>";
        private static final String CONSUMER_GROUP = "$Default";
        private final KafkaTemplate<String, String> kafkaTemplate;
    
        public EventHubKafkaApplication(KafkaTemplate<String, String> kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }
    
        public static void main(String[] args) {
            SpringApplication.run(EventHubKafkaApplication.class, args);
        }
    
        @Override
        public void run(String... args) {
            kafkaTemplate.send(EVENT_HUB_NAME, "Hello World");
            LOGGER.info("Message was sent successfully.");
        }
    
        @KafkaListener(topics = EVENT_HUB_NAME, groupId = CONSUMER_GROUP)
        public void receive(String message) {
            LOGGER.info("New message received: {}", message);
        }
    
    }
    
  3. 启动应用程序。 你将看到与以下示例类似的日志:

    Message was sent successfully.
    New message received: Hello world
    

将 Spring Cloud Stream Kafka Binder 与 Azure 事件中心配合使用

Spring Cloud Stream 是一个框架,使应用程序开发人员能够编写消息驱动的微服务。 消息传递系统和 Spring Cloud Stream 之间的桥梁是通过绑定器抽象实现的。 几种消息传递系统都有绑定器,但最常用的绑定器之一是 Apache Kafka。

本指南介绍如何使用 Azure 事件中心和 Spring Cloud Stream Kafka Binder 向事件中心发送消息以及从事件中心接收消息。

添加依赖项

要安装 Spring Cloud Azure Starter 和 Spring Cloud Stream binder Kafka 模块,请将以下依赖项添加到 pom.xml 文件:

  • Spring Cloud Azure 物料清单 (BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>5.22.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.20.0。 此物料清单 (BOM) 应在 <dependencyManagement> pom.xml 文件的 部分进行配置。 这可确保所有 Spring Cloud Azure 依赖项都使用相同的版本。 有关用于此 BOM 的版本的详细信息,请参阅应使用哪个版本的 Spring Cloud Azure

  • Spring Cloud Azure starter 项目:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    

编写应用程序代码以发送和接收消息

若要向事件中心发送消息并从中接收消息,请使用以下步骤配置应用程序:

  1. 使用以下属性设置配置 Kafka 代理:

    spring.cloud.stream.kafka.binder.brokers=<your event-hubs-namespace>.servicebus.windows.net:9093
    
  2. 创建消息接收方。

    若要将应用程序用作事件接收器,请通过完成以下任务来配置输入绑定器:

    • 声明一个定义消息处理逻辑的 Consumer bean。 例如,以下 Consumer bean 被命名为 consume

      @Bean
      public Consumer<Message<String>> consume() {
          return message -> {
              System.out.printf("New message received: %s%n", message.getPayload());
          };
      }
      
    • 添加以下配置以指定要使用的 Event Hub 名称。 请务必用实际值替代 <your-event-hub-name> 占位符。

      # name for the above `Consumer` bean
      spring.cloud.stream.function.definition=consume
      spring.cloud.stream.bindings.consume-in-0.destination=<your-event-hub-name>
      spring.cloud.stream.bindings.consume-in-0.group=$Default
      
  3. 创建消息发送方。

    若要将应用程序用作事件源,请通过完成以下任务来配置输出绑定器:

    • 定义一个 Supplier bean,用于定义消息在应用程序中的来源,如以下示例所示:

      @Bean
      public Supplier<Message<String>> supply() {
          return () -> {
              System.out.println("Sending a message.");
              return MessageBuilder.withPayload("Hello world").build();
          };
      }
      
    • 添加以下配置以指定要发送的 Event Hub 名称。 请务必用实际值替代 <your-event-hub-name> 占位符。

      # "consume" is added from the above step
      spring.cloud.stream.function.definition=consume;supply
      spring.cloud.stream.bindings.supply-out-0.destination=<your-event-hub-name>
      
  4. 启动应用程序。 你将看到与以下示例类似的日志:

    Sending a message.
    New message received: Hello world.
    

部署到 Azure Spring Apps

现在,你已在本地运行 Spring Boot 应用程序,是时候将其转移到生产环境了。 借助 Azure Spring Apps,可以轻松地将 Spring Boot 应用程序部署到 Azure,不需更改任何代码。 该服务管理 Spring 应用程序的基础结构,让开发人员可以专注于代码。 Azure Spring Apps 可以通过以下方法提供生命周期管理:综合性监视和诊断、配置管理、服务发现、CI/CD 集成、蓝绿部署等。 若要将应用程序部署到 Azure Spring Apps,请参阅在 Azure Spring Apps 中部署你的第一个应用程序

后续步骤

另请参阅

有关可用于 Microsoft Azure 的其他 Spring Boot Starters 的更多信息,请参阅什么是 Spring Cloud Azure?