你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

教程:在 .NET 中使用 Azure 队列存储队列

Azure 队列存储实现基于云的队列以在分布式应用程序的组件之间实现通信。 每个队列维护一个可由发送方组件添加的、由接收方组件处理的消息列表。 使用队列时,应用程序可根据需求立即缩放。 本文介绍有关使用 Azure 队列存储队列的基本步骤。

本教程介绍如何执行下列操作:

  • 创建 Azure 存储帐户
  • 创建应用程序
  • 添加 Azure 客户端库
  • 支持异步代码的支持
  • 创建队列
  • 将消息插入队列
  • 取消消息的排队
  • 删除空队列
  • 检查命令行参数
  • 生成并运行应用

先决条件

创建 Azure 存储帐户

首先创建 Azure 存储帐户。 有关创建存储帐户的分步指南,请参阅创建存储帐户。 这是在先决条件中创建免费的 Azure 帐户后执行的单独的步骤。

创建应用

创建名为 QueueApp 的 .NET Core 应用程序。 为方便起见,此应用将通过队列发送和接收消息。

  1. 在控制台窗口(例如 CMD、PowerShell 或 Azure CLI)中,使用 dotnet new 命令创建名为 QueueApp 的新控制台应用。 此命令将创建简单的“hello world”C# 项目,其中包含一个名为 Program.cs 的源文件。

    dotnet new console -n QueueApp
    
  2. 切换到新建的 QueueApp 文件夹并生成应用,以验证一切是否正常。

    cd QueueApp
    
    dotnet build
    

    应看到结果类似于以下输出:

    C:\Tutorials>dotnet new console -n QueueApp
    The template "Console Application" was created successfully.
    
    Processing post-creation actions...
    Running 'dotnet restore' on QueueApp\QueueApp.csproj...
      Restore completed in 155.63 ms for C:\Tutorials\QueueApp\QueueApp.csproj.
    
    Restore succeeded.
    
    C:\Tutorials>cd QueueApp
    
    C:\Tutorials\QueueApp>dotnet build
    Microsoft (R) Build Engine version 16.0.450+ga8dc7f1d34 for .NET Core
    Copyright (C) Microsoft Corporation. All rights reserved.
    
      Restore completed in 40.87 ms for C:\Tutorials\QueueApp\QueueApp.csproj.
      QueueApp -> C:\Tutorials\QueueApp\bin\Debug\netcoreapp3.1\QueueApp.dll
    
    Build succeeded.
        0 Warning(s)
        0 Error(s)
    
    Time Elapsed 00:00:02.40
    
    C:\Tutorials\QueueApp>_
    

添加 Azure 客户端库

  1. 使用 dotnet add package 命令将 Azure 存储客户端库添加到项目。

    在控制台窗口中从项目文件夹运行以下命令。

    dotnet add package Azure.Storage.Queues
    

添加 using 语句

  1. 在项目目录中的命令行下,键入 code . 以在当前目录中打开 Visual Studio Code。 请将命令行窗口保持打开状态。 稍后需要运行更多的命令。 如果系统提示是否要添加用于生成和调试的 C# 资产,请单击“是”按钮。

  2. 打开 Program.cs 源文件,紧接在 using System; 语句的后面添加以下命名空间。 此应用将使用这些命名空间中的类型来连接 Azure 存储和使用队列。

    using System.Threading.Tasks;
    using Azure.Storage.Queues;
    using Azure.Storage.Queues.Models;
    
  3. 保存 Program.cs 文件。

支持异步代码的支持

由于该应用使用云资源,因此代码将以异步方式运行。

  1. 更新 Main 方法以异步运行。 将 void 替换为async Task返回值。

    static async Task Main(string[] args)
    
  2. 保存 Program.cs 文件。

创建队列

在对 Azure API 进行任何调用之前,必须从 Azure 门户获取凭据。

从 Azure 门户复制凭据

当示例应用程序向 Azure 存储发出请求时,必须对其进行授权。 若要对请求进行授权,请将存储帐户凭据以连接字符串形式添加到应用程序中。 若要查看存储帐户凭据,请按以下步骤操作:

  1. 登录 Azure 门户

  2. 找到自己的存储帐户。

  3. 在存储帐户菜单窗格中的“安全性 + 网络”下,选择“访问密钥” 。 在这里,可以查看帐户访问密钥以及每个密钥的完整连接字符串。

    屏幕截图显示了访问密钥设置在 Azure 门户中的位置

  4. 在“访问密钥”窗格中,选择“显示密钥” 。

  5. 在“key1”部分,找到“连接字符串”值 。 选择“复制到剪贴板”图标来复制该连接字符串。 在下一部分,你要将此连接字符串值添加到某个环境变量。

    显示如何从 Azure 门户复制连接字符串的屏幕截图

配置存储连接字符串

在复制连接字符串后,请将其写入到运行该应用程序的本地计算机上的新环境变量。 若要设置环境变量,请打开控制台窗口,并遵照适用于操作系统的说明。 将 <yourconnectionstring> 替换为实际的连接字符串。

setx AZURE_STORAGE_CONNECTION_STRING "<yourconnectionstring>"

