如何使用适用于 Azure 服务总线 JMS 的 Spring Boot Starter

本文介绍如何使用适用于 Azure 服务总线 JMS 的 Spring Boot Starter 从服务总线 queuestopics 收发消息。

Azure 提供了一个异步消息平台,称为 Azure 服务总线(“服务总线”),该平台基于高级消息队列协议 1.0(“AMQP 1.0”)标准。 服务总线可用于各种受支持的 Azure 平台。

适用于 Azure 服务总线 JMS 的 Spring Boot Starter 提供 Spring JMS 与服务总线的集成。

以下视频介绍如何使用 JMS 2.0 将 Spring JMS 应用程序与 Azure 服务总线 集成。


必备条件

在本文中,需要满足以下先决条件:

  1. Azure 订阅;如果没有 Azure 订阅,可激活 MSDN 订阅者权益或注册免费帐户

  2. 支持的 Java 开发工具包 (JDK) 8 或更高版本。 有关在 Azure 上进行开发时可供使用的 JDK 的详细信息,请参阅 Azure 和 Azure Stack 上的 Java 支持

  3. Apache Maven 3.2 或更高版本。

  4. 如果已有一个已配置的服务总线队列或服务总线主题,请确保服务总线命名空间满足以下要求:

    1. 允许来自所有网络的访问
    2. 是高级版(或更高版本)
    3. 具有一个有关队列和主题的读/写访问权限的访问策略
  5. 如果没有已配置的服务总线队列或服务总线主题,请使用 Azure 门户创建服务总线队列创建服务总线主题。 确保该命名空间满足上一步中指定的要求。 另外,请记下该命名空间中的连接字符串,因为本教程的测试应用需要用到它。

  6. 如果没有 Spring Boot 应用程序,请使用 Spring Initializr 创建一个 Maven项目。 请记住选择 Maven 项目 ,并在 “依赖项”下添加 Web 依赖项,然后选择 811 以用于 Java 版本。

重要

完成本文中的步骤需要 Spring Boot 2.5 或更高版本。

使用 Azure 服务总线 JMS Starter

  1. 在应用的父目录中找到 pom.xml 文件,例如:

    C:\SpringBoot\servicebus\pom.xml

    /users/example/home/servicebus/pom.xml

  2. 在文本编辑器中打开 pom.xml 文件 。

  3. 将 Spring Boot Azure 服务总线 JMS Starter 添加到 <dependencies> 的列表:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter-servicebus-jms</artifactId>
      <version>4.4.0</version>
    </dependency>
    
  4. 保存并关闭 pom.xml 文件。

针对服务总线配置应用

在本部分中,了解如何将应用配置为使用服务总线队列或服务总线主题。

使用服务总线队列

  1. 在应用的 resources 目录中找到 application.properties,例如:

    C:\SpringBoot\servicebus\application.properties

    /users/example/home/servicebus/application.properties

  2. 在文本编辑器中打开 application.properties 文件 。

  3. 将以下代码追加到 application.properties 文件的末尾。 将占位符值替换为服务总线的适当值,并且不要在值周围加引号。

    spring.jms.servicebus.connection-string=<ServiceBusNamespaceConnectionString>
    spring.jms.servicebus.idle-timeout=<IdleTimeout>
    spring.jms.servicebus.pricing-tier=<ServiceBusPricingTier> 
    

    字段说明

    字段 说明
    spring.jms.servicebus.connection-string 指定从 Azure 门户内你的服务总线命名空间中获取的连接字符串。
    spring.jms.servicebus.idle-timeout 指定空闲持续时间。
    spring.jms.servicebus.pricing-tier 指定服务总线的定价层。 支持的值为:“高级”、“标准”和“基本”。 高级层使用 Java Message Service (JMS) 2.0,而标准层和基本层使用 JMS 1.0 与 Azure 服务总线交互。
  4. 保存并关闭 application.properties 文件 。

