Tutorial: Publish and subscribe messages between WebSocket clients using subprotocol
In the Build a chat app tutorial, you learned how to use WebSocket APIs to send and receive data with Azure Web PubSub. No protocol is needed when the client communicates with the service: for example, you can send any type of data using WebSocket.send(), and the server receives it as is. The WebSocket API is easy to use, but its functionality is limited. For example, you can't specify the event name when sending an event to your server, and you can't publish a message to other clients instead of sending it to your server. In this tutorial, you learn how to use a subprotocol to extend the functionality of the client.
In this tutorial, you learn how to:
Create a Web PubSub service instance
Generate the full URL to establish the WebSocket connection
Publish messages between WebSocket clients using subprotocol
If you prefer to run CLI reference commands locally, install the Azure CLI. If you're running on Windows or macOS, consider running Azure CLI in a Docker container. For more information, see How to run the Azure CLI in a Docker container.
If you're using a local installation, sign in to the Azure CLI by using the az login command. To finish the authentication process, follow the steps displayed in your terminal. For other sign-in options, see Sign in with the Azure CLI.
When you're prompted, install the Azure CLI extension on first use. For more information about extensions, see Use extensions with the Azure CLI.
Run az version to find the version and dependent libraries that are installed. To upgrade to the latest version, run az upgrade.
This setup requires version 2.22.0 or higher of the Azure CLI. If using Azure Cloud Shell, the latest version is already installed.
Create an Azure Web PubSub instance
Create a resource group
A resource group is a logical container into which Azure resources are deployed and managed. Use the az group create command to create a resource group named myResourceGroup in the eastus location.
az group create --name myResourceGroup --location EastUS
Create a Web PubSub instance
Run az extension add to install or upgrade the webpubsub extension to the current version.
az extension add --upgrade --name webpubsub
Use the Azure CLI az webpubsub create command to create a Web PubSub in the resource group you've created. The following command creates a Free Web PubSub resource under resource group myResourceGroup in EastUS:
Important
Each Web PubSub resource must have a unique name. Replace <your-unique-resource-name> with the name of your Web PubSub in the following examples.
The output of this command shows properties of the newly created resource. Take note of the two properties listed below:
Resource Name: The name you provided to the --name parameter above.
hostName: In the example, the host name is <your-unique-resource-name>.webpubsub.azure.com/.
At this point, your Azure account is the only one authorized to perform any operations on this new resource.
Get the ConnectionString for future use
Important
A connection string includes the authorization information required for your application to access Azure Web PubSub service. The access key inside the connection string is similar to a root password for your service. In production environments, always be careful to protect your access keys. Use Azure Key Vault to manage and rotate your keys securely. Avoid distributing access keys to other users, hard-coding them, or saving them anywhere in plain text that is accessible to others. Rotate your keys if you believe they may have been compromised.
Use the Azure CLI az webpubsub key command to get the ConnectionString of the service. Replace the <your-unique-resource-name> placeholder with the name of your Azure Web PubSub instance.
az webpubsub key show --resource-group myResourceGroup --name <your-unique-resource-name> --query primaryConnectionString --output tsv
Copy the connection string; you use it later in this tutorial as the value of <connection-string>.
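One way to follow the advice above and keep the key out of source code is to read the connection string from an environment variable at startup. The sketch below is only an illustration: the helper name load_connection_string is hypothetical, and the variable name WebPubSubConnectionString is borrowed from the export command used later in this tutorial.

```python
import os


def load_connection_string(env=os.environ, name="WebPubSubConnectionString"):
    """Return the connection string stored in an environment variable.

    Both the helper and the default variable name are illustrative
    assumptions, not part of any Azure SDK.
    """
    value = env.get(name, "").strip()
    if not value:
        # Fail early with a clear message instead of passing an empty key on
        raise RuntimeError(f"Set the {name} environment variable first")
    return value
```

The returned value could then be passed to WebPubSubServiceClient.from_connection_string in place of a command-line argument.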
The client can start a WebSocket connection using a specific subprotocol. The Azure Web PubSub service supports a subprotocol called json.webpubsub.azure.v1 that lets clients publish and subscribe directly through the Web PubSub service instead of making a round trip to the upstream server. Check Azure Web PubSub supported JSON WebSocket subprotocol for details about the subprotocol.
If you use other protocol names, the service ignores them and passes them through to the server in the connect event handler, so you can build your own protocols.
Now let's create a web application using the json.webpubsub.azure.v1 subprotocol.
Create a server.py to host the /negotiate API and web page.
import json
import sys
from http.server import HTTPServer, SimpleHTTPRequestHandler

from azure.messaging.webpubsubservice import WebPubSubServiceClient


class Request(SimpleHTTPRequestHandler):
    def do_GET(self):
        if self.path == '/':
            # Serve the web page
            self.path = 'public/index.html'
            return SimpleHTTPRequestHandler.do_GET(self)
        elif self.path == '/negotiate':
            # Allow the client to publish to and join the "stream" group
            roles = ['webpubsub.sendToGroup.stream',
                     'webpubsub.joinLeaveGroup.stream']
            token = service.get_client_access_token(roles=roles)
            self.send_response(200)
            self.send_header('Content-Type', 'application/json')
            self.end_headers()
            self.wfile.write(json.dumps({
                'url': token['url']
            }).encode())
        else:
            self.send_error(404)


if __name__ == '__main__':
    # Check the arguments before reading the connection string
    if len(sys.argv) != 2:
        print('Usage: python server.py <connection-string>')
        sys.exit(1)

    service = WebPubSubServiceClient.from_connection_string(sys.argv[1], hub='stream')
    server = HTTPServer(('localhost', 8080), Request)
    print('server started')
    server.serve_forever()
Let's navigate to the /src/main/java/com/webpubsub/tutorial directory, open the App.java file in your editor, and use Javalin.create to serve static files:
package com.webpubsub.tutorial;

import com.azure.messaging.webpubsub.WebPubSubServiceClient;
import com.azure.messaging.webpubsub.WebPubSubServiceClientBuilder;
import com.azure.messaging.webpubsub.models.GetClientAccessTokenOptions;
import com.azure.messaging.webpubsub.models.WebPubSubClientAccessToken;

import io.javalin.Javalin;

public class App {
    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.println("Expecting 1 argument: <connection-string>");
            return;
        }

        // create the service client
        WebPubSubServiceClient service = new WebPubSubServiceClientBuilder()
                .connectionString(args[0])
                .hub("chat")
                .buildClient();

        // start a server
        Javalin app = Javalin.create(config -> {
            config.addStaticFiles("public");
        }).start(8080);

        // Handle the negotiate request and return the token to the client
        app.get("/negotiate", ctx -> {
            GetClientAccessTokenOptions option = new GetClientAccessTokenOptions();
            option.addRole("webpubsub.sendToGroup.stream");
            option.addRole("webpubsub.joinLeaveGroup.stream");
            WebPubSubClientAccessToken token = service.getClientAccessToken(option);

            // return JSON string
            ctx.result("{\"url\":\"" + token.getUrl() + "\"}");
            return;
        });
    }
}
Depending on your setup, you might need to explicitly set the language level to Java 8 in the pom.xml. Add the following snippet:
Create an HTML page with the content below and save it where your server serves static files from: wwwroot/index.html for the C# server, public/index.html for the JavaScript and Python servers, or /src/main/resources/public/index.html for the Java server:
<html>
  <body>
    <div id="output"></div>
    <script>
      (async function () {
        let res = await fetch('/negotiate');
        let data = await res.json();
        let ws = new WebSocket(data.url, 'json.webpubsub.azure.v1');
        ws.onopen = () => {
          console.log('connected');
        };

        let output = document.querySelector('#output');
        ws.onmessage = event => {
          let d = document.createElement('p');
          d.innerText = event.data;
          output.appendChild(d);
        };
      })();
    </script>
  </body>
</html>
The code above connects to the service and prints any message received to the page. The main change is that we specify the subprotocol when creating the WebSocket connection.
We use the Secret Manager tool for .NET Core to set the connection string. Run the command below, replacing <connection-string> with the one fetched in the previous step, and open http://localhost:5000/index.html in a browser:
dotnet user-secrets init
dotnet user-secrets set Azure:WebPubSub:ConnectionString "<connection-string>"
dotnet run
Now run the below command, replacing <connection-string> with the ConnectionString fetched in previous step, and open http://localhost:8080 in browser:
export WebPubSubConnectionString="<connection-string>"
node server
Now run the below command, replacing <connection-string> with the ConnectionString fetched in previous step, and open http://localhost:8080 in browser:
python server.py "<connection-string>"
Now run the below command, replacing <connection-string> with the ConnectionString fetched in previous step, and open http://localhost:8080 in browser:
If you're using Chrome, press F12 or right-click -> Inspect -> Developer Tools, and select the Network tab. Load the web page, and you can see that the WebSocket connection is established. Select the WebSocket connection to inspect it: the client receives a connected event message, which includes the connectionId generated for this client.
With the help of the subprotocol, you get some metadata about the connection as soon as it's connected.
The client now receives a JSON message instead of plain text. The JSON message contains more information, such as the type and source of the message. You can use this information for further processing (for example, displaying the message in a different style if it's from a different source), as later sections show.
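To make the idea of processing messages by type and source concrete, here is a minimal Python sketch that summarizes one incoming frame. The function name describe is hypothetical, and the field names it reads (type, event, connectionId, from, group, data, ackId, success) are assumptions based on the JSON subprotocol reference rather than on output captured in this tutorial, so check them against the frames you see in the Network tab.

```python
import json


def describe(raw: str) -> str:
    """Summarize one JSON frame received over json.webpubsub.azure.v1.

    Field names are assumed from the subprotocol reference.
    """
    msg = json.loads(raw)
    kind = msg.get("type")
    if kind == "system" and msg.get("event") == "connected":
        # Sent once by the service after the connection is established
        return f"connected as {msg.get('connectionId')}"
    if kind == "message" and msg.get("from") == "group":
        # Data published to a group this connection has joined
        return f"[{msg.get('group')}] {msg.get('data')}"
    if kind == "ack":
        # Reply to a request that carried an ackId
        status = "ok" if msg.get("success") else "failed"
        return f"ack {msg.get('ackId')}: {status}"
    return f"unhandled: {kind}"
```

A client could call describe on every frame it receives and branch on the result, for example to render system events differently from group messages.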
Publish messages from client
In the Build a chat app tutorial, when a client sends a message through the WebSocket connection to the Web PubSub service, the service triggers a user event on your server side. With the subprotocol, the client can do more by sending JSON messages. For example, it can publish messages directly through the Web PubSub service to other clients.
This is useful if you want to stream a large amount of data to other clients in real time. Let's use this feature to build a log streaming application, which can stream console logs to browser in real time.
using System;
using System.Net.Http;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

namespace stream
{
    class Program
    {
        private static readonly HttpClient http = new HttpClient();

        static async Task Main(string[] args)
        {
            // Get client url from remote
            var stream = await http.GetStreamAsync("http://localhost:5000/negotiate");
            var url = (await JsonSerializer.DeserializeAsync<ClientToken>(stream)).url;
            var client = new ClientWebSocket();
            client.Options.AddSubProtocol("json.webpubsub.azure.v1");
            await client.ConnectAsync(new Uri(url), default);
            Console.WriteLine("Connected.");

            var streaming = Console.ReadLine();
            while (streaming != null)
            {
                if (!string.IsNullOrEmpty(streaming))
                {
                    var message = JsonSerializer.Serialize(new
                    {
                        type = "sendToGroup",
                        group = "stream",
                        data = streaming + Environment.NewLine,
                    });
                    Console.WriteLine("Sending " + message);
                    await client.SendAsync(Encoding.UTF8.GetBytes(message), WebSocketMessageType.Text, true, default);
                }
                streaming = Console.ReadLine();
            }

            await client.CloseAsync(WebSocketCloseStatus.NormalClosure, null, default);
        }

        private sealed class ClientToken
        {
            public string url { get; set; }
        }
    }
}
Create a stream.js with the following content.
const WebSocket = require('ws');
const fetch = (...args) => import('node-fetch').then(({default: fetch}) => fetch(...args));

async function main() {
  let res = await fetch(`http://localhost:8080/negotiate`);
  let data = await res.json();
  let ws = new WebSocket(data.url, 'json.webpubsub.azure.v1');
  let ackId = 0;
  ws.on('open', () => {
    process.stdin.on('data', data => {
      ws.send(JSON.stringify({
        type: 'sendToGroup',
        group: 'stream',
        ackId: ++ackId,
        dataType: 'text',
        data: data.toString()
      }));
    });
  });
  ws.on('message', data => console.log("Received: %s", data));
  process.stdin.on('close', () => ws.close());
}

main();
The code above creates a WebSocket connection to the service and, whenever it reads data from standard input, uses ws.send() to publish it. To publish to other clients, set type to sendToGroup and specify a group name in the message.
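The shape of such a publish request can be sketched as a small Python helper. The name send_to_group_frame is hypothetical; the fields mirror the ones used by the streaming clients in this tutorial (type, group, dataType, data, ackId).

```python
import json


def send_to_group_frame(group: str, text: str, ack_id: int) -> str:
    """Build the JSON text frame that publishes `text` to `group`.

    The helper name is illustrative; the field names follow the
    streaming clients shown in this tutorial.
    """
    return json.dumps({
        "type": "sendToGroup",   # publish to a group instead of raising a server event
        "group": group,          # target group name
        "dataType": "text",      # the payload is plain text
        "data": text,
        "ackId": ack_id,         # lets the sender match the service's reply
    })
```

For example, send_to_group_frame("stream", "hello\n", 1) produces a frame that a connected client could pass to its WebSocket send call, provided its token grants permission to send to that group.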
Open another bash window for the stream program, and install the websockets and requests dependencies:
mkdir stream
cd stream
# Create venv
python -m venv env
# Activate venv
source ./env/bin/activate
pip install websockets requests
Create a stream.py with the following content.
import asyncio
import json

import requests
import websockets


async def connect(url):
    async with websockets.connect(url, subprotocols=['json.webpubsub.azure.v1']) as ws:
        print('connected')
        ack_id = 1
        while True:
            # input() blocks the event loop, which is acceptable for this single-task demo
            data = input()
            payload = {
                'type': 'sendToGroup',
                'group': 'stream',
                'dataType': 'text',
                'data': data + '\n',
                'ackId': ack_id
            }
            ack_id += 1
            await ws.send(json.dumps(payload))
            # Read one pending message from the service (the connected event first, then acks)
            await ws.recv()


if __name__ == '__main__':
    res = requests.get('http://localhost:8080/negotiate').json()
    try:
        asyncio.run(connect(res['url']))
    except (KeyboardInterrupt, EOFError):
        # Stop on Ctrl+C or when standard input is closed
        pass
Let's use another terminal and go back to the root folder to create a streaming console app logstream-streaming and switch into the logstream-streaming folder:
Now let's use WebSocket to connect to the service. Let's navigate to the /src/main/java/com/webpubsub/quickstart directory, open the App.java file in your editor, and replace code with the below:
package com.webpubsub.quickstart;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;

import com.google.gson.Gson;

public class App
{
    public static void main( String[] args ) throws IOException, InterruptedException
    {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/negotiate"))
                .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

        Gson gson = new Gson();

        String url = gson.fromJson(response.body(), Entity.class).url;

        WebSocket ws = HttpClient.newHttpClient().newWebSocketBuilder().subprotocols("json.webpubsub.azure.v1")
                .buildAsync(URI.create(url), new WebSocketClient()).join();
        int id = 0;
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        String streaming = reader.readLine();
        App app = new App();
        while (streaming != null && !streaming.isEmpty()){
            String frame = gson.toJson(app.new GroupMessage(streaming + "\n", ++id));
            System.out.println("Sending: " + frame);
            ws.sendText(frame, true);
            streaming = reader.readLine();
        }
    }

    private class GroupMessage{
        public String data;
        public int ackId;
        public final String type = "sendToGroup";
        public final String group = "stream";

        GroupMessage(String data, int ackId){
            this.data = data;
            this.ackId = ackId;
        }
    }

    private static final class WebSocketClient implements WebSocket.Listener {
        private WebSocketClient() {
        }

        @Override
        public void onOpen(WebSocket webSocket) {
            System.out.println("onOpen using subprotocol " + webSocket.getSubprotocol());
            WebSocket.Listener.super.onOpen(webSocket);
        }

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            System.out.println("onText received " + data);
            return WebSocket.Listener.super.onText(webSocket, data, last);
        }

        @Override
        public void onError(WebSocket webSocket, Throwable error) {
            System.out.println("Bad day! " + webSocket.toString());
            WebSocket.Listener.super.onError(webSocket, error);
        }
    }

    private static final class Entity {
        public String url;
    }
}
Navigate to the directory containing the pom.xml file and run the project using the below command
There's a new concept here: "group". A group is a logical concept in a hub that lets you publish a message to a set of connections. A hub can have multiple groups, and one client can subscribe to multiple groups at the same time. When using the subprotocol, you can only publish to a group; you can't broadcast to the whole hub. For details about the terms, check the basic concepts.
Because we use a group here, we also need to update the web page index.html so that it joins the group inside the ws.onopen callback, once the WebSocket connection is established.
The client joins the group by sending a message of type joinGroup.
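As an illustration of what a join request looks like on the wire, the Python sketch below builds one. It is written in Python to match stream.py; in index.html the same JSON object would be serialized with JSON.stringify and passed to ws.send() inside ws.onopen. The helper name join_group_frame is hypothetical, and treating ackId as optional is an assumption based on the subprotocol reference.

```python
import json
from typing import Optional


def join_group_frame(group: str, ack_id: Optional[int] = None) -> str:
    """Build the JSON text frame that asks the service to add this
    connection to `group`. The helper name is illustrative."""
    frame = {"type": "joinGroup", "group": group}
    if ack_id is not None:
        # Assumption: ackId is optional and only needed if the client
        # wants a reply confirming the join
        frame["ackId"] = ack_id
    return json.dumps(frame)
```

Calling join_group_frame("stream") yields the frame a client would send to subscribe to the stream group used by the streaming programs.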
Also update the ws.onmessage callback to parse the JSON response and print only the messages from the stream group, so that the page acts as a live stream printer.
ws.onmessage = event => {
  let message = JSON.parse(event.data);
  if (message.type === 'message' && message.group === 'stream') {
    let d = document.createElement('span');
    d.innerText = message.data;
    output.appendChild(d);
    window.scrollTo(0, document.body.scrollHeight);
  }
};
For security reasons, a client can't publish or subscribe to a group by itself by default. That's why the /negotiate handlers shown earlier grant the webpubsub.sendToGroup.stream and webpubsub.joinLeaveGroup.stream roles to the client when generating the token.
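The role names follow a simple pattern: a permission name followed by the group it applies to. A minimal sketch of building the two roles used in this tutorial for an arbitrary group is shown below; the helper name group_roles is hypothetical.

```python
from typing import List


def group_roles(group: str) -> List[str]:
    """Return the roles that let a client publish to and join or leave
    `group`. The helper name is illustrative; the role strings match
    the ones used by the /negotiate handlers in this tutorial."""
    return [
        f"webpubsub.sendToGroup.{group}",     # permission to publish to the group
        f"webpubsub.joinLeaveGroup.{group}",  # permission to join or leave the group
    ]
```

In server.py, the result of group_roles("stream") could be passed as the roles argument of service.get_client_access_token, which is equivalent to the hard-coded list used there.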
The complete code sample of this tutorial can be found here.
Next steps
This tutorial provides you with a basic idea of how to connect to the Web PubSub service and how to publish messages to the connected clients using subprotocol.
Check other tutorials to further dive into how to use the service.