Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Anwendungen interagieren mit Datenströmen über APIs, die den bekannten reaktiven Erweiterungen (Rx) in .NET sehr ähnlich sind. Der Hauptunterschied besteht darin, dass Orleans Streamerweiterungen asynchron sind, um die Verarbeitung in Orleansverteilten und skalierbaren Compute Fabric effizienter zu gestalten.
AsyncStream
Sie verwenden zunächst einen Streamanbieter, um ein Handle zu einem Stream abzurufen. Sie können sich einen Streamanbieter als eine Streamfabrik vorstellen, mit dem Implementierer das Verhalten und die Semantik von Streams anpassen können.
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId = StreamId.Create("MyStreamNamespace", Guid);
IAsyncStream<T> stream = streamProvider.GetStream<T>(streamId);
IStreamProvider streamProvider = base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream = streamProvider.GetStream<T>(Guid, "MyStreamNamespace");
Sie können einen Verweis auf den Datenstromanbieter abrufen, indem Sie entweder die Grain.GetStreamProvider Methode innerhalb eines Grain aufrufen oder die GetStreamProvider
Methode auf der Clientinstanz anwenden.
Orleans.Streams.IAsyncStream<T> ist ein logischer, stark typisierter Handle zu einem virtuellen Stream, ähnlich wie eine Orleans Grain-Referenz. Die Aufrufe von GetStreamProvider
und GetStream
sind rein lokal. Die Argumente für GetStream
sind eine GUID und eine zusätzliche Zeichenfolge, die als Strom-Namespace bezeichnet wird (die null sein kann). Zusammen umfassen die GUID und die Namespacezeichenfolge die Datenstromidentität (ähnlich den Argumenten für IGrainFactory.GetGrain). Diese Kombination bietet zusätzliche Flexibilität beim Bestimmen von Datenstromidentitäten. Genau wie Korn 7 innerhalb des PlayerGrain
-Typs existieren kann und ein anderes Korn 7 innerhalb des ChatRoomGrain
-Typs, kann Stream 123 innerhalb des PlayerEventsStream
-Namespaces existieren und ein anderer Stream 123 innerhalb des ChatRoomMessagesStream
-Namespaces.
Erzeugung und Nutzung
IAsyncStream<T> implementiert sowohl die Schnittstelle IAsyncObserver<T> als auch die Schnittstelle IAsyncObservable<T>. Auf diese Weise kann Ihre Anwendung den Datenstrom verwenden, um mithilfe von IAsyncObserver<T>
neue Ereignisse zu erstellen oder Ereignisse zu abonnieren und zu nutzen IAsyncObservable<T>
.
public interface IAsyncObserver<in T>
{
Task OnNextAsync(T item, StreamSequenceToken token = null);
Task OnCompletedAsync();
Task OnErrorAsync(Exception ex);
}
public interface IAsyncObservable<T>
{
Task<StreamSubscriptionHandle<T>> SubscribeAsync(IAsyncObserver<T> observer);
}
Um Ereignisse im Stream zu erzeugen, ruft Ihre Anwendung Folgendes auf:
await stream.OnNextAsync<T>(event)
Um einen Stream zu abonnieren, ruft Ihre Anwendung Folgendes auf:
StreamSubscriptionHandle<T> subscriptionHandle = await stream.SubscribeAsync(IAsyncObserver)
Das Argument SubscribeAsync kann entweder ein Objekt sein, das die IAsyncObserver<T> Schnittstelle implementiert, oder eine Kombination aus Lambda-Funktionen zum Verarbeiten eingehender Ereignisse. Weitere Optionen für SubscribeAsync
sind über die AsyncObservableExtensions Klasse verfügbar.
SubscribeAsync
gibt ein StreamSubscriptionHandle<T>undurchsichtiges Handle zurück, das zum Kündigen des Datenstroms verwendet wird (ähnlich einer asynchronen Version von IDisposable).
await subscriptionHandle.UnsubscribeAsync()
Es ist wichtig zu beachten, dass das Abonnement für ein Korn gilt, nicht für die Aktivierung. Sobald der Grain-Code den Datenstrom abonniert hat, überdauert dieses Abonnement die Lebensdauer dieser Aktivierung und bleibt dauerhaft aktiv, bis der Grain-Code (möglicherweise in einer anderen Aktivierung) explizit abbestellt wird. Dies ist der Kern der virtuellen Streamabstraktion: Nicht nur alle Streams existieren immer logisch, sondern ein Streamabonnement ist auch dauerhaft und lebt über die bestimmte physische Aktivierung hinaus, die sie erstellt hat.
Multiplizität
Ein Orleans Stream kann mehrere Produzenten und mehrere Verbraucher haben. Eine von einem Produzenten veröffentlichte Nachricht wird an alle Verbraucher übermittelt, die den Stream abonniert haben, bevor die Nachricht veröffentlicht wurde.
Darüber hinaus kann ein Verbraucher denselben Datenstrom mehrmals abonnieren. Jedes Mal, wenn es abonniert, bekommt es ein einzigartiges StreamSubscriptionHandle<T> zurück. Wenn ein Korn (oder Client) X-mal denselben Datenstrom abonniert, erhält es dasselbe Ereignis X-mal, einmal für jedes Abonnement. Der Consumer kann auch ein einzelnes Abonnement abbestellen. Sie können alle aktuellen Abonnements finden, indem Sie anrufen:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Wiederherstellung bei Fehlern
Wenn der Produzent eines Datenstroms stirbt (oder sein Element deaktiviert ist), muss nichts unternommen werden. Wenn diese Komponente das nächste Mal mehr Ereignisse produzieren möchte, kann sie das Datenstrom-Handle wieder abrufen und wie gewohnt neue Ereignisse erzeugen.
Die Verbraucherlogik ist etwas stärker beteiligt. Wie bereits erwähnt, bleibt das Abonnement eines Konsumentenkorns für einen Stream gültig, bis das Korn sich ausdrücklich abmeldet. Wenn der Verbraucher des Datenstroms stirbt (oder sein Korn deaktiviert wird) und ein neues Ereignis im Datenstrom generiert wird, reaktiviert das Verbraucherkorn automatisch (genau wie jedes normale Orleans Korn wird automatisch aktiviert, wenn eine Nachricht an ihn gesendet wird). Das einzige, was der „grain code“ jetzt tun muss, ist, ein IAsyncObserver<T> bereitzustellen, um die Daten zu verarbeiten. Der Consumer muss die Verarbeitungslogik als Teil der OnActivateAsync()-Methode neu anfügen. Dazu kann folgendes aufgerufen werden:
StreamSubscriptionHandle<int> newHandle =
await subscriptionHandle.ResumeAsync(IAsyncObserver);
Der Benutzer verwendet das frühere Handle, das während des ursprünglichen Abonnements abgerufen wurde, um die Verarbeitung fortzusetzen. Beachten Sie, dass ResumeAsync nur ein vorhandenes Abonnement mit der neuen Instanz der IAsyncObserver
Logik aktualisiert wird und die Tatsache, dass dieser Verbraucher bereits diesen Stream abonniert hat, nicht ändert.
Wie bekommt der Verbraucher das alte subscriptionHandle
-Gerät? Es stehen zwei Optionen zur Verfügung. Der Verbraucher hat möglicherweise den vom ursprünglichen SubscribeAsync
Vorgang zurückgegebenen Verweis beibehalten und kann ihn nun verwenden. Wenn der Verbraucher nicht über das Handle verfügt, kann er auch die IAsyncStream<T>
um eine Liste seiner gesamten aktiven Abonnement-Handles bitten, indem er Folgendes aufruft:
IList<StreamSubscriptionHandle<T>> allMyHandles =
await IAsyncStream<T>.GetAllSubscriptionHandles();
Der Verbraucher kann dann alle von ihnen fortsetzen oder bei Bedarf kündigen.
Tipp
Wenn das Verbraucherkorn die IAsyncObserver<T> Schnittstelle direkt (public class MyGrain<T> : Grain, IAsyncObserver<T>
) implementiert, sollte es theoretisch nicht das IAsyncObserver
erneut anfügen müssen und müsste daher auch nicht ResumeAsync
aufrufen. Die Streaming-Runtime sollte automatisch herausfinden, dass das Grain bereits IAsyncObserver
implementiert und diese IAsyncObserver
Methoden aufruft. Die Streaming-Runtime unterstützt dies derzeit jedoch nicht, und der Grain-Code muss weiterhin explizit ResumeAsync
aufrufen, auch wenn der Grain IAsyncObserver
direkt implementiert.
Explizite und implizite Abonnements
Standardmäßig muss ein Stream-Consumer den Datenstrom explizit abonnieren. Dieses Abonnement wird in der Regel von einer externen Nachricht ausgelöst, die das Grain (oder der Client) erhält und es anweist, sich zu abonnieren. Wenn ein Benutzer beispielsweise einem Chatraum beitritt, erhält sein Grain eine JoinChatGroup
Nachricht mit dem Chatnamen, sodass das Benutzer-Grain diesen Chatstream abonniert.
Darüber hinaus Orleans unterstützen Streams implizite Abonnements. In diesem Modell abonniert das Getreide nicht direkt. Es wird automatisch und implizit basierend auf seiner Getreideidentität und einem ImplicitStreamSubscriptionAttribute abonniert. Der Hauptwert von impliziten Abonnements besteht darin, dass Streamaktivitäten die Kornaktivierung und folglich das Abonnement automatisch auslösen. Wenn beispielsweise ein SMS-Stream von einem Korn produziert und von einem anderen Korn verarbeitet werden soll, benötigt der Produzent die Identität des Verbraucherkorns und muss dieses kontaktieren, um es zur Abonnierung aufzufordern. Nur dann konnte das Senden von Ereignissen gestartet werden. Stattdessen kann der Produzent bei impliziten Abonnements einfach damit beginnen, Ereignisse zu einem Stream zu produzieren, und der Verbrauchergrain wird automatisch aktiviert und abonniert. In diesem Fall muss der Produzent nicht wissen, wer die Ereignisse liest.
Die Grainimplementierung MyGrainType
kann ein Attribut [ImplicitStreamSubscription("MyStreamNamespace")]
deklarieren. Dies teilt der Streaming-Runtime mit, dass, wenn ein Ereignis in einem Stream mit der Identität GUID XXX und Namespace "MyStreamNamespace"
generiert wird, es an das Grain mit der Identität XXX vom Typ MyGrainType
übermittelt werden sollte. Anders ausgedrückt: Die Runtime ordnet den Stream <XXX, MyStreamNamespace>
dem Consumergrain <XXX, MyGrainType>
zu.
Das Vorhandensein von ImplicitStreamSubscription
verursacht, dass die Streaming-Runtime dieses Grain automatisch für den Stream abonniert und Ereignisse des Streams an es liefert. Der Grain-Code muss jedoch der Laufzeit weiterhin mitteilen, wie Ereignisse verarbeitet werden sollen. Im Wesentlichen muss IAsyncObserver
angefügt werden. Daher muss der Grain-Code innerhalb von OnActivateAsync
beim Aktivieren aufgerufen werden:
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
StreamId streamId =
StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
IAsyncStream<T> stream =
streamProvider.GetStream<T>(streamId);
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
IStreamProvider streamProvider =
base.GetStreamProvider("SimpleStreamProvider");
IAsyncStream<T> stream =
streamProvider.GetStream<T>(this.GetPrimaryKey(), "MyStreamNamespace");
StreamSubscriptionHandle<T> subscription =
await stream.SubscribeAsync(IAsyncObserver<T>);
Schreiben von Abonnementlogik
Nachfolgend finden Sie Richtlinien zum Schreiben von Abonnementlogik für verschiedene Fälle: explizite und implizite Abonnements, zurückspulebare und nicht zurückspulebare Datenströme. Der Hauptunterschied zwischen expliziten und impliziten Abonnements besteht darin, dass bei impliziten Abonnements das Grain immer genau ein implizites Abonnement pro Stream-Namespace hat. Es gibt keine Möglichkeit, Mehrfachabonnements zu erstellen, keine Möglichkeit zum Abbestellen, und die Grain-Logik muss nur die Verarbeitungslogik anfügen. Dies bedeutet auch, dass es nie erforderlich ist, ein implizites Abonnement fortzusetzen. Auf der anderen Seite müssen Sie das Abonnement bei expliziten Abonnements wieder aufnehmen; andernfalls führt ein erneutes Abonnieren dazu, dass das Grain mehrfach abonniert wird.
Implizite Abonnements:
Bei impliziten Abonnements muss das Aggregationsintervall dennoch den Stream abonnieren, um die Verarbeitungslogik anzufügen. Sie können dies im Grain des Verbrauchers durchführen, indem Sie die Schnittstellen IStreamSubscriptionObserver
und IAsyncObserver<T>
implementieren, sodass das Grain unabhängig vom Abonnieren aktiviert werden kann. Um den Datenstrom zu abonnieren, erstellt das Aggregationsintervall ein Handle und ruft await handle.ResumeAsync(this)
in seiner Methode OnSubscribed(...)
auf.
Implementieren Sie zum Verarbeiten von Nachrichten die IAsyncObserver<T>.OnNextAsync(...)
Methode, um einen Datenstrom und ein Sequenztoken zu empfangen. Alternativ kann die ResumeAsync
Methode eine Reihe von Delegaten verwenden, die die Methoden der IAsyncObserver<T>
Schnittstelle darstellen: onNextAsync
, , onErrorAsync
und onCompletedAsync
.
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
_logger.LogInformation($"Received an item from the stream: {item}");
}
public async Task OnSubscribed(IStreamSubscriptionHandleFactory handleFactory)
{
var handle = handleFactory.Create<string>();
await handle.ResumeAsync(this);
}
public override async Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(
this.GetPrimaryKey(), "MyStreamNamespace");
await stream.SubscribeAsync(OnNextAsync);
}
Explizite Abonnements:
Bei expliziten Abonnements muss ein Grain SubscribeAsync
aufrufen, um den Stream zu abonnieren. Dadurch wird ein Abonnement erstellt und die Verarbeitungslogik hinzugefügt. Das explizite Abonnement besteht, bis die Einheit sich abmeldet. Wenn ein Grain deaktiviert und reaktiviert wird, bleibt es weiterhin explizit abonniert, aber es ist keine Verarbeitungslogik zugeordnet. In diesem Fall muss das Grain die Verarbeitungslogik erneut anfügen. Um dies zu tun, muss das Element in seiner OnActivateAsync
zunächst herausfinden, welche Abonnements es durch Aufruf von IAsyncStream<T>.GetAllSubscriptionHandles() hat. Das Korn muss ResumeAsync
auf jedem Handle ausführen, mit dem es die Verarbeitung fortsetzen möchte, oder UnsubscribeAsync
auf allen Handles, mit denen es fertig ist. Das Grain kann auch optional das StreamSequenceToken
als Argument für die ResumeAsync
-Aufrufe angeben, wodurch dieses explizite Abonnement damit beginnt, von diesem Token zu konsumieren.
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var streamId = StreamId.Create("MyStreamNamespace", this.GetPrimaryKey());
var stream = streamProvider.GetStream<string>(streamId);
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
foreach (var handle in subscriptionHandles)
{
await handle.ResumeAsync(this);
}
}
public async override Task OnActivateAsync()
{
var streamProvider = this.GetStreamProvider(PROVIDER_NAME);
var stream =
streamProvider.GetStream<string>(this.GetPrimaryKey(), "MyStreamNamespace");
var subscriptionHandles = await stream.GetAllSubscriptionHandles();
if (!subscriptionHandles.IsNullOrEmpty())
{
subscriptionHandles.ForEach(
async x => await x.ResumeAsync(OnNextAsync));
}
}
Streamreihenfolge und Sequenztoken
Die Reihenfolge der Ereignisübermittlung zwischen einem einzelnen Produzenten und Einem Verbraucher hängt vom Datenstromanbieter ab.
Mit SMS kontrolliert der Hersteller explizit die Reihenfolge der vom Verbraucher gesehenen Ereignisse, indem er steuert, wie sie sie veröffentlichen. Standardmäßig (wenn die Option für den SimpleMessageStreamProviderOptions.FireAndForgetDelivery SMS-Anbieter auf false
gesetzt ist) und wenn der Produzent auf jeden OnNextAsync
Anruf wartet, werden Ereignisse in FIFO-Reihenfolge eintreffen. In SMS kann der Produzent entscheiden, wie Zustellungsfehler behandelt werden, die durch ein fehlerhaftes Task
angezeigt und vom OnNextAsync
-Anruf zurückgegeben werden.
Azure Queue-Streams garantieren keine FIFO-Reihenfolge, da die zugrunde liegenden Azure-Warteschlangen in Ausfallfällen keine Reihenfolge garantieren (obwohl sie FIFO-Reihenfolge in fehlerfreien Ausführungen garantieren). Wenn ein Produzent ein Ereignis in einer Azure-Warteschlange bereitstellt und die Warteschlangenoperation fehlschlägt, muss der Produzent eine andere Warteschlange erneut versuchen und später mit potenziellen doppelten Nachrichten umgehen. Auf der Anlieferungsseite entfernt die Orleans Streaming-Laufzeit das Ereignis aus der Warteschlange und versucht, es zur Verarbeitung an Abnehmer zu übermitteln. Die Laufzeit löscht das Ereignis nur bei erfolgreicher Verarbeitung aus der Warteschlange. Wenn die Übermittlung oder Verarbeitung fehlschlägt, wird das Ereignis nicht aus der Warteschlange gelöscht und wird später automatisch wieder angezeigt. Die Streaming-Laufzeit versucht, die Daten erneut zu übermitteln, was möglicherweise die FIFO-Reihenfolge stört. Dieses Verhalten entspricht der normalen Semantik von Azure-Warteschlangen.
Anwendungsdefinierte Reihenfolge: Um die oben genannten Sortierungsprobleme zu behandeln, kann Ihre Anwendung optional die Sortierung angeben. Erreichen Sie dies mithilfe eines StreamSequenceTokenundurchsichtigen IComparable Objekts, das zum Anordnen von Ereignissen verwendet wird. Ein Producer kann ein optionales StreamSequenceToken
an den OnNextAsync
-Aufruf übergeben. Dies StreamSequenceToken
wird an den Verbraucher weitergegeben und zusammen mit dem Ereignis bereitgestellt. Auf diese Weise kann Ihre Anwendung die Reihenfolge unabhängig von der Streaminglaufzeit ermitteln und rekonstruieren.
Zurückspulbare Streams
Einige Datenströme erlauben nur ab dem neuesten Zeitpunkt das Abonnieren, während andere "Zurück in die Zeit" zulassen. Diese Funktion hängt von der zugrunde liegenden Warteschlangentechnologie und dem jeweiligen Datenstromanbieter ab. Beispielsweise ermöglichen Azure-Warteschlangen nur den Abruf der neuesten eingereihten Ereignisse, während Event Hubs das Wiedergeben von Ereignissen ab einem beliebigen Zeitpunkt bis zu einem gewissen Ablaufzeitpunkt zulässt. Datenströme, die das Zurückwechseln unterstützen, werden als rewindbare Datenströme bezeichnet.
Der Consumer eines zurückspulbaren Streams kann ein StreamSequenceToken
an den SubscribeAsync
-Aufruf übergeben. Die Laufzeit liefert Ereignisse beginnend mit diesem StreamSequenceToken
. Ein NULL-Token bedeutet, dass der Consumer Ereignisse ab dem letzten Ereignis erhalten möchte.
Die Möglichkeit zum Zurückspulen eines Streams ist insbesondere in Wiederherstellungsszenarien sehr nützlich. Betrachten Sie z. B. ein Korn, das einen Datenstrom abonniert und regelmäßig den Zustand zusammen mit dem neuesten Sequenztoken überprüft. Beim Wiederherstellen eines Fehlers kann das Korn denselben Datenstrom aus dem neuesten prüfpunktierten Sequenztoken erneut abonnieren und wiederherstellen, ohne dass ereignisse verloren gehen, die seit dem letzten Prüfpunkt generiert wurden.
Der Event Hubs-Anbieter ist rückspulbar. Sie finden den zugehörigen Code auf GitHub: Orleans/Azure/Orleans.Streaming.EventHubs. Die Dienste SMS (jetzt Broadcast Channel) und Azure Queue sind nicht wiederherstellbar.
Zustandslose, automatisch aufskalierte Verarbeitung
Standardmäßig unterstützen Orleans Streaming-Ziele eine große Anzahl relativ kleiner Datenströme, die jeweils von einem oder mehreren zustandsbehafteten Körnern verarbeitet werden. Insgesamt wird die Verarbeitung aller Datenströme unter vielen regulären (zustandsbehafteten) Getreiden abgeshardt. Ihr Anwendungscode steuert diese Sharding durch Zuweisen von Stream-IDs und Korn-IDs und durch explizites Abonnieren. Das Ziel ist eine zustandsbehaftete Verarbeitung mit Sharding.
Es gibt jedoch auch ein interessantes Szenario der automatisch skalierten zustandslosen Verarbeitung. In diesem Szenario verfügt eine Anwendung über eine kleine Anzahl von Datenströmen (oder sogar einen großen Datenstrom), und das Ziel ist die zustandslose Verarbeitung. Ein Beispiel ist ein globaler Datenstrom von Ereignissen, bei dem die Verarbeitung das Decodieren jedes Ereignisses und die potenziell Weiterleitung an andere Datenströme für eine weitere zustandsbehaftete Verarbeitung umfasst. Zustandslose, skalierte Datenstromverarbeitung kann in Orleans über StatelessWorkerAttribute Grains unterstützt werden.
Aktueller Status der zustandslosen automatisch skalierten Verarbeitung: Dies ist noch nicht implementiert. Der Versuch, einen Datenstrom von einem StatelessWorker
Grain zu abonnieren, führt zu einem nicht definierten Verhalten.
Wir erwägen die Unterstützung dieser Option.
Grains und Orleans-Clients
Die Funktionsweise von Orleans-Streams ist für Grains und Orleans Clients gleich. Das bedeutet, dass Sie dieselben APIs innerhalb eines "grain" und in einem Orleans Client verwenden können, um Ereignisse zu erstellen und zu nutzen. Dies vereinfacht die Anwendungslogik erheblich, wodurch spezielle clientseitige APIs wie Grain Observers redundant werden.
Vollständig verwaltetes und zuverlässiges Streaming-Pub/Sub
Zum Nachverfolgen von Stream-Abonnements verwendet Orleans eine Laufzeitkomponente namens Streaming Pub-Sub, die als Rendezvous-Punkt für Stream-Konsumenten und Produzenten dient. Pub-Sub verfolgt alle Streamabonnements, speichert sie und gleicht Stream-Consumer mit Stream-Produzenten ab.
Anwendungen können auswählen, wo und wie die Pub/Sub-Daten gespeichert werden. Die Pub/Sub-Komponente selbst ist in Form von Grains (genannt PubSubRendezvousGrain
) implementiert, die deklarative Orleans-Persistenz nutzen.
PubSubRendezvousGrain
verwendet den Speicheranbieter PubSubStore
. Wie bei jedem Grain können Sie eine Implementierung für einen Speicheranbieter angeben. Für Streaming Pub-Sub können Sie die Implementierung von PubSubStore
zur Konstruktionszeit des Silos mithilfe des Silohost-Builders ändern.
Im Folgenden wird Pub-Sub so konfiguriert, dass der Status in Azure-Tabellen gespeichert wird.
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");
Auf diese Weise werden Pub-Sub Daten dauerhaft in Azure Table gespeichert. Für die anfängliche Entwicklung können Sie auch Arbeitsspeicher verwenden. Zusätzlich zu Pub/Sub übermittelt die Orleans-Streamingruntime Ereignisse von Producern an Consumer, verwaltete alle Laufzeitressourcen, die aktiv genutzten Streams zugewiesen sind, und führt eine transparente Garbage Collection für Laufzeitressourcen von ungenutzten Streams durch.
Konfiguration
Um Datenströme zu verwenden, müssen Sie Streamanbieter über den Silohost oder Clusterclient-Generatoren aktivieren. Beispiel für die Einrichtung eines Streamanbieters:
hostBuilder.AddMemoryStreams("StreamProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConfigureTableServiceClient("<Secret>")))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConfigureTableServiceClient("<Secret>"));
hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
.AddAzureQueueStreams<AzureQueueDataAdapterV2>("AzureQueueProvider",
optionsBuilder => optionsBuilder.Configure(
options => options.ConnectionString = "<Secret>"))
.AddAzureTableGrainStorage("PubSubStore",
options => options.ConnectionString = "<Secret>");