本文介绍如何使用 Spring Cloud Azure 和 Spring Messaging Azure 事件中心。 Spring Framework 为与消息传送系统集成提供了广泛的支持。
Spring Messaging Azure 事件中心
关键概念
Azure 事件中心是云中的本机数据流式处理服务,每秒可以流式传输数百万个事件,延迟较低,从任何源流式传输到任何目标。 适用于 Azure 事件中心的 Spring Messaging 项目将核心 Spring 概念应用于基于事件中心的消息传送解决方案的开发。 它提供了一个 模板 作为发送消息的高级抽象。 它还支持消息驱动的普通旧 Java 对象(POJO),其中包含 @EventHubsListener 批注和 侦听器容器。 这些库可促进使用依赖项注入和声明性配置。 在所有这些情况下,可以在 Spring Framework 和 Spring AMQP 中的 RabbitMQ 支持中看到 JMS 支持相似之处。
依赖项设置
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-cloud-azure-starter</artifactId>
</dependency>
<dependency>
<groupId>com.azure.spring</groupId>
<artifactId>spring-messaging-azure-eventhubs</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
</dependency>
配置
该库为 EventHubsTemplate 和 @EventHubsListener提供以下配置选项:
| 财产 | 类型 | 说明 |
|---|---|---|
| spring.cloud.azure.message-converter.isolated-object-mapper | 布尔型 | 隔离的 ObjectMapper bean 是否用于事件中心消息转换器。 默认启用。 |
| spring.cloud.azure.eventhubs.enabled | 布尔型 | 是否启用 Azure 事件中心。 |
| spring.cloud.azure.eventhubs.connection-string | 字符串 | 事件中心命名空间连接字符串值。 |
| spring.cloud.azure.eventhubs.namespace | 字符串 | 事件中心命名空间值,它是 FQDN 的前缀。 FQDN 应由 NamespaceName.DomainName 组成 |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.account-name | 字符串 | 存储帐户的名称。 |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.account-key | 字符串 | 存储帐户访问密钥。 |
| spring.cloud.azure.eventhubs.processor.checkpoint-store.container-name | 字符串 | 存储容器名称。 |
基本用法
自定义事件中心消息转换器
可通过两种方式配置事件中心消息转换器:
将以下属性配置为让默认事件中心消息转换器使用
ObjectMapperbean,可以是自定义ObjectMapperbean 或由 Spring Boot 管理的 bean:spring: cloud: azure: message-converter: isolated-object-mapper: false直接定义事件中心消息转换器 bean:
@Bean AzureMessageConverter<EventData, EventData> eventHubsMessageConverter() { JsonMapper jsonMapper = JsonMapper.builder().addModule(new JavaTimeModule()).build(); return new EventHubsMessageConverter(jsonMapper); }
将消息发送到 Azure 事件中心
使用以下步骤发送消息:
使用以下方法之一填写凭据配置选项:
对于
DefaultAzureCredential凭据,请在 application.yml 文件中配置以下属性:spring: cloud: azure: eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT}对于凭据作为连接字符串,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: eventhubs: connection-string: ${AZURE_EVENT_HUBS_CONNECTION_STRING} processor: checkpoint-store: container-name: ${CHECKPOINT-CONTAINER} account-name: ${CHECKPOINT-STORAGE-ACCOUNT} account-key: ${CHECKPOINT-ACCESS-KEY}对于作为托管标识的凭据,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: credential: managed-identity-enabled: true client-id: ${AZURE_CLIENT_ID} eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}对于作为服务主体的凭据,请在 application.yml 文件中配置以下属性:
spring: cloud: azure: credential: client-id: ${AZURE_CLIENT_ID} client-secret: ${AZURE_CLIENT_SECRET} profile: tenant-id: <tenant> eventhubs: namespace: ${AZURE_EVENT_HUBS_NAMESPACE} processor: checkpoint-store: container-name: ${CONTAINER_NAME} account-name: ${ACCOUNT_NAME}
注释
tenant-id允许以下值:common、organizations、consumers或租户 ID。 有关这些值的详细信息,请参阅错误 AADSTS50020 - 来自标识提供者的用户帐户不存在于租户中部分中的使用了错误的终结点(个人和组织帐户)部分。 有关转换单租户应用的信息,请参阅 在 Microsoft Entra ID上将单租户应用转换为多租户。
EventHubsTemplate已自动配置,你可以将其直接连接到自己的豆类,如以下示例所示:@Component public class MyBean { private final EventHubsTemplate eventHubsTemplate; public MyBean(EventHubsTemplate eventHubsTemplate) { this.eventHubsTemplate = eventHubsTemplate; } public void someMethod() { this.eventHubsTemplate.sendAsync('EVENT_HUB_NAME', MessageBuilder.withPayload("Hello world").build()).subscribe(); } }
从 Azure 事件中心接收消息
使用以下步骤接收消息:
填写凭据配置选项。
添加
@EnableAzureMessaging批注,如以下示例所示。 此批注触发使用@EventHubsListener注释的方法的发现,从而在掩护下创建消息侦听器容器。@SpringBootApplication @EnableAzureMessaging public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } }注释
为了避免重复,自版本
5.21.0起,Spring Cloud Azure 自动配置已启用批注@EnableAzureMessaging。当事件中心基础结构存在时,可以使用
@EventHubsListener批注任何 bean 来创建侦听器终结点。 以下组件在EVENT_HUB_NAME事件中心和$Default使用者组上创建侦听器终结点:@Component public class MyBean { @EventHubsListener(destination = "EVENT_HUB_NAME", group = "$Default") public void processMessage(String content) { // ... } }
示例
有关详细信息,请参阅 GitHub 上的 azure-spring-boot-samples 存储库