Kanały nadawania w programie Orleans

Kanały emisji to specjalny typ mechanizmu emisji, który może służyć do wysyłania komunikatów do wszystkich subskrybentów. W przeciwieństwie do dostawców przesyłania strumieniowego kanały emisji nie są trwałe i nie przechowują komunikatów i nie zastępują trwałych strumieni. W przypadku kanałów transmisji ziarna są automatycznie subskrybowane na kanał transmisji i otrzymują komunikaty transmisji od producenta. Spowoduje to oddzielenie nadawcy i odbiorcy komunikatu, co jest przydatne w scenariuszach, w których nadawca i odbiorca nie są znane z wyprzedzeniem.

Aby użyć kanału emisji, należy skonfigurować strumienie Orleans, a następnie włączyć emisję na kanale za pomocą AddBroadcastChannel podczas konfiguracji silosu.

siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);

Przykładowy scenariusz

Rozważmy scenariusz, w którym masz ziarno, które musi otrzymywać aktualizacje cen akcji od dostawcy cen akcji. Dostawca cen akcji to usługa w tle, która publikuje aktualizacje cen akcji w kanale emisji. Ziarna są subskrybowane niejawnie do kanału nadawczego i otrzymują zaktualizowane ceny akcji. Na poniższym diagramie przedstawiono scenariusz:

Diagram cen zapasów przedstawiający silos, ziarna zapasów i zużywającego klienta w prostej architekturze kanału emisji.

Na powyższym diagramie:

  • Silos publikuje aktualizacje cen akcji do kanału emisji.
  • Ziarno subskrybuje kanał emisji i otrzymuje aktualizacje cen akcji.
  • Klient korzysta z aktualizacji cen akcji z ziarna zapasów.

Kanał emisji rozdziela producenta i konsumenta aktualizacji cen akcji. Producent publikuje aktualizacje cen akcji w kanale emisji, a konsument subskrybuje kanał emisji i otrzymuje aktualizacje cen akcji.

Definiowanie ziarna konsumenta

Aby korzystać z komunikatów kanału emisji, ziarno musi zaimplementować IOnBroadcastChannelSubscribed interfejs. Ten interfejs umożliwia niejawne subskrypcje, co oznacza, że ziarna są automatycznie subskrybowane do kanału emisji po aktywowaniu. Twoja implementacja używa metody IBroadcastChannelSubscription.Attach, aby dołączyć do kanału emisji. Metoda Attach przyjmuje parametr typu ogólnego dla typu komunikatu, który ma zostać odebrany.

Najpierw zdefiniuj interfejs ziarna używany przez użytkowników do interakcji z ziarnem:

namespace BroadcastChannel.GrainInterfaces;

public interface ILiveStockGrain : IGrainWithGuidKey
{
    ValueTask<Stock> GetStock(StockSymbol symbol);
}

Interfejs ILiveStockGrain używa elementu IGrainWithGuidKey, co oznacza, że ziarno jest identyfikowane przez klucz GUID. Następnie zaimplementuj ziarno, które subskrybuje kanał emisji:

using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo;

[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
    Grain,
    ILiveStockGrain,
    IOnBroadcastChannelSubscribed
{
    private readonly IDictionary<StockSymbol, Stock> _stockCache =
        new ConcurrentDictionary<StockSymbol, Stock>();

    public ValueTask<Stock> GetStock(StockSymbol symbol) =>
        _stockCache.TryGetValue(symbol, out Stock? stock) is false
            ? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
            : new ValueTask<Stock>(stock);

    public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
        subscription.Attach<Stock>(OnStockUpdated, OnError);

    private Task OnStockUpdated(Stock stock)
    {
        if (stock is { GlobalQuote: { } })
        {
            _stockCache[stock.GlobalQuote.Symbol] = stock;
        }

        return Task.CompletedTask;
    }

    private static Task OnError(Exception ex)
    {
        Console.Error.WriteLine($"An error occurred: {ex}");

        return Task.CompletedTask;
    }
}

W poprzednim kodzie:

  • Ziarno LiveStockGrain implementuje IOnBroadcastChannelSubscribed interfejs.
  • Atrybut [ImplicitChannelSubscription] oznacza ten element do automatycznej subskrypcji kanałów transmisji.
  • Metoda OnSubscribed jest wywoływana automatycznie po aktywowaniu ziarna (po pierwszym użyciu lub po odzyskiwaniu po awarii).
  • Parametr subscription służy do wywołania metody Attach w celu przyłączenia do kanału emisji.
    • Metoda OnStockUpdated jest przekazywana do Attach jako wywołanie zwrotne, które uruchamia się po odebraniu komunikatu Stock.
    • Metoda OnError jest przekazywana do Attach jako wywołanie zwrotne uruchamiane, kiedy wystąpi błąd.

To przykładowe ziarno zawiera najnowsze ceny akcji opublikowane w kanale emisji. Każdy klient, który prosi o to ziarno dla najnowszej ceny akcji dostaje najnowszą cenę z kanału emisji.

Publikowanie komunikatów w kanale emisji

Aby opublikować komunikaty w kanale emisji, musisz uzyskać odwołanie do kanału emisji. W tym celu pobierz IBroadcastChannelProvider z IClusterClient. Za pomocą dostawcy wywołaj metodę IBroadcastChannelProvider.GetChannelWriter, aby uzyskać wystąpienie IBroadcastChannelWriter<T>. Autor używa się do publikowania komunikatów w kanale transmisji.

Najpierw zdefiniuj stałą dla nazwy kanału, aby upewnić się, że producent i konsumenci używają tego samego identyfikatora kanału:

namespace BroadcastChannel.GrainInterfaces;

public sealed class ChannelNames
{
    public const string LiveStockTicker = "live-stock-ticker";
}

Następnie utwórz wydawcę, który wysyła komunikaty do kanału emisji:

using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;

namespace BroadcastChannel.Silo.Services;

internal sealed class StockWorker : BackgroundService
{
    private readonly StockClient _stockClient;
    private readonly IBroadcastChannelProvider _provider;
    private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();

    public StockWorker(
        StockClient stockClient, IClusterClient clusterClient) =>
        (_stockClient, _provider) =
        (stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            // Capture the starting timestamp.
            long startingTimestamp = Stopwatch.GetTimestamp();

            // Get all updated stock values.
            Stock[] stocks = await Task.WhenAll(
                tasks: _symbols.Select(selector: _stockClient.GetStockAsync));

            // Get the live stock ticker broadcast channel.
            ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
            IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);

            // Broadcast all stock updates on this channel.
            await Task.WhenAll(
                stocks.Where(s => s is not null).Select(channelWriter.Publish));

            // Use the elapsed time to calculate a 15 second delay.
            int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
            int remaining = Math.Max(0, 15_000 - elapsed);

            await Task.Delay(remaining, stoppingToken);
        }
    }
}

