通过


在 Spring 应用程序中使用Azure Storage队列

本文演示如何在使用 Spring FrameworkAzure Storage Queue>。

Azure Storage队列实现基于云的队列,以实现分布式应用程序组件之间的通信。 每个队列维护一个可由发送方组件添加的、由接收方组件处理的消息列表。 使用队列时,应用程序可根据需求立即缩放。

Spring Cloud Azure提供了各种模块,用于使用 Spring 框架向Azure Storage队列发送消息以及从中接收消息。 可以单独使用这些模块,也可以将它们合并为不同的用例,如以下列表所述:

先决条件

  • Azure 存储帐户和 Azure 队列。 如果没有这些资源,请先创建存储帐户,然后创建队列。 有关详细信息,请参阅 创建存储帐户创建队列部分Quickstart:创建队列并使用 Azure 门户添加消息。

  • Spring Boot 应用程序。 如果没有,请使用 Spring Initializr 创建一个 Maven 项目。 请务必选择 Maven Project,在 Dependencies 下,添加 Spring Web 依赖项,然后选择Java版本 8 或更高版本。

注意

若要授予帐户对资源的访问权限,请在新创建的 Azure Storage 帐户中,将 Storage Queue Data Contributor 角色分配给当前正在使用的 Microsoft Entra 帐户。 有关详细信息,请参阅使用 Azure 门户分配 Azure 角色

重要

要完成本教程中的步骤,需要 Spring Boot 版本 2.5 或更高版本。

准备本地环境

在本教程中,配置和代码没有任何身份验证操作。 但是,连接到Azure服务需要身份验证。 若要完成身份验证,需要使用Azure标识客户端库。 Spring Cloud Azure 通过 Azure 标识库使用 DefaultAzureCredential,帮助你在不进行任何代码更改的情况下获取凭据。

DefaultAzureCredential 支持多种身份验证方法,并确定应在运行时使用哪种方法。 通过这种方法,你的应用可在不同环境(例如本地或生产环境)中使用不同的身份验证方法,而无需实现特定于环境的代码。 有关详细信息,请参阅《Authenticate Azure 托管 Java 应用程序》DefaultAzureCredential部分。

若要使用 Azure CLI、IntelliJ 或其他方法在本地开发环境中完成身份验证,请参阅 Java 开发环境中Azure身份验证。 若要在Azure托管环境中完成身份验证,建议使用托管标识。 有关详细信息,请参阅 Azure 资源的托管标识是什么?

使用 Spring Cloud Azure Storage Queue 启动器

Spring Cloud Azure Storage Queue Starter 模块通过 Spring Boot 框架导入 Azure Storage 队列的 Java 客户端库。 可以在非互斥模式中使用 Spring Cloud Azure和Azure SDK。 因此,可以在 Spring 应用程序中继续使用存储队列Java客户端 API。

添加依赖项

