你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

教程:使用子协议在 WebSocket 客户端之间发布和订阅消息

构建聊天应用教程中,你已了解如何使用 WebSocket API 通过 Azure Web PubSub 发送和接收数据。 客户端与服务进行通信时不需要协议。 例如,你可以使用 WebSocket.send() 发送任何数据,服务器将按原样接收数据。 这很容易使用,但功能也很有限。 例如,你不能在向服务器发送事件时指定事件名称,也不能将消息发布到其他客户端,而只能将其发送到服务器。 在本教程中,你将了解如何使用子协议来扩展客户端的功能。

在本教程中,你将了解:

  • 创建 Web PubSub 服务实例
  • 生成完整 URL 以建立 WebSocket 连接
  • 使用子协议在 WebSocket 客户端之间发布消息

如果没有 Azure 订阅,请在开始之前创建一个 Azure 免费帐户

先决条件

  • 本设置需要 Azure CLI 版本 2.22.0 或更高版本。 如果使用 Azure Cloud Shell,则最新版本已安装。

创建 Azure Web PubSub 实例

创建资源组

资源组是在其中部署和管理 Azure 资源的逻辑容器。 使用 az group create 命令在 eastus 位置创建一个名为 myResourceGroup 的资源组 。

az group create --name "myResourceGroup" -l "EastUS"

创建 Web PubSub 实例

运行 az extension add,安装 webpubsub 扩展或将其升级到当前版本。

az extension add --upgrade --name webpubsub

使用 Azure CLI az webpubsub create 命令在已创建的资源组中创建 Web PubSub。 以下命令在 EastUS 的资源组 myResourceGroup 下创建一个免费的 Web PubSub 资源:

重要

每个 Web PubSub 资源必须具有唯一名称。 在以下示例中,将 <your-unique-resource-name> 替换为 Web PubSub 的名称。

az webpubsub create --name "<your-unique-resource-name>" --resource-group "myResourceGroup" --location "EastUS" --sku Free_F1

此命令的输出会显示新建的资源的属性。 请记下下面列出的两个属性:

  • 资源名称:为上面的 --name 参数提供的名称。
  • 主机名:在本例中,主机名为 <your-unique-resource-name>.webpubsub.azure.com/

目前,只有你的 Azure 帐户才有权对这个新资源执行任何操作。

获取 ConnectionString 以供将来使用

重要

连接字符串包括应用程序访问 Azure Web PubSub 服务所需的授权信息。 连接字符串中的访问密钥类似于服务的根密码。 请始终小心保护访问密钥。 使用 Azure 密钥保管库安全地管理和轮换密钥。 避免将访问密钥分发给其他用户、对其进行硬编码或将其以纯文本形式保存在其他人可以访问的任何位置。 如果你认为访问密钥可能已泄露,请轮换密钥。

使用 Azure CLI az webpubsub key 命令获取服务的 ConnectionString。

az webpubsub key show --name "<your-unique-resource-name>" --resource-group "myResourceGroup" --query primaryConnectionString

复制已提取的 ConnectionString,本教程稍后会将其用作 <connection_string> 的值。

设置项目

先决条件

使用子协议

客户端可以使用特定的子协议启动 WebSocket 连接。 Azure Web PubSub 服务支持名为 json.webpubsub.azure.v1 的子协议,使客户端能够通过 Web PubSub 服务直接进行发布/订阅,而不是往返于上游服务器。 查看 Azure Web PubSub 支持的 JSON WebSocket 子协议,了解该子协议的详细信息。

如果使用其他协议名称,这些名称将被服务忽略,并在连接事件处理程序中传递到服务器,因此你可以构建自己的协议。

