Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Kanały emisji to specjalny typ mechanizmu emisji, który może służyć do wysyłania komunikatów do wszystkich subskrybentów. W przeciwieństwie do dostawców przesyłania strumieniowego kanały emisji nie są trwałe i nie przechowują komunikatów i nie zastępują trwałych strumieni. W przypadku kanałów transmisji ziarna są automatycznie subskrybowane na kanał transmisji i otrzymują komunikaty transmisji od producenta. Spowoduje to oddzielenie nadawcy i odbiorcy komunikatu, co jest przydatne w scenariuszach, w których nadawca i odbiorca nie są znane z wyprzedzeniem.
Aby użyć kanału emisji, należy skonfigurować strumienie Orleans, a następnie włączyć emisję na kanale za pomocą AddBroadcastChannel podczas konfiguracji silosu.
siloBuilder.AddBroadcastChannel(OrleansBroadcastChannelNames.ReadmodelChanges);
Przykładowy scenariusz
Rozważmy scenariusz, w którym masz ziarno, które musi otrzymywać aktualizacje cen akcji od dostawcy cen akcji. Dostawca cen akcji to usługa w tle, która publikuje aktualizacje cen akcji w kanale emisji. Ziarna są subskrybowane niejawnie do kanału nadawczego i otrzymują zaktualizowane ceny akcji. Na poniższym diagramie przedstawiono scenariusz:
Na powyższym diagramie:
- Silos publikuje aktualizacje cen akcji do kanału emisji.
- Ziarno subskrybuje kanał emisji i otrzymuje aktualizacje cen akcji.
- Klient korzysta z aktualizacji cen akcji z ziarna zapasów.
Kanał emisji rozdziela producenta i konsumenta aktualizacji cen akcji. Producent publikuje aktualizacje cen akcji w kanale emisji, a konsument subskrybuje kanał emisji i otrzymuje aktualizacje cen akcji.
Definiowanie ziarna konsumenta
Aby korzystać z komunikatów kanału emisji, ziarno musi zaimplementować IOnBroadcastChannelSubscribed interfejs. Ten interfejs umożliwia niejawne subskrypcje, co oznacza, że ziarna są automatycznie subskrybowane do kanału emisji po aktywowaniu. Twoja implementacja używa metody IBroadcastChannelSubscription.Attach, aby dołączyć do kanału emisji. Metoda Attach przyjmuje parametr typu ogólnego dla typu komunikatu, który ma zostać odebrany.
Najpierw zdefiniuj interfejs ziarna używany przez użytkowników do interakcji z ziarnem:
namespace BroadcastChannel.GrainInterfaces;
public interface ILiveStockGrain : IGrainWithGuidKey
{
ValueTask<Stock> GetStock(StockSymbol symbol);
}
Interfejs ILiveStockGrain używa elementu IGrainWithGuidKey, co oznacza, że ziarno jest identyfikowane przez klucz GUID. Następnie zaimplementuj ziarno, które subskrybuje kanał emisji:
using System.Collections.Concurrent;
using BroadcastChannel.GrainInterfaces;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo;
[ImplicitChannelSubscription]
public sealed class LiveStockGrain :
Grain,
ILiveStockGrain,
IOnBroadcastChannelSubscribed
{
private readonly IDictionary<StockSymbol, Stock> _stockCache =
new ConcurrentDictionary<StockSymbol, Stock>();
public ValueTask<Stock> GetStock(StockSymbol symbol) =>
_stockCache.TryGetValue(symbol, out Stock? stock) is false
? new ValueTask<Stock>(Task.FromException<Stock>(new KeyNotFoundException()))
: new ValueTask<Stock>(stock);
public Task OnSubscribed(IBroadcastChannelSubscription subscription) =>
subscription.Attach<Stock>(OnStockUpdated, OnError);
private Task OnStockUpdated(Stock stock)
{
if (stock is { GlobalQuote: { } })
{
_stockCache[stock.GlobalQuote.Symbol] = stock;
}
return Task.CompletedTask;
}
private static Task OnError(Exception ex)
{
Console.Error.WriteLine($"An error occurred: {ex}");
return Task.CompletedTask;
}
}
W poprzednim kodzie:
- Ziarno
LiveStockGrainimplementujeIOnBroadcastChannelSubscribedinterfejs. - Atrybut
[ImplicitChannelSubscription]oznacza ten element do automatycznej subskrypcji kanałów transmisji. - Metoda
OnSubscribedjest wywoływana automatycznie po aktywowaniu ziarna (po pierwszym użyciu lub po odzyskiwaniu po awarii). - Parametr
subscriptionsłuży do wywołania metodyAttachw celu przyłączenia do kanału emisji.- Metoda
OnStockUpdatedjest przekazywana doAttachjako wywołanie zwrotne, które uruchamia się po odebraniu komunikatuStock. - Metoda
OnErrorjest przekazywana doAttachjako wywołanie zwrotne uruchamiane, kiedy wystąpi błąd.
- Metoda
To przykładowe ziarno zawiera najnowsze ceny akcji opublikowane w kanale emisji. Każdy klient, który prosi o to ziarno dla najnowszej ceny akcji dostaje najnowszą cenę z kanału emisji.
Publikowanie komunikatów w kanale emisji
Aby opublikować komunikaty w kanale emisji, musisz uzyskać odwołanie do kanału emisji. W tym celu pobierz IBroadcastChannelProvider z IClusterClient. Za pomocą dostawcy wywołaj metodę IBroadcastChannelProvider.GetChannelWriter, aby uzyskać wystąpienie IBroadcastChannelWriter<T>. Autor używa się do publikowania komunikatów w kanale transmisji.
Najpierw zdefiniuj stałą dla nazwy kanału, aby upewnić się, że producent i konsumenci używają tego samego identyfikatora kanału:
namespace BroadcastChannel.GrainInterfaces;
public sealed class ChannelNames
{
public const string LiveStockTicker = "live-stock-ticker";
}
Następnie utwórz wydawcę, który wysyła komunikaty do kanału emisji:
using System.Diagnostics;
using BroadcastChannel.GrainInterfaces;
using Microsoft.Extensions.Hosting;
using Orleans.BroadcastChannel;
namespace BroadcastChannel.Silo.Services;
internal sealed class StockWorker : BackgroundService
{
private readonly StockClient _stockClient;
private readonly IBroadcastChannelProvider _provider;
private readonly List<StockSymbol> _symbols = Enum.GetValues<StockSymbol>().ToList();
public StockWorker(
StockClient stockClient, IClusterClient clusterClient) =>
(_stockClient, _provider) =
(stockClient, clusterClient.GetBroadcastChannelProvider(ChannelNames.LiveStockTicker));
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
// Capture the starting timestamp.
long startingTimestamp = Stopwatch.GetTimestamp();
// Get all updated stock values.
Stock[] stocks = await Task.WhenAll(
tasks: _symbols.Select(selector: _stockClient.GetStockAsync));
// Get the live stock ticker broadcast channel.
ChannelId channelId = ChannelId.Create(ChannelNames.LiveStockTicker, Guid.Empty);
IBroadcastChannelWriter<Stock> channelWriter = _provider.GetChannelWriter<Stock>(channelId);
// Broadcast all stock updates on this channel.
await Task.WhenAll(
stocks.Where(s => s is not null).Select(channelWriter.Publish));
// Use the elapsed time to calculate a 15 second delay.
int elapsed = Stopwatch.GetElapsedTime(startingTimestamp).Milliseconds;
int remaining = Math.Max(0, 15_000 - elapsed);
await Task.Delay(remaining, stoppingToken);
}
}
}
W poprzednim kodzie:
- Klasa
StockWorkerto usługa w tle, która publikuje komunikaty w kanale emisji. - Konstruktor przyjmuje
StockClienti IClusterClient jako parametry. - W instancji klienta klastra metoda GetBroadcastChannelProvider jest używana do uzyskania dostawcy kanału transmisyjnego dla kanału
LiveStockTicker. - Metoda
ChannelId.Createtworzy identyfikator kanału przy użyciu:- Nazwa kanału (
ChannelNames.LiveStockTicker) — musi być zgodna z nazwą używaną podczas konfigurowania kanału emisji w konfiguracji silosu. -
Guid.Emptyjako przestrzeń nazw — w przypadku kanałów emisji wszyscy subskrybenci otrzymują wszystkie komunikaty, więc przestrzeń nazw jest zwykle ustawiona naGuid.Emptywartość wskazującą pojedynczą emisję udostępnioną.
- Nazwa kanału (
- Korzystając z
StockClient, klasaStockWorkerpobiera najnowszą cenę akcji dla każdego symbolu akcji. - Co 15 sekund
StockWorkerklasa publikujeStockkomunikaty w kanale emisji.
Publikowanie komunikatów w kanale emisji jest oddzielone od ziarna konsumenta. Producent nie wie o konkretnych ziarnach konsumentów. Zamiast tego publikuje w kanale nadawczym, a wszystkie niejawnie subskrybowane jednostki automatycznie odbierają komunikaty.
Kanały emisji a strumienie
Kanały emisji i Orleans strumienie (w tym strumienie w pamięci) są mechanizmami przesyłu wiadomości, ale służą różnym celom i mają różne cechy. W poniższej tabeli porównaliśmy kluczowe różnice:
| Funkcja | Kanały emisji | Orleans Strumienie danych |
|---|---|---|
| Model subskrypcji | Niejawne — ziarna są automatycznie subskrybowane po aktywowaniu | Jawne — ziarna muszą jawnie subskrybować strumienie |
| Trwałość komunikatów | Nietrwałe — komunikaty są tracone, jeśli żadni subskrybenci nie są aktywni | Może być trwały (kolejki platformy Azure, usługa Event Hubs) lub przejściowy (w pamięci) |
| Dostarczanie komunikatów | W najlepszym razie, wystrzel i zapomnij transmisja | Zależy od dostawcy — może obsługiwać co najmniej jednokrotne lub dokładnie jednokrotne dostarczanie |
| Przypadek użycia | Nadawanie tej samej wiadomości wszystkim zainteresowanym ziarnom w czasie rzeczywistym | Obsługa komunikatów typu punkt-punkt lub pub-sub z gwarancjami dostarczania |
| Historia wiadomości | Brak historii komunikatów — tylko bieżące emisje | Strumienie mogą obsługiwać subskrypcje z możliwością przewijania wstecz z historią wiadomości. |
| Skalowalność | Zoptymalizowane pod kątem dystrybucji do wielu konsumentów | Zoptymalizowane pod kątem przetwarzania opartego na kolejce z zastosowaniem backpressure |
| Cykl życia klienta | Użytkownicy są niejawnie zarządzani przez Orleans | Konsumenci muszą zarządzać cyklem życia subskrypcji |
| Configuration | Proste — wymaga tylko nazwy kanału | Bardziej złożone — wymaga konfiguracji dostawcy strumienia |
Kiedy należy używać kanałów emisji
Użyj kanałów emisji, gdy:
- Musisz wysłać ten sam komunikat do wszystkich wystąpień typu ziarna.
- Dostarczanie komunikatów nie jest krytyczne (od czasu do czasu straty są akceptowalne).
- Potrzebujesz niejawnej subskrypcji bez zarządzania cyklem życia subskrypcji.
- Potrzebujesz aktualizacji w czasie rzeczywistym bez historii komunikatów.
- Potrzebujesz prostej konfiguracji i ustawienia.
Kiedy należy używać strumieni
Użyj strumieni, gdy:
- Potrzebujesz gwarantowanego dostarczania komunikatów.
- Potrzebujesz funkcji trwałości i odtwarzania komunikatów.
- Potrzebujesz jawnej kontroli nad cyklem życia subskrypcji.
- Potrzebujesz mechanizmów kontrolowania ciśnienia wstecznego i przepływu.
- Wzorzec obsługi komunikatów to punkt-punkt lub wymaga bardziej złożonego routingu.
- Integrujesz się z zewnętrznymi systemami kolejkowania (Event Hubs, Service Bus, Kafka).