在 Windows 中添加环境变量后,必须启动命令窗口的新实例。

重新启动程序

添加环境变量后,重启需要读取环境变量的任何正在运行的程序。 例如,先重启开发环境或编辑器,然后再继续操作。

将连接字符串添加到应用

将连接字符串添加到应用中,使应用可以访问存储帐户。

  1. 切回到 Visual Studio Code。

  2. Main 方法中,将 Console.WriteLine("Hello, World"); 代码替换为以下行,该行从环境变量中获取连接字符串。

    string connectionString = Environment.GetEnvironmentVariable("AZURE_STORAGE_CONNECTION_STRING");
    
  3. 将以下代码添加到 Main 以创建 queue 对象,稍后要将它传入到 send 和 receive 方法。

    QueueClient queue = new QueueClient(connectionString, "mystoragequeue");
    
  4. 保存文件。

将消息插入队列

创建一个新方法用于将消息发送到队列。

  1. Program 类添加以下 InsertMessageAsync 方法。

    向此方法传递一个队列引用。 通过调用 CreateIfNotExistsAsync 创建新队列(如果尚不存在)。 然后,通过调用 SendMessageAsyncnewMessage 添加到队列中。

    static async Task InsertMessageAsync(QueueClient theQueue, string newMessage)
    {
        if (null != await theQueue.CreateIfNotExistsAsync())
        {
            Console.WriteLine("The queue was created.");
        }
    
        await theQueue.SendMessageAsync(newMessage);
    }
    
  2. 可选: 默认情况下,消息的最大生存时间默认设置为 7 天。 可以为消息生存时间指定任何正数。 下面的代码片段添加一个永不过期的消息。

    若要添加未过期的消息,请在对 SendMessageAsync 的调用中使用 Timespan.FromSeconds(-1)

    await theQueue.SendMessageAsync(newMessage, default, TimeSpan.FromSeconds(-1), default);
    
  3. 保存文件。

队列消息必须采用与使用 UTF-8 编码的 XML 请求兼容的格式。 消息的大小最大可为 64 KB。 如果消息包含二进制数据,则对消息进行 Base64 编码

取消消息的排队

创建一个新方法,用于从队列检索消息。 成功收到消息后,必须从队列中删除该消息,以免再次处理该消息。

  1. Program 类添加一个名为 RetrieveNextMessageAsync 的新方法。

    此方法通过调用 ReceiveMessagesAsync 接收来自队列的消息,在第一个参数中传递 1,仅检索队列中的下一条消息。 收到消息后,通过调用 DeleteMessageAsync 从队列中删除该消息。

    当你使用 v12 以前的 SDK 版本将消息发送到队列时,系统会自动对其进行 Base64 编码。 从 v12 开始,该功能已删除。 当你使用 v12 SDK 检索消息时,系统不会自动对其进行 Base64 解码。 你必须自行对内容进行显式 Base64 解码

    static async Task<string> RetrieveNextMessageAsync(QueueClient theQueue)
    {
        if (await theQueue.ExistsAsync())
        {
            QueueProperties properties = await theQueue.GetPropertiesAsync();
    
            if (properties.ApproximateMessagesCount > 0)
            {
                QueueMessage[] retrievedMessage = await theQueue.ReceiveMessagesAsync(1);
                string theMessage = retrievedMessage[0].Body.ToString();
                await theQueue.DeleteMessageAsync(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt);
                return theMessage;
            }
    
            return null;
        }
    
        return null;
    }
    
  2. 保存文件。

删除空队列

在项目结束时,最好是确定是否仍然需要所创建的资源。 持续运行资源可能会产生费用。 如果存在空队列,请询问用户是否要删除该队列。

  1. 扩展 RetrieveNextMessageAsync 方法以包含有关是否删除空队列的提示。

    static async Task<string> RetrieveNextMessageAsync(QueueClient theQueue)
    {
        if (await theQueue.ExistsAsync())
        {
            QueueProperties properties = await theQueue.GetPropertiesAsync();
    
            if (properties.ApproximateMessagesCount > 0)
            {
                QueueMessage[] retrievedMessage = await theQueue.ReceiveMessagesAsync(1);
                string theMessage = retrievedMessage[0].Body.ToString();
                await theQueue.DeleteMessageAsync(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt);
                return theMessage;
            }
            else
            {
                Console.Write("The queue is empty. Attempt to delete it? (Y/N) ");
                string response = Console.ReadLine();
    
                if (response.ToUpper() == "Y")
                {
                    await theQueue.DeleteIfExistsAsync();
                    return "The queue was deleted.";
                }
                else
                {
                    return "The queue was not deleted.";
                }
            }
        }
        else
        {
            return "The queue does not exist. Add a message to the command line to create the queue and store the message.";
        }
    }
    
  2. 保存文件。

检查命令行参数

如果在应用中传入了任何命令行参数,系统会假设这些参数是要添加到队列的消息。 将参数联接到一起以构成一个字符串。 调用前面添加的 InsertMessageAsync 方法,将此字符串添加到消息队列。