W poprzednim kodzie:

  • Klasa StockWorker to usługa w tle, która publikuje komunikaty w kanale emisji.
  • Konstruktor przyjmuje StockClient i IClusterClient jako parametry.
  • W instancji klienta klastra metoda GetBroadcastChannelProvider jest używana do uzyskania dostawcy kanału transmisyjnego dla kanału LiveStockTicker.
  • Metoda ChannelId.Create tworzy identyfikator kanału przy użyciu:
    • Nazwa kanału (ChannelNames.LiveStockTicker) — musi być zgodna z nazwą używaną podczas konfigurowania kanału emisji w konfiguracji silosu.
    • Guid.Empty jako przestrzeń nazw — w przypadku kanałów emisji wszyscy subskrybenci otrzymują wszystkie komunikaty, więc przestrzeń nazw jest zwykle ustawiona na Guid.Empty wartość wskazującą pojedynczą emisję udostępnioną.
  • Korzystając z StockClient, klasa StockWorker pobiera najnowszą cenę akcji dla każdego symbolu akcji.
  • Co 15 sekund StockWorker klasa publikuje Stock komunikaty w kanale emisji.

Publikowanie komunikatów w kanale emisji jest oddzielone od ziarna konsumenta. Producent nie wie o konkretnych ziarnach konsumentów. Zamiast tego publikuje w kanale nadawczym, a wszystkie niejawnie subskrybowane jednostki automatycznie odbierają komunikaty.

Kanały emisji a strumienie

Kanały emisji i Orleans strumienie (w tym strumienie w pamięci) są mechanizmami przesyłu wiadomości, ale służą różnym celom i mają różne cechy. W poniższej tabeli porównaliśmy kluczowe różnice:

Funkcja Kanały emisji Orleans Strumienie danych
Model subskrypcji Niejawne — ziarna są automatycznie subskrybowane po aktywowaniu Jawne — ziarna muszą jawnie subskrybować strumienie
Trwałość komunikatów Nietrwałe — komunikaty są tracone, jeśli żadni subskrybenci nie są aktywni Może być trwały (kolejki platformy Azure, usługa Event Hubs) lub przejściowy (w pamięci)
Dostarczanie komunikatów W najlepszym razie, wystrzel i zapomnij transmisja Zależy od dostawcy — może obsługiwać co najmniej jednokrotne lub dokładnie jednokrotne dostarczanie
Przypadek użycia Nadawanie tej samej wiadomości wszystkim zainteresowanym ziarnom w czasie rzeczywistym Obsługa komunikatów typu punkt-punkt lub pub-sub z gwarancjami dostarczania
Historia wiadomości Brak historii komunikatów — tylko bieżące emisje Strumienie mogą obsługiwać subskrypcje z możliwością przewijania wstecz z historią wiadomości.
Skalowalność Zoptymalizowane pod kątem dystrybucji do wielu konsumentów Zoptymalizowane pod kątem przetwarzania opartego na kolejce z zastosowaniem backpressure
Cykl życia klienta Użytkownicy są niejawnie zarządzani przez Orleans Konsumenci muszą zarządzać cyklem życia subskrypcji
Configuration Proste — wymaga tylko nazwy kanału Bardziej złożone — wymaga konfiguracji dostawcy strumienia

Kiedy należy używać kanałów emisji

Użyj kanałów emisji, gdy:

  • Musisz wysłać ten sam komunikat do wszystkich wystąpień typu ziarna.
  • Dostarczanie komunikatów nie jest krytyczne (od czasu do czasu straty są akceptowalne).
  • Potrzebujesz niejawnej subskrypcji bez zarządzania cyklem życia subskrypcji.
  • Potrzebujesz aktualizacji w czasie rzeczywistym bez historii komunikatów.
  • Potrzebujesz prostej konfiguracji i ustawienia.

Kiedy należy używać strumieni

Użyj strumieni, gdy:

  • Potrzebujesz gwarantowanego dostarczania komunikatów.
  • Potrzebujesz funkcji trwałości i odtwarzania komunikatów.
  • Potrzebujesz jawnej kontroli nad cyklem życia subskrypcji.
  • Potrzebujesz mechanizmów kontrolowania ciśnienia wstecznego i przepływu.
  • Wzorzec obsługi komunikatów to punkt-punkt lub wymaga bardziej złożonego routingu.
  • Integrujesz się z zewnętrznymi systemami kolejkowania (Event Hubs, Service Bus, Kafka).

Zobacz też