Broadcast channels in Orleans

Broadcast channels are a special type of broadcasting mechanism that can be used to send messages to all subscribers. Unlike streaming providers, broadcast channels are not persistent and don't store messages, and they're not a replacement for persistent streams. With broadcast channels, grains are implicitly subscribed to the broadcast channel and receive broadcast messages from a producer. This decouples the sender and receiver of the message, which is useful for scenarios where the sender and receiver are not known in advance.

Example scenario

Consider a scenario where you have a grain that needs to receive stock price updates from a stock price provider. The stock price provider is a background service that publishes stock price updates to a broadcast channel. Grains are implicitly subscribed to the broadcast channel and receive updated stock prices. The following diagram shows the scenario:

Stock prices diagram depicting a silo, a stock grain and consuming client in a simple broadcast channel architecture.

In the preceding diagram:

  • The silo publishes stock price updates to the broadcast channel.
  • The grain subscribes to the broadcast channel and receives stock price updates.
  • The client consumes the stock price updates from the stock grain.

The broadcast channel decouples the producer and consumer of the stock price updates. The producer publishes stock price updates to the broadcast channel, and the consumer subscribes to the broadcast channel and receives stock price updates.

Define a consumer grain

To consume broadcast channel messages, your grain needs to implement the IOnBroadcastChannelSubscribed interface. Your implementation will use the IBroadcastChannelSubscription.Attach method to attach to the broadcast channel. The Attach method takes a generic-type parameter for the message type you're going to receive. The following example shows a grain that subscribes to a broadcast channel of type Stock:

using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}

In the preceding code:

  • The LiveStockGrain grain implements the IOnBroadcastChannelSubscribed interface.
  • The OnSubscribed method is called when the grain subscribes to the broadcast channel.
  • The subscription parameter is used to call the Attach method to attach to the broadcast channel.
    • The OnStockUpdated method is passed to Attach as a callback that fires when the Stock message is received.
    • The OnError method is passed to Attach as a callback that fires when an error occurs.

This example grain will contain the latest stock prices as published on the broadcast channel. Any client that asks this grain for the latest stock price will get the latest price from the broadcast channel.

Publish messages to a broadcast channel

To publish messages to the broadcast channel, you need to get a reference to the broadcast channel. To do this, you need to get the IBroadcastChannelProvider from the IClusterClient. With the provider, you can call the IBroadcastChannelProvider.GetChannelWriter method to get an instance of IBroadcastChannelWriter<T>. The writer is used to publish messages to the broadcast channel. The following example shows how to publish messages to the broadcast channel:

using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
        (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the elapsed time to calculate a 15 second delay.
            int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}

In the preceding code:

  • The StockWorker class is a background service that publishes messages to the broadcast channel.
  • The constructor takes an IStockClient and IClusterClient as parameters.
  • From the cluster client instance, the GetBroadcastChannelProvider method is used to get the broadcast channel provider.
  • Using the IStockClient, the StockWorker class gets the latest stock price for a stock symbol.
  • Every 15 seconds, the StockWorker class publishes a Stock message to the broadcast channel.

The publishing of messages to a broadcast channel is decoupled from the consumer grain. The consumer grain subscribes to the broadcast channel and receives messages from the broadcast channel. The producer lives in a silo and is responsible for publishing messages to the broadcast channel and doesn't know anything about consuming grains.

See also