Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Manchmal reicht ein einfaches Nachrichten-/Antwortmuster nicht aus, und der Client muss asynchrone Benachrichtigungen empfangen. Beispielsweise kann ein Benutzer eine Benachrichtigung wünschen, wenn ein Freund eine neue Chatnachricht veröffentlicht.
Clientbeobachter sind ein Mechanismus, der asynchrone Benachrichtigungen von Clients zulässt. Beobachterschnittstellen müssen von IGrainObserver erben, und alle Methoden müssen entweder void, Task, Task<TResult>, ValueTask oder ValueTask<TResult> zurückgeben. Es wird nicht empfohlen, einen Rückgabetyp void zurückzugeben, da er die Verwendung async void in der Implementierung fördern könnte. Dies ist ein gefährliches Muster, da es zu Anwendungsabstürzen führen kann, wenn eine Ausnahme von der Methode ausgelöst wird. Stattdessen sollten Sie für bestmögliche Benachrichtigungsszenarien erwägen, den OneWayAttribute auf die Schnittstellenmethode des Beobachters anzuwenden. Dies bewirkt, dass der Empfänger keine Antwort für den Methodenaufruf sendet und die Methode sofort an der Aufrufstelle zurückkehrt, ohne auf eine Antwort von dem Beobachter zu warten. Ein Korn ruft eine Methode bei einem Beobachter auf, indem es sie wie jede Kornschnittstellenmethode aufruft. Die Orleans Laufzeit stellt die Übermittlung von Anforderungen und Antworten sicher. Ein gängiger Anwendungsfall für Beobachter besteht darin, dass ein Client Benachrichtigungen empfängt, wenn ein Ereignis in der Orleans Anwendung auftritt. Ein Modul, das solche Benachrichtigungen veröffentlicht, sollte eine API zum Hinzufügen oder Entfernen von Beobachtern einrichten. Darüber hinaus ist es in der Regel praktisch, eine Methode verfügbar zu machen, mit der eine Kündigung eines vorhandenen Abonnements ermöglicht wird.
Sie können eine Hilfsklasse verwenden, ObserverManager<TObserver> um die Entwicklung beobachteter Getreidetypen zu vereinfachen. Im Gegensatz zu Körnern, die Orleans nach einem Ausfall automatisch nach Bedarf reaktiviert werden, sind Clients nicht fehlertolerant: Ein Client, der fehlschlägt, kann sich möglicherweise nie erholen. Aus diesem Grund entfernt das ObserverManager<T> Hilfsprogramm Abonnements nach einer konfigurierten Dauer. Aktive Kunden sollten sich regelmäßig erneut anmelden, um ihre Abonnements aktiv zu halten.
Um eine Benachrichtigung zu abonnieren, muss der Client zunächst ein lokales Objekt erstellen, das die Beobachterschnittstelle implementiert. Anschließend wird die Methode CreateObjectReference auf der Grain Factory aufgerufen, um das Objekt in eine Grain-Referenz zu konvertieren. Anschließend können Sie diesen Verweis an die Subscribe-Methode der Benachrichtigungseinheit weiterleiten.
Andere Getreide können dieses Modell auch verwenden, um asynchrone Benachrichtigungen zu empfangen. Getreide kann Schnittstellen implementieren IGrainObserver . Im Gegensatz zum Clientabonnementfall implementiert das abonnierende Objekt einfach die Beobachter-Schnittstelle und übergibt einen Verweis auf sich selbst (z. B. this.AsReference<IMyGrainObserverInterface>()). Es ist nicht erforderlich für CreateObjectReference, da Getreide bereits adressierbar ist.
Codebeispiel
Angenommen, Sie haben ein Modul, das regelmäßig Nachrichten an Kunden sendet. Der Einfachheit halber ist die Nachricht in unserem Beispiel eine Zeichenfolge. Definieren Sie zunächst die Schnittstelle auf dem Client, die die Nachricht empfängt.
Die Schnittstelle sieht wie folgt aus:
public interface IChat : IGrainObserver
{
Task ReceiveMessage(string message);
}
Die einzige besondere Anforderung besteht darin, dass die Schnittstelle von IGrainObserver erben muss.
Nun sollte jeder Client, der diese Meldungen beobachten möchte, eine Klasse implementieren, die IChat umsetzt.
Der einfachste Fall sieht ungefähr wie folgt aus:
public class Chat : IChat
{
public Task ReceiveMessage(string message)
{
Console.WriteLine(message);
return Task.CompletedTask;
}
}
Als Nächstes sollten Sie auf dem Server eine Komponente haben, die diese Chatnachrichten an die Clients sendet. Das Korn sollte auch einen Mechanismus für Kunden bereitstellen, um Benachrichtigungen zu abonnieren und abzubestellen. Für Abonnements kann die Komponente eine Instanz der Hilfsklasse ObserverManager<IChat> verwenden.
Hinweis
ObserverManager<TObserver> ist seit Version 7.0 Teil von Orleans. Bei älteren Versionen kann die folgende Implementierung kopiert werden.
class HelloGrain : Grain, IHello
{
private readonly ObserverManager<IChat> _subsManager;
public HelloGrain(ILogger<HelloGrain> logger)
{
_subsManager =
new ObserverManager<IChat>(
TimeSpan.FromMinutes(5), logger);
}
// Clients call this to subscribe.
public Task Subscribe(IChat observer)
{
_subsManager.Subscribe(observer, observer);
return Task.CompletedTask;
}
//Clients use this to unsubscribe and no longer receive messages.
public Task UnSubscribe(IChat observer)
{
_subsManager.Unsubscribe(observer);
return Task.CompletedTask;
}
}
Verwenden Sie zum Senden einer Nachricht an Clients die Notify Methode der ObserverManager<IChat-Instanz> . Die Methode verwendet eine Action<T> Methode oder einen Lambda-Ausdruck (wo T der Typ IChat ist). Sie können eine beliebige Methode auf der Schnittstelle aufrufen, um sie an Clients zu senden. In unserem Fall haben wir nur eine Methode, ReceiveMessageund unser Sendecode auf dem Server sieht wie folgt aus:
public Task SendUpdateMessage(string message)
{
_subsManager.Notify(s => s.ReceiveMessage(message));
return Task.CompletedTask;
}
Jetzt verfügt unser Server über eine Methode zum Senden von Nachrichten an Beobachterclients und zwei Methoden zum Abonnieren/Abmelden. Der Klient hat eine Klasse implementiert, die die Grain-Nachrichten überwachen kann. Der letzte Schritt besteht darin, einen Beobachterverweis auf dem Client mithilfe unserer zuvor implementierten Chat Klasse zu erstellen und damit es nach dem Abonnieren Nachrichten empfängt.
Der Code sieht wie folgt aus:
//First create the grain reference
var friend = _grainFactory.GetGrain<IHello>(0);
Chat c = new Chat();
//Create a reference for chat, usable for subscribing to the observable grain.
var obj = _grainFactory.CreateObjectReference<IChat>(c);
//Subscribe the instance to receive messages.
await friend.Subscribe(obj);
Jetzt, wann immer unser Grain auf dem Server die SendUpdateMessage Methode aufruft, erhalten alle abonnierten Clients die Nachricht. In unserem Clientcode empfängt die Chat Instanz in der Variablen c die Nachricht und gibt sie an die Konsole aus.
Von Bedeutung
Objekte, die CreateObjectReference übergeben werden, werden über eine WeakReference<T> gespeichert und werden daher garbage collection, wenn keine anderen Verweise vorhanden sind.
Sie sollten einen Verweis für jeden Beobachter beibehalten, den Sie nicht sammeln möchten.
Hinweis
Beobachter sind inhärent unzuverlässig, da ein Client, der einen Beobachter hostet, fehlschlagen kann, und Beobachter, die nach der Wiederherstellung erstellt wurden, unterschiedliche (randomisierte) Identitäten haben. ObserverManager<TObserver> beruht auf einer regelmäßigen Erneuerung durch Beobachter, wie oben beschrieben, damit inaktive Beobachter entfernt werden können.
Ausführungsmodell
Implementierungen von IGrainObserver werden über einen Aufruf von IGrainFactory.CreateObjectReference registriert. Jeder Aufruf dieser Methode erstellt einen neuen Verweis, der auf diese Implementierung verweist. Orleans führt Anforderungen aus, die nacheinander an jede dieser Referenzen gesendet werden, bis zur Vollständigkeit. Beobachter sind nicht wiedereintrittsfähig; daher überschneiden sich gleichzeitige Anfragen an einen Beobachter nicht. Wenn mehrere Beobachter gleichzeitig Anforderungen empfangen, können diese Anforderungen parallel ausgeführt werden. Attribute wie AlwaysInterleaveAttribute oder ReentrantAttribute wirken sich nicht auf die Ausführung von Beobachtermethoden aus. Sie können das Ausführungsmodell nicht anpassen.
CancellationToken-Unterstützung
Ab Orleans 9.0 unterstützen Beobachterschnittstellenmethoden CancellationToken Parameter vollständig. Dies ermöglicht es Körnern, die Absage an Beobachter zu signalisieren, sodass lange laufende Beobachteroperationen ordnungsgemäß angehalten werden können.
Definieren einer Beobachterschnittstelle mit CancellationToken
Fügen Sie einen CancellationToken Parameter als letzten Parameter in der Beobachterschnittstellenmethode hinzu:
public interface IDataObserver : IGrainObserver
{
Task OnDataReceivedAsync(DataPayload data, CancellationToken cancellationToken = default);
}
Implementieren Sie den Beobachter mit Abbruchunterstützung
public class DataObserver : IDataObserver
{
public async Task OnDataReceivedAsync(DataPayload data, CancellationToken cancellationToken = default)
{
// Check cancellation before processing
cancellationToken.ThrowIfCancellationRequested();
// Process data with cancellation-aware operations
await ProcessDataAsync(data, cancellationToken);
}
private async Task ProcessDataAsync(DataPayload data, CancellationToken cancellationToken)
{
// Use cancellation token with async operations
await Task.Delay(100, cancellationToken);
Console.WriteLine($"Processed: {data.Id}");
}
}
Benachrichtigen von Beobachtern über Stornierung
Wenn Sie Beobachter benachrichtigen, können Sie ein Abbruchtoken übergeben, um eine kooperative Kündigung zu ermöglichen:
public async Task SendDataToObserversAsync(DataPayload data, CancellationToken cancellationToken = default)
{
// Create a linked token source to combine the grain's token with a timeout
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(TimeSpan.FromSeconds(30)); // Timeout for observer notifications
await _subsManager.NotifyAsync(
observer => observer.OnDataReceivedAsync(data, cts.Token));
}
Weitere Informationen zur Verwendung von Abbruchtokens in Orleans finden Sie unter Verwendung von Abbruchtokens in Orleans Grains.
Die CancellationToken-Unterstützung für Beobachter wurde in Orleans 9.0 eingeführt. Für frühere Versionen können Sie GrainCancellationToken als Problemumgehung verwenden, aber direkte CancellationToken Unterstützung in Beobachtermethoden ist nicht verfügbar.
Für die vollständige CancellationToken-Unterstützung sollten Sie ein Upgrade auf Orleans 9.0 oder höher in Betracht ziehen.
CancellationToken-Unterstützung für Beobachter ist in Orleans 9.0 und höher verfügbar. Orleans 3.x verwendet den veralteten Mechanismus GrainCancellationToken für Abbruchvorgänge.