使用 json.webpubsub.azure.v1 子协议创建 Web 应用程序。

  1. 安装依赖项

    mkdir logstream
    cd logstream
    dotnet new web
    dotnet add package Microsoft.Extensions.Azure
    dotnet add package Azure.Messaging.WebPubSub
    
  2. 创建服务器端来托管 /negotiate API 和 Web 页面。

    使用以下代码更新 Startup.cs

    • 更新 ConfigureServices 方法以添加服务客户端,并从配置中读取连接字符串。
    • 更新 Configure 方法,在 app.UseRouting(); 之前添加 app.UseStaticFiles();,以支持静态文件。
    • 并使用 /negotiate 请求更新 app.UseEndpoints,以生成客户端访问令牌。
    using Azure.Messaging.WebPubSub;
    
    using Microsoft.AspNetCore.Builder;
    using Microsoft.AspNetCore.Hosting;
    using Microsoft.AspNetCore.Http;
    using Microsoft.Extensions.Azure;
    using Microsoft.Extensions.Configuration;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    
    namespace logstream
    {
        public class Startup
        {
            public Startup(IConfiguration configuration)
            {
                Configuration = configuration;
            }
    
            public IConfiguration Configuration { get; }
    
            public void ConfigureServices(IServiceCollection services)
            {
                services.AddAzureClients(builder =>
                {
                    builder.AddWebPubSubServiceClient(Configuration["Azure:WebPubSub:ConnectionString"], "stream");
                });
            }
    
            public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
            {
                if (env.IsDevelopment())
                {
                    app.UseDeveloperExceptionPage();
                }
    
                app.UseStaticFiles();
    
                app.UseRouting();
    
                app.UseEndpoints(endpoints =>
                {
                    endpoints.MapGet("/negotiate", async context =>
                    {
                        var service = context.RequestServices.GetRequiredService<WebPubSubServiceClient>();
                        var response = new
                        {
                            url = service.GetClientAccessUri(roles: new string[] { "webpubsub.sendToGroup.stream", "webpubsub.joinLeaveGroup.stream" }).AbsoluteUri
                        };
                        await context.Response.WriteAsJsonAsync(response);
                    });
                });
            }
        }
    }
    
    
  3. 创建网页

    使用以下内容创建一个 HTML 页面,并将其另存为 wwwroot/index.html

    <html>
    
    <body>
      <div id="output"></div>
      <script>
        (async function () {
          let res = await fetch('/negotiate')
          let data = await res.json();
          let ws = new WebSocket(data.url, 'json.webpubsub.azure.v1');
          ws.onopen = () => {
            console.log('connected');
          };
    
          let output = document.querySelector('#output');
          ws.onmessage = event => {
            let d = document.createElement('p');
            d.innerText = event.data;
            output.appendChild(d);
          };
        })();
      </script>
    </body>
    
    </html>
    

    它只连接到服务,并将收到的任何消息打印到页面上。 主要的变化是在创建 WebSocket 连接时指定了子协议。

  4. 运行服务器

    使用适用于 .NET Core 的机密管理器工具来设置连接字符串。 运行以下命令,将 <connection_string> 替换为在上一步中提取的连接字符串,并在浏览器中打开 http://localhost:5000/index.html :

    dotnet user-secrets init
    dotnet user-secrets set Azure:WebPubSub:ConnectionString "<connection-string>"
    dotnet run
    

    如果你使用的是 Chrome,可以按“F12”或单击右键 ->“检查”->“开发人员工具”,并选择“网络”选项卡。加载网页,你可以看到 WebSocket 连接已经建立。 单击以检查 WebSocket 连接,可以看到客户端中收到以下 connected 事件消息。 可以发现,你能够获得为该客户端生成的 connectionId

    {"type":"system","event":"connected","userId":null,"connectionId":"<the_connection_id>"}
    

可以发现,借助子协议,可在连接为 connected 时获取连接的某些元数据。

另请注意,客户端现在接收到的不是纯文本,而是包含更多信息的 JSON 消息(例如消息类型和来源)。 因此,你可以使用此信息对消息进行更多的处理(例如,如果消息来自不同的来源,则以不同的样式显示消息),可以在后面的部分中了解这些信息。

从客户端发布消息

构建聊天应用教程中,当客户端通过 WebSocket 连接将消息发送到 Web PubSub 服务时,该服务将在服务器端触发用户事件。 使用子协议,客户端将通过发送 JSON 消息获得更多的功能。 例如,可以通过 Web PubSub 服务将消息从一个客户端直接发布到其他客户端。