若要安装 Spring Cloud Azure Storage 队列初学者模块,请将以下依赖项添加到 pom.xml 文件:

  • Spring Cloud Azure 材料清单(BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>7.1.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 4.0.x,请确保将 spring-cloud-azure-dependencies 版本设置为 7.1.0

    如果使用 Spring Boot 3.5.x,请确保将 spring-cloud-azure-dependencies 版本设置为 6.1.0

    如果使用 Spring Boot 3.1.x-3.5.x,请确保将 spring-cloud-azure-dependencies 版本设置为 5.25.0

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.20.0

    应在 <dependencyManagement> 文件的 部分中配置此材料清单(BOM)。 这可确保所有 Spring Cloud Azure依赖项都使用相同的版本。

    有关此 BOM 所用版本的更多信息,请参阅 我应该使用哪个 Spring Cloud Azure 版本

  • Spring Cloud Azure Queue Storage 队列项目:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter-storage-queue</artifactId>
    </dependency>
    

编写应用程序代码以发送和接收消息

本部分介绍如何在 Spring 应用程序的上下文中使用Azure Queue Storage客户端。 你有以下两种选择:

  • 使用 Spring Boot 自动配置,并从 Spring 上下文中使用现成的客户端(推荐)。
  • 以编程方式生成客户端。

使用自动配置,可以从 Spring 控制反转 (IoC) 容器自动连接客户端 bean。 在使用存储队列客户端进行开发时,这种方法为你提供了更灵活、更高效的体验。 自动配置具有以下优势:

  • 自动配置使用外部化配置,这样你就可以在不同的环境中使用相同的应用程序代码。

  • 可以将学习生成器模式并将客户端注册到应用程序上下文的过程委托给 Spring Boot 框架。 你只关注如何根据自己的业务需求使用客户端。

  • 可以使用运行状况指示器来检查应用程序和内部组件的状态和运行状况。

以下部分中的代码示例介绍如何将 QueueClient 与所描述的两个替代方案一起使用。

提示

Azure Java SDK for Storage Queue 提供了多个客户端来与存储队列交互。 启动器还为所有存储队列客户端和客户端生成器提供自动配置。 本文仅以 QueueClient 为例。

使用 Spring Boot 自动配置

若要向Azure Storage队列发送消息并从中接收消息,请使用以下步骤配置应用程序:

  1. 配置存储帐户名称和队列名称,如以下示例所示:

    spring.cloud.azure.storage.queue.account-name=<your-storage-account-name>
    spring.cloud.azure.storage.queue.queue-name=<your-storage-queue-name>
    
  2. 在 Spring 应用程序中注入 QueueClient,并调用相关 API 以发送消息,如以下示例所示:

    import com.azure.storage.queue.QueueClient;
    import com.azure.storage.queue.models.QueueMessageItem;
    import com.azure.storage.queue.models.SendMessageResult;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class StorageQueueClientApplication implements CommandLineRunner {
    
        private final static Logger logger = LoggerFactory.getLogger(StorageQueueClientApplication.class);
    
        @Autowired
        private QueueClient queueClient;
    
        public static void main(String[] args) {
            SpringApplication.run(StorageQueueClientApplication.class, args);
        }
    
        @Override
        public void run(String... args) {
         // Using the QueueClient object, call the create method to create the queue in your storage account.
            queueClient.create();
            SendMessageResult sendMessageResult = queueClient.sendMessage("Hello world");
            logger.info("Send message id: {}", sendMessageResult.getMessageId());
    
            QueueMessageItem queueMessageItem = queueClient.receiveMessage();
            logger.info("Received message: {}", new String(queueMessageItem.getBody().toBytes()));
        }
    
    }
    
  3. 启动应用程序。 启动后,应用程序生成类似于以下示例的日志:

    Send message id: ...
    Received message: Hello world
    

以编程方式生成客户端

你可以自行生成客户端 bean,但该过程很复杂。 在 Spring Boot 应用程序中,必须管理属性、了解生成器模式,并将客户端注册到 Spring 应用程序上下文。 以下步骤演示如何做到这一点。

  1. 在 Spring 应用程序中以编程方式生成客户端,如以下示例所示。 请确保将 <storage-account-name> 占位符替换为你自己的值。

    import com.azure.identity.DefaultAzureCredentialBuilder;
    import com.azure.storage.queue.QueueClient;
    import com.azure.storage.queue.QueueClientBuilder;
    import com.azure.storage.queue.models.QueueMessageItem;
    import com.azure.storage.queue.models.SendMessageResult;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class StorageQueueClientApplication implements CommandLineRunner {
    
        private final static String queueName = "test-queue";
        private final static String endpoint = "https://<storage-account-name>.queue.core.windows.net/";
        private final static Logger logger = LoggerFactory.getLogger(StorageQueueClientApplication.class);
    
        QueueClient queueClient = new QueueClientBuilder()
            .endpoint(endpoint)
            .queueName(queueName)
            .credential(new DefaultAzureCredentialBuilder().build())
            .buildClient();
    
        public static void main(String[] args) {
            SpringApplication.run(StorageQueueClientApplication.class, args);
        }
    
        @Override
        public void run(String... args) {
         // Using the QueueClient object, call the create method to create the queue in your storage account.
            queueClient.create();
            SendMessageResult sendMessageResult = queueClient.sendMessage("Hello world");
            logger.info("Send message id: {}", sendMessageResult.getMessageId());
    
            QueueMessageItem queueMessageItem = queueClient.receiveMessage();
            logger.info("Received message: {}", new String(queueMessageItem.getBody().toBytes()));
        }
    
    }
    
  2. 启动应用程序。 启动后,应用程序生成类似于以下示例的日志:

    Send message id: ...
    Received message: Hello world
    

以下列表显示了此代码不灵活或不优雅的原因:

  • 存储帐户和队列名称是硬编码的。
  • 如果使用 @Value 从 Spring 环境获取配置,则 application.properties 文件中不能有 IDE 提示。
  • 如果有微服务方案,则必须复制每个项目中的代码,这很容易出错,也很难保持一致。

幸运的是,使用 Spring Cloud Azure 不需要手动创建客户端 Beans。 相反,可以直接注入它们,并使用已熟悉的配置属性来配置存储队列。 有关详细信息,请参阅 Spring Cloud Azure配置属性

Spring Cloud Azure还为不同的方案提供以下全局配置。 有关详细信息,请参阅 Spring Cloud Azure全局配置属性

  • 代理选项。
  • 重试选项。

还可以连接到不同的Azure云。 有关详细信息,请参阅 连接到不同的 Azure 云

使用 Spring Messaging 处理 Azure 存储队列

Spring Messaging messaging Azure Storage Queue 模块支持具有 Azure Queue Storage 的 Spring Messaging 框架。

如果使用 Spring Messaging Azure Storage Queue,则可以使用 StorageQueueTemplate 功能以异步和同步方式将消息发送到存储队列。

以下部分介绍如何使用 Spring Messaging Azure Storage 队列向存储队列发送消息并从中接收消息。

添加依赖项

若要安装 Spring Messaging Azure Storage Queue 模块,请将以下依赖项添加到 pom.xml 文件中:

  • Spring Cloud Azure 材料清单(BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>7.1.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 4.0.x,请确保将 spring-cloud-azure-dependencies 版本设置为 7.1.0

    如果使用 Spring Boot 3.5.x,请确保将 spring-cloud-azure-dependencies 版本设置为 6.1.0

    如果使用 Spring Boot 3.1.x-3.5.x,请确保将 spring-cloud-azure-dependencies 版本设置为 5.25.0

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.20.0

    应在 <dependencyManagement> 文件的 部分中配置此材料清单(BOM)。 这可确保所有 Spring Cloud Azure依赖项都使用相同的版本。

    有关此 BOM 所用版本的更多信息,请参阅 我应该使用哪个 Spring Cloud Azure 版本

  • Spring Cloud Azure 启动器和 Spring Messaging Storage Queue 工件:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter</artifactId>
    </dependency>
    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-messaging-azure-storage-queue</artifactId>
    </dependency>
    

编写应用程序代码以发送和接收消息

使用以下步骤配置应用程序并对其进行编码:

  1. 为存储队列配置Azure Storage帐户名称,如以下示例所示:

    spring.cloud.azure.storage.queue.account-name=<your-storage-account-name>
    
  2. 将发送方和接收方连接起来,使用 Spring 发送和接收消息,如以下示例所示。 请确保将 <storage-queue-name> 占位符替换为你自己的值。

    import com.azure.spring.messaging.AzureHeaders;
    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import com.azure.spring.messaging.storage.queue.core.StorageQueueTemplate;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import java.time.Duration;
    
    @SpringBootApplication
    public class StorageQueueMessagingApplication implements CommandLineRunner {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(StorageQueueMessagingApplication.class);
        private static final String STORAGE_QUEUE_NAME = "<storage-queue-name>";
    
        @Autowired
        StorageQueueTemplate storageQueueTemplate;
    
        public static void main(String[] args) {
            SpringApplication.run(StorageQueueMessagingApplication.class, args);
        }
    
        @Override
        public void run(String... args) {
            storageQueueTemplate
                .sendAsync(STORAGE_QUEUE_NAME, MessageBuilder.withPayload("Hello world").build())
                .subscribe();
            LOGGER.info("Message was sent successfully.");
    
            Message<?> message = storageQueueTemplate.receiveAsync(STORAGE_QUEUE_NAME, Duration.ofSeconds(30)).block();
            LOGGER.info("Received message: {}", new String((byte[]) message.getPayload()));
        }
    
    }
    
  3. 启动应用程序。 启动后,应用程序生成类似于以下示例的日志:

    Message was sent successfully.
    ...
    Received message: Hello World
    

使用 Spring Integration Azure Storage 队列

Spring Integration Azure Storage Queue 模块支持包含存储队列的 Spring Integration 框架。

如果 Spring 应用程序使用 Spring 集成消息通道,则可以使用通道适配器在消息通道和存储队列之间路由消息。 入站通道适配器将消息从存储队列转发到消息通道。 出站通道适配器将消息从消息通道发布到存储队列。

以下部分介绍如何使用 Spring Integration Azure Storage Queue 向/从存储队列发送和接收消息。

添加依赖项

若要安装 Spring Integration Azure Storage Queue 模块,请将以下依赖项添加到 pom.xml 文件中:

  • Spring Cloud Azure 材料清单(BOM):

    <dependencyManagement>
       <dependencies>
         <dependency>
           <groupId>com.azure.spring</groupId>
           <artifactId>spring-cloud-azure-dependencies</artifactId>
           <version>7.1.0</version>
           <type>pom</type>
           <scope>import</scope>
           </dependency>
       </dependencies>
    </dependencyManagement>
    

    注意

    如果使用 Spring Boot 4.0.x,请确保将 spring-cloud-azure-dependencies 版本设置为 7.1.0

    如果使用 Spring Boot 3.5.x,请确保将 spring-cloud-azure-dependencies 版本设置为 6.1.0

    如果使用 Spring Boot 3.1.x-3.5.x,请确保将 spring-cloud-azure-dependencies 版本设置为 5.25.0

    如果使用 Spring Boot 2.x,请确保将 spring-cloud-azure-dependencies 版本设置为 4.20.0

    应在 <dependencyManagement> 文件的 部分中配置此材料清单(BOM)。 这可确保所有 Spring Cloud Azure依赖项都使用相同的版本。

    有关此 BOM 所用版本的更多信息,请参阅 我应该使用哪个 Spring Cloud Azure 版本

  • Spring Integration Azure Storage Queue 工件:

    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-starter-integration-storage-queue</artifactId>
    </dependency>
    

编写应用程序代码以发送和接收消息

使用以下步骤配置应用程序并对其进行编码:

  1. 为存储队列配置Azure Storage帐户名称。

    spring.cloud.azure.storage.queue.account-name=<your-storage-account-name>
    
  2. 创建一个新的 QueueReceiveConfiguration Java 类,如以下示例所示。 此类用于定义消息接收方。 请确保将 <storage-queue-name> 占位符替换为你自己的值。

    import com.azure.spring.integration.storage.queue.inbound.StorageQueueMessageSource;
    import com.azure.spring.messaging.AzureHeaders;
    import com.azure.spring.messaging.checkpoint.Checkpointer;
    import com.azure.spring.messaging.storage.queue.core.StorageQueueTemplate;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.InboundChannelAdapter;
    import org.springframework.integration.annotation.Poller;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.handler.annotation.Header;
    
    @Configuration
    public class QueueReceiveConfiguration {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(QueueReceiveConfiguration.class);
        private static final String STORAGE_QUEUE_NAME = "<storage-queue-name>";
        private static final String INPUT_CHANNEL = "input";
    
        @Bean
        @InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
        public StorageQueueMessageSource storageQueueMessageSource(StorageQueueTemplate storageQueueTemplate) {
            return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueTemplate);
        }
    
        @ServiceActivator(inputChannel = INPUT_CHANNEL)
        public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
            String message = new String(payload);
            LOGGER.info("Received message: {}", message);
        }
    
    }
    
  3. 创建一个新的 QueueSendConfiguration Java 类,如以下示例所示。 此类用于定义消息发送方。 请确保将 <storage-queue-name> 占位符替换为你自己的值。

    import com.azure.spring.integration.core.handler.DefaultMessageHandler;
    import com.azure.spring.messaging.storage.queue.core.StorageQueueTemplate;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.messaging.MessageHandler;
    import org.springframework.util.concurrent.ListenableFutureCallback;
    
    @Configuration
    public class QueueSendConfiguration {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(QueueSendConfiguration.class);
        private static final String STORAGE_QUEUE_NAME = "<storage-queue-name>";
        private static final String OUTPUT_CHANNEL = "output";
    
        @Bean
        @ServiceActivator(inputChannel = OUTPUT_CHANNEL)
        public MessageHandler messageSender(StorageQueueTemplate storageQueueTemplate) {
            DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueTemplate);
            handler.setSendCallback(new ListenableFutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    LOGGER.info("Message was sent successfully.");
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.info("There was an error sending the message.");
                }
            });
            return handler;
        }
    
        @MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
        public interface StorageQueueOutboundGateway {
            void send(String text);
        }
    
    }
    
  4. 将发送方和接收方连接起来,用 Spring 发送和接收消息。

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.config.EnableIntegration;
    
    @SpringBootApplication
    @EnableIntegration
    @Configuration(proxyBeanMethods = false)
    public class StorageQueueIntegrationApplication {
    
        public static void main(String[] args) {
            ConfigurableApplicationContext applicationContext = SpringApplication.run(StorageQueueIntegrationApplication.class, args);
            QueueSendConfiguration.StorageQueueOutboundGateway storageQueueOutboundGateway = applicationContext.getBeanQueueSendConfiguration.StorageQueueOutboundGateway.class);
            storageQueueOutboundGateway.send("Hello World");
        }
    
    }
    

    提示

    请记住添加 @EnableIntegration 批注,这将启用 Spring 集成基础结构。

  5. 启动应用程序。 启动后,应用程序生成类似于以下示例的日志:

    Message was sent successfully.
    Received message: Hello World
    

部署到Azure Spring Apps

现在,你已在本地运行 Spring Boot 应用程序,是时候将其转移到生产环境了。 Azure Spring Apps可以轻松地将 Spring Boot 应用程序部署到Azure,而无需更改任何代码。 该服务管理 Spring 应用程序的基础结构,让开发人员可以专注于代码。 Azure Spring Apps使用全面的监视和诊断、配置管理、服务发现、CI/CD 集成、蓝绿部署等提供生命周期管理。 若要将应用程序部署到 Azure Spring Apps,请参阅 将第一个应用程序部署到 Azure Spring Apps

后续步骤

适用于 Spring 开发人员的 AzureSpring Cloud Azure Storage 队列示例

另请参阅

有关适用于 Microsoft Azure 的 Spring Boot 初学者的详细信息,请参阅 什么是 Spring Cloud Azure?