如果没有任何命令行参数,请尝试检索操作。 调用 RetrieveNextMessageAsync 方法检索队列中的下一条消息。

最后,等待用户输入,然后调用 Console.ReadLine 退出。

  1. 扩展 Main 方法以检查命令行参数并等待用户输入。

    static async Task Main(string[] args)
    {
        string connectionString = Environment.GetEnvironmentVariable("AZURE_STORAGE_CONNECTION_STRING");
    
        QueueClient queue = new QueueClient(connectionString, "mystoragequeue");
    
        if (args.Length > 0)
        {
            string value = String.Join(" ", args);
            await InsertMessageAsync(queue, value);
            Console.WriteLine($"Sent: {value}");
        }
        else
        {
            string value = await RetrieveNextMessageAsync(queue);
            Console.WriteLine($"Received: {value}");
        }
    
        Console.Write("Press Enter...");
        Console.ReadLine();
    }
    
  2. 保存文件。

完整代码

下面是此项目的完整代码列表。

using System;
using System.Threading.Tasks;
using Azure.Storage.Queues;
using Azure.Storage.Queues.Models;

namespace QueueApp
{
    class Program
    {
        static async Task Main(string[] args)
        {
            string connectionString = Environment.GetEnvironmentVariable("AZURE_STORAGE_CONNECTION_STRING");

            QueueClient queue = new QueueClient(connectionString, "mystoragequeue");

            if (args.Length > 0)
            {
                string value = String.Join(" ", args);
                await InsertMessageAsync(queue, value);
                Console.WriteLine($"Sent: {value}");
            }
            else
            {
                string value = await RetrieveNextMessageAsync(queue);
                Console.WriteLine($"Received: {value}");
            }

            Console.Write("Press Enter...");
            Console.ReadLine();
        }

        static async Task InsertMessageAsync(QueueClient theQueue, string newMessage)
        {
            if (null != await theQueue.CreateIfNotExistsAsync())
            {
                Console.WriteLine("The queue was created.");
            }

            await theQueue.SendMessageAsync(newMessage);
        }

        static async Task<string> RetrieveNextMessageAsync(QueueClient theQueue)
        {
            if (await theQueue.ExistsAsync())
            {
                QueueProperties properties = await theQueue.GetPropertiesAsync();

                if (properties.ApproximateMessagesCount > 0)
                {
                    QueueMessage[] retrievedMessage = await theQueue.ReceiveMessagesAsync(1);
                    string theMessage = retrievedMessage[0].Body.ToString();
                    await theQueue.DeleteMessageAsync(retrievedMessage[0].MessageId, retrievedMessage[0].PopReceipt);
                    return theMessage;
                }
                else
                {
                    Console.Write("The queue is empty. Attempt to delete it? (Y/N) ");
                    string response = Console.ReadLine();

                    if (response.ToUpper() == "Y")
                    {
                        await theQueue.DeleteIfExistsAsync();
                        return "The queue was deleted.";
                    }
                    else
                    {
                        return "The queue was not deleted.";
                    }
                }
            }
            else
            {
                return "The queue does not exist. Add a message to the command line to create the queue and store the message.";
            }
        }
    }
}

生成并运行应用

  1. 从项目目录中的命令行运行以下 dotnet 命令以生成项目。

    dotnet build
    
  2. 项目成功生成后,运行以下命令将第一个消息添加到队列。

    dotnet run First queue message
    

    你应该会看到以下输出:

    C:\Tutorials\QueueApp>dotnet run First queue message
    The queue was created.
    Sent: First queue message
    Press Enter..._
    
  3. 不结合任何命令行参数运行该应用可以接收和删除队列中的第一个消息。

    dotnet run
    
  4. 继续运行应用,直到已删除所有消息。 如果多次运行该应用,将会收到一条指出队列为空的消息,以及一条有关是否要删除该队列的提示。

    C:\Tutorials\QueueApp>dotnet run First queue message
    The queue was created.
    Sent: First queue message
    Press Enter...
    
    C:\Tutorials\QueueApp>dotnet run Second queue message
    Sent: Second queue message
    Press Enter...
    
    C:\Tutorials\QueueApp>dotnet run Third queue message
    Sent: Third queue message
    Press Enter...
    
    C:\Tutorials\QueueApp>dotnet run
    Received: First queue message
    Press Enter...
    
    C:\Tutorials\QueueApp>dotnet run
    Received: Second queue message
    Press Enter...
    
    C:\Tutorials\QueueApp>dotnet run
    Received: Third queue message
    Press Enter...
    
    C:\Tutorials\QueueApp>dotnet run
    The queue is empty. Attempt to delete it? (Y/N) Y
    Received: The queue was deleted.
    Press Enter...
    
    C:\Tutorials\QueueApp>_
    

后续步骤

在本教程中,你了解了如何执行以下操作:

  1. 创建队列
  2. 在队列中添加和删除消息
  3. 删除 Azure 队列存储队列

查看 Azure 队列存储快速入门了解详细信息。

有关使用已弃用的 .NET 版本 11.x SDK 的相关代码示例,请参阅使用 .NET 版本 11.x 的代码示例