如果要将大量数据实时流式传输到其他客户端,这很有用。 使用此功能来构建日志流式处理应用程序,该应用程序可实时将控制台日志流式传输到浏览器。

  1. 创建流式处理程序

    创建 stream 程序:

    mkdir stream
    cd stream
    dotnet new console
    

    使用以下内容更新 Program.cs

    using System;
    using System.Net.Http;
    using System.Net.WebSockets;
    using System.Text;
    using System.Text.Json;
    using System.Threading.Tasks;
    
    namespace stream
    {
        class Program
        {
            private static readonly HttpClient http = new HttpClient();
            static async Task Main(string[] args)
            {
                // Get client url from remote
                var stream = await http.GetStreamAsync("http://localhost:5000/negotiate");
                var url = (await JsonSerializer.DeserializeAsync<ClientToken>(stream)).url;
                var client = new ClientWebSocket();
                client.Options.AddSubProtocol("json.webpubsub.azure.v1");
    
                await client.ConnectAsync(new Uri(url), default);
    
                Console.WriteLine("Connected.");
                var streaming = Console.ReadLine();
                while (streaming != null)
                {
                    if (!string.IsNullOrEmpty(streaming))
                    {
                        var message = JsonSerializer.Serialize(new
                        {
                            type = "sendToGroup",
                            group = "stream",
                            data = streaming + Environment.NewLine,
                        });
                        Console.WriteLine("Sending " + message);
                        await client.SendAsync(Encoding.UTF8.GetBytes(message), WebSocketMessageType.Text, true, default);
                    }
    
                    streaming = Console.ReadLine();
                }
    
                await client.CloseAsync(WebSocketCloseStatus.NormalClosure, null, default);
            }
    
            private sealed class ClientToken
            {
                public string url { get; set; }
            }
        }
    }
    
    

    可以看到此处有一个新的概念“组”。 组是中心中的逻辑概念,你可以在中心内将消息发布到一组连接。 在中心中,可以有多个组,一个客户端可以同时订阅多个组。 使用子协议时,只能发布到组,而不是广播到整个中心。 有关术语的详细信息,请查看基本概念

  2. 由于我们在此处使用组,因此在 ws.onopen 回调内建立 WebSocket 连接时,还需要更新 index.html 网页以加入组。

    let ackId = 0;
    ws.onopen = () => {
      console.log('connected');
      ws.send(JSON.stringify({
        type: 'joinGroup',
        group: 'stream',
        ackId: ++ackId
      }));
    };
    

    可以发现,客户端通过发送 joinGroup 类型的消息来加入该组。

  3. 此外,稍微更新 ws.onmessage 回调逻辑,以分析 JSON 响应并只打印来自 stream 组中的消息,使其充当实时流打印机。

    ws.onmessage = event => {
      let message = JSON.parse(event.data);
      if (message.type === 'message' && message.group === 'stream') {
        let d = document.createElement('span');
        d.innerText = message.data;
        output.appendChild(d);
        window.scrollTo(0, document.body.scrollHeight);
      }
    };
    
  4. 出于安全考虑,默认情况下,客户端不能自行发布或订阅组。 因此,你可能已注意到,我们在生成令牌时为客户端设置了 roles

    Startup.cs 中执行 GenerateClientAccessUri 时,按如下所示设置 roles

    service.GenerateClientAccessUri(roles: new string[] { "webpubsub.sendToGroup.stream", "webpubsub.joinLeaveGroup.stream" })
    
  5. 最后,还要对 index.html 应用一些样式,使它得到更好的显示。

    <html>
    
      <head>
        <style>
          #output {
            white-space: pre;
            font-family: monospace;
          }
        </style>
      </head>
    

现在,运行以下代码,键入任意文本,这些文本将在浏览器中实时显示:

ls -R | dotnet run

# Or call `dir /s /b | dotnet run` when you are using CMD under Windows

或者,你可以让它的速度放慢,以便可以看到数据实时流式传输到浏览器:

for i in $(ls -R); do echo $i; sleep 0.1; done | dotnet run

可以在此处找到本教程的完整代码示例。

后续步骤

本教程提供如何连接到 Web PubSub 服务以及如何使用子协议将消息发布到连接的客户端的基本概念。

查看其他教程,进一步深入了解如何使用该服务。