使用服务总线主题

  1. 在应用的 resources 目录中找到 application.properties,例如:

    C:\SpringBoot\servicebus\application.properties

    /users/example/home/servicebus/application.properties

  2. 在文本编辑器中打开 application.properties 文件 。

  3. 将以下代码追加到 application.properties 文件的末尾。 将占位符值替换为服务总线的适当值,并且不要在值周围加引号。

    spring.jms.servicebus.connection-string=<ServiceBusNamespaceConnectionString>
    spring.jms.servicebus.topic-client-id=<ServiceBusSubscriptionID>
    spring.jms.servicebus.idle-timeout=<IdleTimeout>
    spring.jms.servicebus.pricing-tier=<ServiceBusPricingTier> 
    

    字段说明

    字段 说明
    spring.jms.servicebus.connection-string 指定从 Azure 门户内你的服务总线命名空间中获取的连接字符串。
    spring.jms.servicebus.topic-client-id 指定 JMS 客户端 ID,它是 Azure 门户中的服务总线订阅 ID。
    spring.jms.servicebus.idle-timeout 指定空闲持续时间。
    spring.jms.servicebus.pricing-tier 指定服务总线的定价层。 支持的值为:“高级”、“标准”和“基本”。 高级层使用 Java Message Service (JMS) 2.0,而标准层和基本层使用 JMS 1.0 与 Azure 服务总线交互。
  4. 保存并关闭 application.properties 文件。

实现基本的服务总线功能

在本部分中,创建所需的 Java 类,用于将消息发送到服务总线队列或服务总线主题,并从相应的队列或主题订阅接收消息。

修改主应用程序类

  1. 在应用的程序包目录中找到主应用程序 Java 文件,例如:

    C:\SpringBoot\servicebus\src\main\java\com\wingtiptoys\servicebus\ServiceBusJmsStarterApplication.java

    /users/example/home/servicebus/src/main/java/com/wingtiptoys/servicebus/ServiceBusJmsStarterApplication.java

  2. 在文本编辑器中打开主应用程序 Java 文件。

  3. 将以下代码添加到该文件:

     package com.wingtiptoys.servicebus;
    
     import org.springframework.boot.SpringApplication;
     import org.springframework.boot.autoconfigure.SpringBootApplication;
    
     @SpringBootApplication
     public class ServiceBusJmsStarterApplication {
    
         public static void main(String[] args) {
             SpringApplication.run(ServiceBusJmsStarterApplication.class, args);
         }
     }
    
  4. 保存并关闭该文件。

