Freigeben über


Übertragungskanäle in Orleans

Übertragungskanäle sind ein spezieller Übertragungsmechanismus, der zum Senden von Nachrichten an alle Abonnenten verwendet werden kann. Im Gegensatz zu Streaminganbietern sind Übertragungskanäle nicht persistent und speichern keine Nachrichten. Sie sind daher kein Ersatz für persistente Streams. Bei Übertragungskanälen werden Grains implizit für den Übertragungskanal abonniert und empfangen Übertragungsnachrichten von einem Producer. Dadurch werden Absender und Empfänger der Nachricht entkoppelt, was für Szenarien nützlich ist, in denen Absender und Empfänger nicht im Voraus bekannt sind.

Beispielszenario

Betrachten Sie ein Szenario, in dem Sie über einen Grain verfügen, der Aktienkursaktualisierungen von einem Aktienkursanbieter erhalten muss. Der Aktienkursanbieter ist ein Hintergrunddienst, der Aktienkursupdates in einem Übertragungskanal veröffentlicht. Grains abonnieren implizit den Übertragungskanal und erhalten aktualisierte Aktienkurse. Das Szenario wird in der folgenden Abbildung veranschaulicht:

Stock prices diagram depicting a silo, a stock grain and consuming client in a simple broadcast channel architecture.

Im obigen Diagramm ist Folgendes zu sehen:

  • Das Silo veröffentlicht Aktienkursupdates im Übertragungskanal.
  • Der Grain abonniert den Übertragungskanal und erhält Aktienkursupdates.
  • Der Client nutzt die Aktienkursupdates aus dem Börsengrain.

Der Übertragungskanal entkoppelt Producer und Consumer der Aktienkursaktualisierungen. Der Producer veröffentlicht Aktienkursupdates im Übertragungskanal, und der Consumer abonniert den Übertragungskanal und erhält Aktienkursaktualisierungen.

Definieren eines Consumergrains

Um Übertragungskanäle nutzen zu können, muss Ihr Grain die IOnBroadcastChannelSubscribed-Schnittstelle implementieren. Ihre Implementierung verwendet die IBroadcastChannelSubscription.Attach-Methode zum Anfügen an den Übertragungskanal. Die Attach-Methode nimmt einen generischen Parameter für den Nachrichtentyp an, den Sie empfangen möchten. Das folgende Beispiel zeigt einen Grain, der einen Übertragungskanal vom Typ Stock abonniert:

using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}

Im obigen Code:

  • Der Grain LiveStockGrain implementiert die IOnBroadcastChannelSubscribed-Schnittstelle.
  • Die OnSubscribed-Methode wird aufgerufen, wenn der Grain den Übertragungskanal abonniert.
  • Der subscription-Parameter wird verwendet, um die Attach-Methode aufzurufen, die an den Übertragungskanal angefügt werden soll.
    • Die OnStockUpdated-Methode wird als Rückruf übergeben Attach, der ausgelöst wird, wenn die Nachricht Stock empfangen wird.
    • Die OnError-Methode wird als Rückruf an Attach übergeben, der ausgelöst wird, wenn ein Fehler auftritt.

Dieser Beispielgrain enthält die aktuellen Aktienkurse, die im Übertragungskanal veröffentlicht wurden. Jeder Client, der diesen Grain nach dem neuesten Aktienkurs abfragt, erhält den neuesten Preis vom Übertragungskanal.

Veröffentlichen von Nachrichten in einem Übertragungskanal

Um Nachrichten im Übertragungskanal zu veröffentlichen, müssen Sie einen Verweis auf den Übertragungskanal abrufen. Dazu müssen Sie IBroadcastChannelProvider von IClusterClient abrufen. Beim Anbieter können Sie die IBroadcastChannelProvider.GetChannelWriter-Methode aufrufen, um eine Instanz von IBroadcastChannelWriter<T> abzurufen. Der Writer wird verwendet, um Nachrichten im Übertragungskanal zu veröffentlichen. Das folgende Beispiel zeigt, wie Nachrichten im Übertragungskanal veröffentlicht werden:

using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
        (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the elapsed time to calculate a 15 second delay.
            int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}

Im obigen Code:

  • Die StockWorker-Klasse ist ein Hintergrunddienst, der Nachrichten im Übertragungskanal veröffentlicht.
  • Der Konstruktor akzeptiert IStockClient und IClusterClient als Parameter.
  • In der Clusterclientinstanz wird die GetBroadcastChannelProvider-Methode verwendet, um den Übertragungskanalanbieter abzurufen.
  • Mithilfe von IStockClient erhält die StockWorker-Klasse den aktuellen Aktienkurs für ein Aktiensymbol.
  • Alle 15 Sekunden veröffentlicht die StockWorker-Klasse eine Stock-Nachricht im Übertragungskanal.

Die Veröffentlichung von Nachrichten in einem Übertragungskanal ist vom Consumergrain entkoppelt. Der Consumergrain abonniert den Übertragungskanal und empfängt Nachrichten vom Übertragungskanal. Der Producer befindet sich in einem Silo, ist für die Veröffentlichung von Nachrichten im Übertragungskanal verantwortlich und weiß nichts über die Verbrauchergrains.

Siehe auch