Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
A volte, un modello semplice di messaggio/risposta non è sufficiente e il client deve ricevere notifiche asincrone. Ad esempio, un utente potrebbe voler inviare una notifica quando un amico pubblica un nuovo messaggio istantaneo.
Gli osservatori client sono un meccanismo che consente la notifica asincrona dei client. Le interfacce Observer devono ereditare da IGrainObservere tutti i metodi devono restituire void, Task, Task<TResult>ValueTask, o ValueTask<TResult>. Non è consigliabile un tipo di ritorno di void poiché potrebbe incoraggiare l'uso di async void nell'implementazione. Si tratta di un modello pericoloso, in quanto può causare il crash dell'applicazione se viene generata un'eccezione dal metodo. Per gli scenari di notifica più impegnativi, è consigliabile applicare l'oggetto OneWayAttribute al metodo di interfaccia dell'osservatore. In questo modo il ricevitore non invia una risposta per l'invocazione del metodo e il metodo viene restituito immediatamente al punto di chiamata, senza attendere una risposta dal osservatore. Un grain chiama un metodo su un osservatore invocandolo come qualsiasi metodo dell'interfaccia di un grain. Il Orleans runtime garantisce il recapito di richieste e risposte. Un caso d'uso comune per gli osservatori consiste nell'integrare un client per ricevere notifiche quando si verifica un evento nell'applicazione Orleans . Una unità che pubblica tali notifiche deve fornire un'API per aggiungere o rimuovere osservatori. Inoltre, è in genere utile esporre un metodo che consente l'annullamento di una sottoscrizione esistente.
È possibile usare una classe di utilità come ObserverManager<TObserver> per semplificare lo sviluppo di tipi di granularità osservati. A differenza dei grani, che Orleans reagisce automaticamente in base alle esigenze dopo l'errore, i client non sono a tolleranza di errore: un client che non riesce potrebbe non essere mai ripristinato. Per questo motivo, l'utilità ObserverManager<T> rimuove le sottoscrizioni dopo una durata configurata. I clienti attivi devono rinnovare periodicamente il loro abbonamento per mantenerlo attivo.
Per sottoscrivere una notifica, il client deve prima creare un oggetto locale che implementa l'interfaccia observer. Chiama quindi il metodo CreateObjectReference sulla fabbrica di granuli per trasformare l'oggetto in un riferimento al granulo. È quindi possibile passare questo riferimento al metodo di sottoscrizione sul grano notificante.
Altri grani possono anche usare questo modello per ricevere notifiche asincrone. I grani possono implementare IGrainObserver interfacce. A differenza del caso di sottoscrizione del client, il grain di sottoscrizione implementa semplicemente l'interfaccia osservatore e passa un riferimento a se stesso (ad esempio, this.AsReference<IMyGrainObserverInterface>()). Non c'è bisogno di CreateObjectReference perché i dati sono già indirizzabili.
Esempio di codice
Supponiamo che tu abbia un modulo che invia periodicamente messaggi ai clienti. Per semplicità, il messaggio nell'esempio è una stringa. Prima di tutto, definire l'interfaccia nel client che riceve il messaggio.
L'interfaccia è simile alla seguente:
public interface IChat : IGrainObserver
{
Task ReceiveMessage(string message);
}
L'unico requisito speciale è che l'interfaccia deve ereditare da IGrainObserver.
A questo momento, qualsiasi client che vuole osservare questi messaggi deve implementare una classe che implementa IChat.
Il caso più semplice è simile al seguente:
public class Chat : IChat
{
public Task ReceiveMessage(string message)
{
Console.WriteLine(message);
return Task.CompletedTask;
}
}
Sul server, dovresti successivamente avere un componente che invia questi messaggi di chat ai client. Il grano dovrebbe anche fornire un meccanismo per consentire ai client di iscriversi e disiscriversi alle notifiche. Per le sottoscrizioni, il grain può usare un'istanza della utility class ObserverManager<IChat>.
Annotazioni
ObserverManager<TObserver> fa parte della Orleans versione 7.0. Per le versioni precedenti, è possibile copiare l'implementazione seguente.
class HelloGrain : Grain, IHello
{
private readonly ObserverManager<IChat> _subsManager;
public HelloGrain(ILogger<HelloGrain> logger)
{
_subsManager =
new ObserverManager<IChat>(
TimeSpan.FromMinutes(5), logger);
}
// Clients call this to subscribe.
public Task Subscribe(IChat observer)
{
_subsManager.Subscribe(observer, observer);
return Task.CompletedTask;
}
//Clients use this to unsubscribe and no longer receive messages.
public Task UnSubscribe(IChat observer)
{
_subsManager.Unsubscribe(observer);
return Task.CompletedTask;
}
}
Per inviare un messaggio ai client, usare il Notify metodo dell'istanza di ObserverManager<IChat> . Il metodo accetta un metodo o un'espressione Action<T> lambda (dove T è di tipo IChat qui). È possibile chiamare qualsiasi metodo sull'interfaccia per inviarlo ai client. In questo caso, è disponibile un solo metodo, ReceiveMessagee il codice di invio nel server è simile al seguente:
public Task SendUpdateMessage(string message)
{
_subsManager.Notify(s => s.ReceiveMessage(message));
return Task.CompletedTask;
}
Il server dispone ora di un metodo per inviare messaggi ai client osservatori e due metodi per la sottoscrizione/annullamento della sottoscrizione. Il client ha implementato una classe in grado di osservare i messaggi dei grain. Il passaggio finale consiste nel creare un riferimento osservatore sul client usando la classe implementata Chat in precedenza e consentire la ricezione di messaggi dopo la sottoscrizione.
Il codice sarà simile al seguente:
//First create the grain reference
var friend = _grainFactory.GetGrain<IHello>(0);
Chat c = new Chat();
//Create a reference for chat, usable for subscribing to the observable grain.
var obj = _grainFactory.CreateObjectReference<IChat>(c);
//Subscribe the instance to receive messages.
await friend.Subscribe(obj);
Ora, ogni volta che il nostro grain sul server chiama il metodo SendUpdateMessage, tutti i client sottoscritti ricevono il messaggio. Nel codice client l'istanza Chat nella variabile c riceve il messaggio e lo restituisce nella console.
Importante
Gli oggetti passati a CreateObjectReference vengono gestiti tramite WeakReference<T> e sono quindi sottoposti a Garbage Collection se non esistono altri riferimenti.
Dovresti mantenere un riferimento per ogni osservatore che non vuoi rimosso.
Annotazioni
Gli osservatori sono intrinsecamente inaffidabili perché un client che ospita un osservatore potrebbe non riuscire e gli osservatori creati dopo il ripristino hanno identità diverse (casuali). ObserverManager<TObserver> si basa sul rinnovamento periodico della sottoscrizione da parte degli osservatori, come discusso in precedenza, in modo da poter rimuovere osservatori inattivi.
Modello di esecuzione
Le implementazioni di IGrainObserver vengono registrate tramite una chiamata a IGrainFactory.CreateObjectReference. Ogni chiamata a tale metodo crea un nuovo riferimento che punta a tale implementazione. Orleans esegue le richieste inviate a ognuno di questi riferimenti uno alla volta, fino al completamento. Gli osservatori non sono reentranti; pertanto, Orleans non interleave le richieste simultanee a un osservatore. Se più osservatori ricevono richieste contemporaneamente, tali richieste possono essere eseguite in parallelo. Gli attributi, AlwaysInterleaveAttribute ad esempio o ReentrantAttribute non influiscono sull'esecuzione di metodi osservatore, ma non è possibile personalizzare il modello di esecuzione.
Supporto di CancellationToken
Orleans A partire dalla versione 9.0, i metodi dell'interfaccia observer supportano completamente i parametri CancellationToken. Ciò consente ai grain di segnalare l'annullamento agli osservatori, abilitando l'interruzione corretta delle osservazioni a lunga esecuzione.
Definire un'interfaccia osservatore con CancellationToken
Aggiungere un CancellationToken parametro come ultimo parametro nel metodo di interfaccia observer:
public interface IDataObserver : IGrainObserver
{
Task OnDataReceivedAsync(DataPayload data, CancellationToken cancellationToken = default);
}
Implementare l'osservatore con supporto per l'annullamento
public class DataObserver : IDataObserver
{
public async Task OnDataReceivedAsync(DataPayload data, CancellationToken cancellationToken = default)
{
// Check cancellation before processing
cancellationToken.ThrowIfCancellationRequested();
// Process data with cancellation-aware operations
await ProcessDataAsync(data, cancellationToken);
}
private async Task ProcessDataAsync(DataPayload data, CancellationToken cancellationToken)
{
// Use cancellation token with async operations
await Task.Delay(100, cancellationToken);
Console.WriteLine($"Processed: {data.Id}");
}
}
Notificare agli osservatori l'annullamento
Quando si invia una notifica agli osservatori, è possibile passare un token di annullamento per abilitare l'annullamento cooperativo:
public async Task SendDataToObserversAsync(DataPayload data, CancellationToken cancellationToken = default)
{
// Create a linked token source to combine the grain's token with a timeout
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(TimeSpan.FromSeconds(30)); // Timeout for observer notifications
await _subsManager.NotifyAsync(
observer => observer.OnDataReceivedAsync(data, cts.Token));
}
Per ulteriori informazioni sull'uso dei token di annullamento in Orleans, vedere Usare i token di annullamento nei Orleans grains.
Il supporto di CancellationToken per gli osservatori è stato introdotto nella Orleans versione 9.0. Per le versioni precedenti, è possibile usare GrainCancellationToken come soluzione alternativa, ma il supporto diretto CancellationToken nei metodi observer non è disponibile.
Per il supporto di CancellationToken completo, è consigliabile eseguire l'aggiornamento alla Orleans versione 9.0 o successiva.
Il supporto di CancellationToken per gli osservatori è disponibile nella Orleans versione 9.0 e successive. Orleans 3.x utilizza il meccanismo legacy GrainCancellationToken per l'annullamento.