概述
本指南介绍如何使用 Azure 队列存储服务为常见方案编写代码。 这些示例是用 Java 编写的,并使用 适用于 Java 的 Azure 存储 SDK。 方案包括 插入、 速览、 获取和 删除 队列消息。 还介绍了 用于创建 和 删除 队列的代码。 有关队列的详细信息,请参阅 后续步骤 部分。
什么是队列存储?
Azure 队列存储是一项服务,用于存储大量可以通过 HTTP 或 HTTPS 通过经过身份验证的调用从世界上任何地方访问的消息。 单个队列消息的大小最多可为 64 KB,队列可以包含数以百万计的消息,消息总量取决于存储帐户的容量上限。 队列存储通常用于创建积压工作以异步处理。
队列服务概念
Azure 队列服务包含以下组件:
存储帐户: 对 Azure 存储的所有访问都通过存储帐户完成。 有关存储帐户的详细信息,请参阅存储帐户概述。
队列:一个队列包含一组消息。 所有消息必须位于相应的队列中。 请注意,队列名称必须全部小写。 有关命名队列的信息,请参阅 命名队列和元数据。
消息: 一条消息(无论哪种格式)的最大大小为 64 KB。 消息可以保留在队列中的最长时间为 7 天。 在 2017-07-29 或更高版本中,最大生存时间可以是任何正数,或者是 -1(表示消息不会过期)。 如果省略此参数,则默认的生存时间为 7 天。
URL 格式: 队列可以通过使用以下 URL 格式进行寻址:http://
<storage account>
.queue.core.windows.net/<queue>
以下 URL 指向图示中的队列:
http://myaccount.queue.core.windows.net/incoming-orders
创建 Azure 存储帐户
创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户。 若要了解更多信息,请参阅 创建存储帐户。
还可以使用 Azure PowerShell、Azure CLI或用于 .NET的 azure 存储资源提供程序创建 Azure 存储帐户。
如果目前不想在 Azure 中创建存储帐户,也可以使用 Azurite 存储模拟器在本地环境中运行和测试代码。 有关详细信息,请参阅 使用 Azurite 模拟器进行本地 Azure 存储开发。
创建 Java 应用程序
首先,验证开发系统是否满足 Azure 队列存储客户端库 v12 for Java 中列出的先决条件。
为了创建一个名为queues-how-to-v12
的 Java 应用程序,请按照以下步骤操作:
在控制台窗口中(如 cmd、PowerShell 或 Bash),使用 Maven 创建名称
queues-how-to-v12
为的新控制台应用。 键入以下命令mvn
以创建“hello world”Java 项目。mvn archetype:generate \ --define interactiveMode=n \ --define groupId=com.queues.howto \ --define artifactId=queues-howto-v12 \ --define archetypeArtifactId=maven-archetype-quickstart \ --define archetypeVersion=1.4
mvn archetype:generate ` --define interactiveMode=n ` --define groupId=com.queues.howto ` --define artifactId=queues-howto-v12 ` --define archetypeArtifactId=maven-archetype-quickstart ` --define archetypeVersion=1.4
生成项目的输出应如下所示:
[INFO] Scanning for projects... [INFO] [INFO] ------------------< org.apache.maven:standalone-pom >------------------- [INFO] Building Maven Stub Project (No POM) 1 [INFO] --------------------------------[ pom ]--------------------------------- [INFO] [INFO] >>> maven-archetype-plugin:3.1.2:generate (default-cli) > generate-sources @ standalone-pom >>> [INFO] [INFO] <<< maven-archetype-plugin:3.1.2:generate (default-cli) < generate-sources @ standalone-pom <<< [INFO] [INFO] [INFO] --- maven-archetype-plugin:3.1.2:generate (default-cli) @ standalone-pom --- [INFO] Generating project in Batch mode [INFO] ---------------------------------------------------------------------------- [INFO] Using following parameters for creating project from Archetype: maven-archetype-quickstart:1.4 [INFO] ---------------------------------------------------------------------------- [INFO] Parameter: groupId, Value: com.queues.howto [INFO] Parameter: artifactId, Value: queues-howto-v12 [INFO] Parameter: version, Value: 1.0-SNAPSHOT [INFO] Parameter: package, Value: com.queues.howto [INFO] Parameter: packageInPathFormat, Value: com/queues/howto [INFO] Parameter: version, Value: 1.0-SNAPSHOT [INFO] Parameter: package, Value: com.queues.howto [INFO] Parameter: groupId, Value: com.queues.howto [INFO] Parameter: artifactId, Value: queues-howto-v12 [INFO] Project created from Archetype in dir: C:\queues\queues-howto-v12 [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 6.775 s [INFO] Finished at: 2020-08-17T15:27:31-07:00 [INFO] ------------------------------------------------------------------------
切换到新创建的
queues-howto-v12
目录。cd queues-howto-v12
安装软件包
在文本编辑器中打开 pom.xml
文件。 将以下依赖项元素添加到依赖项组。
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-queue</artifactId>
<version>12.6.0</version>
</dependency>
配置应用程序以访问队列存储
将以下 import 语句添加到要在其中使用 Azure 存储 API 访问队列的 Java 文件的顶部:
// Include the following imports to use queue APIs
import com.azure.core.util.*;
import com.azure.storage.queue.*;
import com.azure.storage.queue.models.*;
设置 Azure 存储连接字符串
Azure 存储客户端使用存储连接字符串来访问数据管理服务。 获取 Azure 门户中列出的存储帐户的名称和主访问密钥。 在连接字符串中将它们作为 AccountName
和 AccountKey
的值使用。 此示例演示如何声明一个静态字段以保存连接字符串:
// Define the connection-string with your values
final String connectStr =
"DefaultEndpointsProtocol=https;" +
"AccountName=your_storage_account;" +
"AccountKey=your_storage_account_key";
以下示例假定你有一个 String
包含存储连接字符串的对象。
如何:创建队列
对象 QueueClient
包含用于与队列交互的操作。 以下代码创建一个 QueueClient
对象。 使用 QueueClient
对象创建要使用的队列。
public static String createQueue(String connectStr)
{
try
{
// Create a unique name for the queue
String queueName = "queue-" + java.util.UUID.randomUUID();
System.out.println("Creating queue: " + queueName);
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queue = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
// Create the queue
queue.create();
return queue.getQueueName();
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println("Error code: " + e.getErrorCode() + "Message: " + e.getMessage());
return null;
}
}
如何:向队列添加消息
若要将消息插入现有队列,请调用 sendMessage
该方法。 消息可以是字符串(采用 UTF-8 格式)或字节数组。 下面是将字符串消息发送到队列的代码。
public static void addQueueMessage
(String connectStr, String queueName, String messageText)
{
try
{
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queueClient = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
System.out.println("Adding message to the queue: " + messageText);
// Add a message to the queue
queueClient.sendMessage(messageText);
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
如何查看下一条消息
可以通过调用 peekMessage
来查看队列前面的消息,而无需将其从队列中删除。
public static void peekQueueMessage
(String connectStr, String queueName)
{
try
{
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queueClient = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
// Peek at the first message
PeekedMessageItem peekedMessageItem = queueClient.peekMessage();
System.out.println("Peeked message: " + peekedMessageItem.getMessageText());
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
如何:更改排队消息的内容
可以直接在队列中更改消息内容。 如果消息表示工作任务,则可以使用此功能更新状态。 以下代码使用新内容更新队列消息,并将可见性超时设置为再延长 30 秒。 延长可见性超时后,客户端再延长 30 秒才能继续处理消息。 你还可以保留重试计数。 如果消息重试次数超过 n 次,则会将其删除。 每次处理时触发应用程序错误的消息,此方案可保护应用程序不受其影响。
下面的代码示例在消息队列中搜索,找到与搜索字符串匹配的第一条消息内容,修改消息内容,然后退出。
public static void updateQueueMessage
(String connectStr, String queueName,
String searchString, String updatedContents)
{
try
{
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queueClient = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
// The maximum number of messages to retrieve is 32
final int MAX_MESSAGES = 32;
// Iterate through the queue messages
for (QueueMessageItem message : queueClient.receiveMessages(MAX_MESSAGES))
{
// Check for a specific string
if (message.getMessageText().equals(searchString))
{
// Update the message to be visible in 30 seconds
queueClient.updateMessage(message.getMessageId(),
message.getPopReceipt(),
updatedContents,
Duration.ofSeconds(30));
System.out.println(
String.format("Found message: \'%s\' and updated it to \'%s\'",
searchString,
updatedContents)
);
break;
}
}
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
下面的代码示例仅更新队列中的第一条可见消息。
public static void updateFirstQueueMessage
(String connectStr, String queueName, String updatedContents)
{
try
{
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queueClient = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
// Get the first queue message
QueueMessageItem message = queueClient.receiveMessage();
// Check for a specific string
if (null != message)
{
// Update the message to be visible in 30 seconds
UpdateMessageResult result = queueClient.updateMessage(message.getMessageId(),
message.getPopReceipt(),
updatedContents,
Duration.ofSeconds(30));
System.out.println("Updated the first message with the receipt: " +
result.getPopReceipt());
}
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
如何:获取队列长度
您可以获取队列中消息数量的估计值。
该方法 getProperties
返回多个值,包括队列中当前的消息数。 计数只是近似值,因为可以在请求后添加或删除消息。 该方法 getApproximateMessageCount
返回调用 getProperties
所检索的最后一个值,而不调用队列存储。
public static void getQueueLength(String connectStr, String queueName)
{
try
{
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queueClient = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
QueueProperties properties = queueClient.getProperties();
long messageCount = properties.getApproximateMessagesCount();
System.out.println(String.format("Queue length: %d", messageCount));
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
如何出列下一条消息
代码以两个步骤从队列中取出消息。 调用 receiveMessage
时,将得到队列中的下一条消息。 对从此队列读取消息的任何其他代码而言,从 receiveMessage
返回的消息将不可见。 默认情况下,此消息保持 30 秒不可见。 要完成从队列中移除消息,还必须调用 deleteMessage
。 如果代码无法处理消息,则此双重过程可确保可以获取相同的消息,然后重试。 代码在处理完消息后立即调用 deleteMessage
。
public static void dequeueMessage(String connectStr, String queueName)
{
try
{
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queueClient = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
// Get the first queue message
QueueMessageItem message = queueClient.receiveMessage();
// Check for a specific string
if (null != message)
{
System.out.println("Dequeing message: " + message.getMessageText());
// Delete the message
queueClient.deleteMessage(message.getMessageId(), message.getPopReceipt());
}
else
{
System.out.println("No visible messages in queue");
}
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
处理出队消息的其他选项
可通过两种方法自定义队列中的消息检索。 首先,获取一批消息(最多 32 条)。 其次,设置一个更长或更短的不可见超时时间,以便代码有足够时间来完全处理每条消息。
下面的代码示例使用 receiveMessages
该方法在一次调用中获取 20 条消息。 然后,它使用 for
循环处理每个消息。 它还将每条消息的不可见超时设置为 5 分钟(300 秒)。 超时时间对于所有消息同时开始。 自调用 receiveMessages
已经过去五分钟后,任何未删除的消息都将再次可见。
public static void dequeueMessages(String connectStr, String queueName)
{
try
{
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queueClient = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
// The maximum number of messages to retrieve is 20
final int MAX_MESSAGES = 20;
// Retrieve 20 messages from the queue with a
// visibility timeout of 300 seconds (5 minutes)
for (QueueMessageItem message : queueClient.receiveMessages(MAX_MESSAGES,
Duration.ofSeconds(300), Duration.ofSeconds(1), new Context("key1", "value1")))
{
// Do processing for all messages in less than 5 minutes,
// deleting each message after processing.
System.out.println("Dequeing message: " + message.getMessageText());
queueClient.deleteMessage(message.getMessageId(), message.getPopReceipt());
}
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
如何列出队列
若要获取当前队列的列表,请调用 QueueServiceClient.listQueues()
该方法,该方法将返回对象的集合 QueueItem
。
public static void listQueues(String connectStr)
{
try
{
// Instantiate a QueueServiceClient which will be
// used to list the queues
QueueServiceClient queueServiceClient = new QueueServiceClientBuilder()
.connectionString(connectStr)
.buildClient();
// Loop through the collection of queues.
for (QueueItem queue : queueServiceClient.listQueues())
{
// Output each queue name.
System.out.println(queue.getName());
}
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
如何:删除队列
若要删除队列及其中包含的所有消息,请对 delete
对象调用 QueueClient
方法。
public static void deleteMessageQueue(String connectStr, String queueName)
{
try
{
// Instantiate a QueueClient which will be
// used to create and manipulate the queue
QueueClient queueClient = new QueueClientBuilder()
.connectionString(connectStr)
.queueName(queueName)
.buildClient();
System.out.println("Deleting queue: " + queueClient.getQueueName());
// Delete the queue
queueClient.delete();
}
catch (QueueStorageException e)
{
// Output the exception message and stack trace
System.out.println(e.getMessage());
e.printStackTrace();
}
}
后续步骤
了解队列存储的基础知识后,请按照以下链接了解更复杂的存储任务。
有关使用已弃用的 Java 版本 8 SDK 的相关代码示例,请参阅使用 Java 版本 8 的代码示例。