本文演示如何使用 Spring Cloud Stream Binder 从 服务总线 queues 和 topics 发送和接收消息。
Azure提供一个名为 Azure 服务总线(“服务总线”)的异步消息传送平台,该平台基于 Advanced Message Queueing Protocol 1.0 (“AMQP 1.0”) 标准。 服务总线可以用于所有受支持的Azure平台。
先决条件
Azure订阅 - 免费创建一个订阅。
Java开发工具包(JDK)版本 8 或更高版本。
Apache Maven 版本 3.2 或更高版本。
cURL 或类似的 HTTP 实用工具来测试功能。
Spring Boot 应用程序。 如果没有,请使用 Spring Initializr 创建一个 Maven 项目。 请务必选择 Maven Project, 在 Dependencies 下,添加 Spring Web 和 Azure Support 依赖项,然后选择Java版本 8 或更高版本。
注意
若要授予帐户对Azure 服务总线资源的访问权限,请将 Azure 服务总线 Data Sender 和 Azure 服务总线 Data Receiver 角色分配给当前正在使用的 Microsoft Entra 帐户。 有关授予访问角色的详细信息,请参阅在 Azure 门户中分配 Azure 角色和使用 Microsoft Entra ID 认证及授权应用程序以访问 Azure 服务总线实体。
重要
要完成本文中的步骤,需要 Spring Boot 版本 2.5 或更高版本。
从Azure 服务总线发送和接收消息
在 Azure 服务总线 上使用队列或主题,可以使用 Spring Cloud Azure Stream Binder 服务总线 发送和接收消息。
若要安装 Spring Cloud Azure Stream Binder 服务总线 模块,请将以下依赖项添加到 pom.xml 文件中:
Spring Cloud Azure 材料清单(BOM):
<dependencyManagement> <dependencies> <dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-dependencies</artifactId> <version>7.2.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>注意
如果使用 Spring Boot 4.0.x,请确保将
spring-cloud-azure-dependencies版本设置为7.2.0。如果使用 Spring Boot 3.5.x,请确保将
spring-cloud-azure-dependencies版本设置为6.2.0。如果使用 Spring Boot 3.1.x-3.5.x,请确保将
spring-cloud-azure-dependencies版本设置为5.25.0。如果使用 Spring Boot 2.x,请确保将
spring-cloud-azure-dependencies版本设置为4.20.0。应在
<dependencyManagement>文件的 部分中配置此材料清单(BOM)。 这可确保所有 Spring Cloud Azure依赖项都使用相同的版本。有关用于此 BOM 的版本的详细信息,请参阅 我应该使用哪个版本的 Spring Cloud Azure?。
Spring Cloud Azure Stream Binder 服务总线构件:
<dependency> <groupId>com.azure.spring</groupId> <artifactId>spring-cloud-azure-stream-binder-servicebus</artifactId> </dependency>
编写应用程序代码
使用以下步骤将应用程序配置为使用服务总线队列或主题发送和接收消息。
在配置文件
application.properties中配置服务总线凭据。spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE} spring.cloud.stream.bindings.consume-in-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.bindings.supply-out-0.destination=${AZURE_SERVICEBUS_QUEUE_NAME} spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete=false spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type=queue spring.cloud.function.definition=consume;supply; spring.cloud.stream.poller.fixed-delay=60000 spring.cloud.stream.poller.initial-delay=0下表描述了配置中的字段:
字段 说明 spring.cloud.azure.servicebus.namespace指定从Azure门户在服务总线中获取的命名空间。 spring.cloud.stream.bindings.consume-in-0.destination指定本教程中使用的服务总线队列或服务总线主题。 spring.cloud.stream.bindings.supply-out-0.destination指定与输入目的地相同的值。 spring.cloud.stream.servicebus.bindings.consume-in-0.consumer.auto-complete指定是否自动解决消息。 如果设置为 false,则会添加Checkpointer的消息标头,从而使开发人员能够手动处理消息。spring.cloud.stream.servicebus.bindings.supply-out-0.producer.entity-type指定输出绑定的实体类型,可以是 queue或topic。spring.cloud.function.definition指定要将哪个功能 bean 绑定到由绑定公开的外部目标。 spring.cloud.stream.poller.fixed-delay为默认轮询器指定固定延迟(以毫秒为单位)。 默认值为 1000 L。 推荐的值是60000。spring.cloud.stream.poller.initial-delay指定定期触发器的初始延迟。 默认值为 0。编辑启动类文件以显示以下内容。
import com.azure.spring.messaging.checkpoint.Checkpointer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import reactor.core.publisher.Flux; import reactor.core.publisher.Sinks; import java.util.function.Consumer; import java.util.function.Supplier; import static com.azure.spring.messaging.AzureHeaders.CHECKPOINTER; @SpringBootApplication public class ServiceBusQueueBinderApplication implements CommandLineRunner { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBusQueueBinderApplication.class); private static final Sinks.Many<Message<String>> many = Sinks.many().unicast().onBackpressureBuffer(); public static void main(String[] args) { SpringApplication.run(ServiceBusQueueBinderApplication.class, args); } @Bean public Supplier<Flux<Message<String>>> supply() { return ()->many.asFlux() .doOnNext(m->LOGGER.info("Manually sending message {}", m)) .doOnError(t->LOGGER.error("Error encountered", t)); } @Bean public Consumer<Message<String>> consume() { return message->{ Checkpointer checkpointer = (Checkpointer) message.getHeaders().get(CHECKPOINTER); LOGGER.info("New message received: '{}'", message.getPayload()); checkpointer.success() .doOnSuccess(s->LOGGER.info("Message '{}' successfully checkpointed", message.getPayload())) .doOnError(e->LOGGER.error("Error found", e)) .block(); }; } @Override public void run(String... args) { LOGGER.info("Going to add message {} to Sinks.Many.", "Hello World"); many.emitNext(MessageBuilder.withPayload("Hello World").build(), Sinks.EmitFailureHandler.FAIL_FAST); } }提示
在本教程中,配置或代码中没有身份验证操作。 但是,连接到Azure服务需要身份验证。 若要完成身份验证,需要使用Azure标识。 Spring Cloud Azure 使用
DefaultAzureCredential,这是 Azure 标识库提供的一项功能,帮助您无需进行任何代码更改即可获取凭据。DefaultAzureCredential支持多种身份验证方法,并确定应在运行时使用哪种方法。 通过这种方法,你的应用可在不同环境(例如本地与生产环境)中使用不同的身份验证方法,而无需实现特定于环境的代码。 有关详细信息,请参阅 DefaultAzureCredential。若要在本地开发环境中完成身份验证,可以使用 Azure CLI、Visual Studio Code、PowerShell 或其他方法。 有关详细信息,请参阅Java开发环境中Azure身份验证。 若要在Azure托管环境中完成身份验证,建议使用用户分配的托管标识。 有关详细信息,请参阅 Azure 资源的托管标识是什么?
启动应用程序。 类似以下示例的消息将被发布在你的应用程序日志中:
New message received: 'Hello World' Message 'Hello World' successfully checkpointed
后续步骤
适用于 Spring 开发人员的 AzureSpring Cloud Azure Stream Binder 服务总线 示例