定义测试 Java 类

  1. 使用文本编辑器,在应用的包目录中创建名为 User.java 的 Java 文件。

  2. 定义一个泛型用户类,以存储和检索用户名:

    package com.wingtiptoys.servicebus;
    
    import java.io.Serializable;
    
    // Define a generic User class.
    public class User implements Serializable {
    
        private static final long serialVersionUID = -295422703255886286L;
    
        private String name;
    
        public User() {
        }
    
        public User(String name) {
            setName(name);
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
    }
    

    实现 Serializable 是为了使用 Spring 框架的 JmsTemplate 中的 send 方法。 否则,应定义自定义的 MessageConverter bean,以将内容序列化为文本格式的 json。 有关 MessageConverter 的详细信息,请参阅官方的 Spring JMS Starter 项目

  3. 保存并关闭 User.java 文件 。

为消息发送控制器创建新类

  1. 使用文本编辑器,在应用的包目录中创建名为 SendController.java 的 Java 文件

  2. 将以下代码添加到新文件:

    package com.wingtiptoys.servicebus;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class SendController {
    
        private static final String DESTINATION_NAME = "<DestinationName>";
    
        private static final Logger logger = LoggerFactory.getLogger(SendController.class);
    
        @Autowired
        private JmsTemplate jmsTemplate;
    
        @PostMapping("/messages")
        public String postMessage(@RequestParam String message) {
            logger.info("Sending message");
            jmsTemplate.convertAndSend(DESTINATION_NAME, new User(message));
            return message;
        }
    }
    

    注意

    <DestinationName> 替换为在服务总线命名空间中配置的你自己的队列名称或主题名称。

  3. 保存并关闭 SendController.java 。

为消息接收控制器创建类

从服务总线队列接收消息

  1. 使用文本编辑器,在应用的包目录中创建名为 QueueReceiveController.java 的 Java 文件

  2. 将以下代码添加到新文件:

    package com.wingtiptoys.servicebus;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class QueueReceiveController {
    
        private static final String QUEUE_NAME = "<ServiceBusQueueName>";
    
        private final Logger logger = LoggerFactory.getLogger(QueueReceiveController.class);
    
        @JmsListener(destination = QUEUE_NAME, containerFactory = "jmsListenerContainerFactory")
        public void receiveMessage(User user) {
            logger.info("Received message: {}", user.getName());
        }
    }
    

    注意

    <ServiceBusQueueName> 替换为在服务总线命名空间中配置的你自己的队列名称。

  3. 保存并关闭 QueueReceiveController.java 文件。

从服务总线订阅接收消息

  1. 使用文本编辑器,在应用的包目录中创建名为 TopicReceiveController.java 的 Java 文件。

  2. 将以下代码添加到新文件。 将 <ServiceBusTopicName> 占位符替换为在服务总线命名空间中配置的你自己的主题名称。 将 <ServiceBusSubscriptionName> 占位符替换为服务总线主题中的你自己的订阅名称。

    package com.wingtiptoys.servicebus;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class TopicReceiveController {
    
        private static final String TOPIC_NAME = "<ServiceBusTopicName>";
    
        private static final String SUBSCRIPTION_NAME = "<ServiceBusSubscriptionName>";
    
        private final Logger logger = LoggerFactory.getLogger(TopicReceiveController.class);
    
        @JmsListener(destination = TOPIC_NAME, containerFactory = "topicJmsListenerContainerFactory",
                subscription = SUBSCRIPTION_NAME)
        public void receiveMessage(User user) {
            logger.info("Received message: {}", user.getName());
        }
    }
    
  3. 保存并关闭 TopicReceiveController.java 文件。

可选的服务总线功能

可以使用自定义的 MessageConverter bean 在 Java 对象与 JMS 消息之间进行转换。

设置消息的内容类型

下面的代码示例将 BytesMessage 内容类型设置为 application/json。 有关详细信息,请参阅消息、有效负载和序列化

package com.wingtiptoys.servicebus;

import com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.qpid.jms.message.JmsBytesMessage;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
import org.apache.qpid.proton.amqp.Symbol;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import org.springframework.stereotype.Component;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Session;
import java.io.IOException;

@Component
public class CustomMessageConverter extends MappingJackson2MessageConverter {

    private static final String TYPE_ID_PROPERTY = "_type";
    private static final Symbol CONTENT_TYPE = Symbol.valueOf("application/json");

    public CustomMessageConverter() {
        this.setTargetType(MessageType.BYTES);
        this.setTypeIdPropertyName(TYPE_ID_PROPERTY);
    }

    @Override
    protected BytesMessage mapToBytesMessage(Object object, Session session, ObjectWriter objectWriter)
        throws JMSException, IOException {
        final BytesMessage bytesMessage = super.mapToBytesMessage(object, session, objectWriter);
        JmsBytesMessage jmsBytesMessage = (JmsBytesMessage) bytesMessage;
        AmqpJmsMessageFacade facade = (AmqpJmsMessageFacade) jmsBytesMessage.getFacade();
        facade.setContentType(CONTENT_TYPE);
        return jmsBytesMessage;
    }
}

有关 MessageConverter 的详细信息,请参阅官方的 Spring JMS 指南

在 JmsTemplate 中设置会话 ID

启用了会话支持的实体(如启用了会话的服务总线队列)只能接收设置为 SessionId 有效值的消息。 若要向此类实体发送消息,请使用 JmsTemplate.convertAndSend 该方法设置映射到 SessionId 该属性的字符串属性“JMSXGroupID”,如以下示例所示:

@RestController
public class QueueSendController {

    private static final String QUEUE_NAME = "<DestinationName>";

    private static final Logger LOGGER = LoggerFactory.getLogger(QueueSendController.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    @PostMapping("/queue")
    public String postMessage(@RequestParam String message) {

        LOGGER.info("Sending message");

        jmsTemplate.convertAndSend(QUEUE_NAME, new User(message), jmsMessage -> {
            jmsMessage.setStringProperty("JMSXGroupID", "xxxeee");
            return jmsMessage;
        });
        return message;
    }
}

生成和测试应用程序

  1. 打开命令提示符,然后将目录更改为 pom.xml 的位置;例如:

    cd C:\SpringBoot\servicebus 
    

    -或-

    cd /users/example/home/servicebus 
    
  2. 使用 Maven 构建 Spring Boot 应用程序,然后运行该应用程序:

    mvn clean spring-boot:run
    
  3. 在应用程序运行后,你可以使用 curl 对其进行测试:

    curl -X POST localhost:8080/messages?message=hello
    

    此时会显示“发送消息”和“hello”已发布到应用程序日志:

    [nio-8080-exec-1] com.wingtiptoys.servicebus.SendController : Sending message
    [enerContainer-1] com.wingtiptoys.servicebus.ReceiveController : Received message: hello
    

清理资源

如果不再需要,请使用 Azure 门户删除本文中创建的资源,以避免产生意外的费用。

后续步骤