使用 .NET 版本 11.x 客户端库的 Azure 队列存储代码示例
本文介绍使用适用于 .NET 的 Azure 队列存储客户端库版本 11.x 的代码示例。
2023 年 3 月 31 日,我们停用了对不符合当前 Azure SDK 指南的 Azure SDK 库的支持。 新的 Azure SDK 库会定期更新,以推动一致的体验并增强安全态势。 建议转换到新的 Azure SDK 库,以利用新功能和关键安全更新。
尽管 2023 年 3 月 31 日之后仍然可以使用较旧的库,但它们将不再从 Microsoft 获得官方支持和更新。 有关详细信息,请参阅支持停用公告。
有关使用最新版本 12.x 客户端库版本的代码示例,请参阅快速入门:适用于 .NET 的 Azure 队列存储客户端库。
创建队列存储客户端
使用 CloudQueueClient
类可以检索存储在队列存储中的队列。 下面是创建服务客户端的一种方法:
// Retrieve storage account from connection string
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
CloudConfigurationManager.GetSetting("StorageConnectionString"));
// Create the queue client
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
创建队列
此示例演示如何创建队列:
// Retrieve storage account from connection string
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
CloudConfigurationManager.GetSetting("StorageConnectionString"));
// Create the queue client
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
// Retrieve a reference to a queue
CloudQueue queue = queueClient.GetQueueReference("myqueue");
// Create the queue if it doesn't already exist
queue.CreateIfNotExists();
在队列中插入消息
要将消息插入到现有队列中,请先创建新的 CloudQueueMessage
。 接下来,调用 AddMessage
方法。 可以通过字符串(采用 UTF-8 格式)或字节数组创建 CloudQueueMessage
。 下面的代码示例将创建一个队列(如果该队列不存在)并插入消息 Hello, World
:
// Retrieve storage account from connection string
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
CloudConfigurationManager.GetSetting("StorageConnectionString"));
// Create the queue client
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
// Retrieve a reference to a queue
CloudQueue queue = queueClient.GetQueueReference("myqueue");
// Create the queue if it doesn't already exist
queue.CreateIfNotExists();
// Create a message and add it to the queue
CloudQueueMessage message = new CloudQueueMessage("Hello, World");
queue.AddMessage(message);
扫视下一条消息
通过调用 PeekMessage
方法,可以速览队列前面的消息,而不会从队列中删除它。
// Retrieve storage account from connection string
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
CloudConfigurationManager.GetSetting("StorageConnectionString"));
// Create the queue client
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
// Retrieve a reference to a queue
CloudQueue queue = queueClient.GetQueueReference("myqueue");
// Peek at the next message
CloudQueueMessage peekedMessage = queue.PeekMessage();
// Display message.
Console.WriteLine(peekedMessage.AsString);
更改已排队消息的内容
// Retrieve storage account from connection string.
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
CloudConfigurationManager.GetSetting("StorageConnectionString"));
// Create the queue client.
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
// Retrieve a reference to a queue.
CloudQueue queue = queueClient.GetQueueReference("myqueue");
// Get the message from the queue and update the message contents.
CloudQueueMessage message = queue.GetMessage();
message.SetMessageContent2("Updated contents.", false);
queue.UpdateMessage(message,
TimeSpan.FromSeconds(60.0), // Make it invisible for another 60 seconds.
MessageUpdateFields.Content | MessageUpdateFields.Visibility);
取消下一条消息的排队
代码通过两个步骤来取消对队列中某条消息的排队。 调用 GetMessage
时,会获得队列中的下一条消息。 从 GetMessage
返回的消息对于从此队列读取消息的任何其他代码都是不可见的。 默认情况下,此消息持续 30 秒不可见。 若要完成从队列中删除消息,还必须调用 DeleteMessage
。 此删除消息的两步过程可确保,如果代码因硬件或软件故障而无法处理消息,则代码的其他实例可以获取相同消息并重试。 代码在处理消息后会立即调用 DeleteMessage
。
// Retrieve storage account from connection string
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
CloudConfigurationManager.GetSetting("StorageConnectionString"));
// Create the queue client
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
// Retrieve a reference to a queue
CloudQueue queue = queueClient.GetQueueReference("myqueue");
// Get the next message
CloudQueueMessage retrievedMessage = queue.GetMessage();
//Process the message in less than 30 seconds, and then delete the message
queue.DeleteMessage(retrievedMessage);
将 aync-await 模式与公用队列存储 API 配合使用
// Create the queue if it doesn't already exist
if(await queue.CreateIfNotExistsAsync())
{
Console.WriteLine("Queue '{0}' Created", queue.Name);
}
else
{
Console.WriteLine("Queue '{0}' Exists", queue.Name);
}
// Create a message to put in the queue
CloudQueueMessage cloudQueueMessage = new CloudQueueMessage("My message");
// Async enqueue the message
await queue.AddMessageAsync(cloudQueueMessage);
Console.WriteLine("Message added");
// Async dequeue the message
CloudQueueMessage retrievedMessage = await queue.GetMessageAsync();
Console.WriteLine("Retrieved message with content '{0}'", retrievedMessage.AsString);
// Async delete the message
await queue.DeleteMessageAsync(retrievedMessage);
Console.WriteLine("Deleted message");
使用用于取消消息排队的其他选项
以下代码示例使用 GetMessages
方法在一个调用中获取 20 条消息。 然后,使用 foreach
循环处理每条消息。 它还将每条消息的不可见超时时间设置为 5 分钟。 超时对于所有消息都是同时开始的,因此在调用 GetMessages
5 分钟后,尚未删除的任何消息都会再次变得可见。
// Retrieve storage account from connection string.
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
CloudConfigurationManager.GetSetting("StorageConnectionString"));
// Create the queue client.
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
// Retrieve a reference to a queue.
CloudQueue queue = queueClient.GetQueueReference("myqueue");
foreach (CloudQueueMessage message in queue.GetMessages(20, TimeSpan.FromMinutes(5)))
{
// Process all messages in less than 5 minutes, deleting each message after processing.
queue.DeleteMessage(message);
}
获取队列长度
可以获取队列中消息的估计数。 FetchAttributes
方法返回包括消息计数在内的队列属性。 ApproximateMessageCount
属性返回 FetchAttributes
方法检索到的最后一个值,而不会调用队列存储。
// Retrieve storage account from connection string.
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
CloudConfigurationManager.GetSetting("StorageConnectionString"));
// Create the queue client.
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
// Retrieve a reference to a queue.
CloudQueue queue = queueClient.GetQueueReference("myqueue");
// Fetch the queue attributes.
queue.FetchAttributes();
// Retrieve the cached approximate message count.
int cachedMessageCount = queue.ApproximateMessageCount;
// Display number of messages.
Console.WriteLine("Number of messages in queue: " + cachedMessageCount);
删除队列
若要删除队列及其包含的所有消息,请对队列对象调用 Delete
方法。
// Retrieve storage account from connection string.
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
CloudConfigurationManager.GetSetting("StorageConnectionString"));
// Create the queue client.
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
// Retrieve a reference to a queue.
CloudQueue queue = queueClient.GetQueueReference("myqueue");
// Delete the queue.
queue.Delete();
使用队列
在项目目录中,使用 dotnet add package
命令添加以下包:
dotnet add package Microsoft.Azure.Storage.Common
dotnet add package Microsoft.Azure.Storage.Queue
完整代码示例:
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Queue;
namespace QueueApp
{
class Program
{
static async Task Main(string[] args)
{
string connectionString = Environment.GetEnvironmentVariable("AZURE_STORAGE_CONNECTION_STRING");
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connectionString);
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
CloudQueue queue = queueClient.GetQueueReference("mystoragequeue");
if (args.Length > 0)
{
string value = String.Join(" ", args);
await InsertMessageAsync(queue, value);
Console.WriteLine($"Sent: {value}");
}
else
{
string value = await RetrieveNextMessageAsync(queue);
Console.WriteLine($"Received: {value}");
}
Console.Write("Press Enter...");
Console.ReadLine();
}
static async Task InsertMessageAsync(CloudQueue theQueue, string newMessage)
{
if (await theQueue.CreateIfNotExistsAsync())
{
Console.WriteLine("The queue was created.");
}
CloudQueueMessage message = new CloudQueueMessage(newMessage);
await theQueue.AddMessageAsync(message);
}
static async Task<string> RetrieveNextMessageAsync(CloudQueue theQueue)
{
bool exists = await theQueue.ExistsAsync();
if (exists)
{
CloudQueueMessage retrievedMessage = await theQueue.GetMessageAsync();
if (retrievedMessage != null)
{
string theMessage = retrievedMessage.AsString;
await theQueue.DeleteMessageAsync(retrievedMessage);
return theMessage;
}
else
{
Console.Write("The queue is empty. Attempt to delete it? (Y/N) ");
string response = Console.ReadLine();
if (response.ToUpper() == "Y")
{
await theQueue.DeleteIfExistsAsync();
return "The queue was deleted.";
}
else
{
return "The queue was not deleted.";
}
}
}
else
{
return "The queue does not exist. Add a message to the command line to create the queue and store the message.";
}
}
}
}