Remarque
L’accès à cette page requiert une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page requiert une autorisation. Vous pouvez essayer de modifier des répertoires.
Parfois, un modèle de message/réponse simple n’est pas suffisant et le client doit recevoir des notifications asynchrones. Par exemple, un utilisateur peut souhaiter une notification lorsqu’un ami publie un nouveau message instantané.
Les observateurs clients sont un mécanisme permettant la notification asynchrone des clients. Les interfaces d’observateur doivent hériter de IGrainObserver, et toutes les méthodes doivent retourner soit void, Task, Task<TResult>, ValueTask, ou ValueTask<TResult>. Nous vous déconseillons d’utiliser un type de retour de void, car il peut encourager l’utilisation de async void dans l’implémentation. Il s’agit d’un modèle dangereux, car il peut provoquer des blocages d’application si une exception est levée à partir de la méthode. Au lieu de cela, pour les scénarios de notification les plus performants, envisagez d’appliquer la OneWayAttribute méthode d’interface de l’observateur. Ainsi, le destinataire ne renvoie pas de réponse à l'invocation de la méthode et fait que la méthode retourne immédiatement au site de l'appel, sans attendre de réponse de l'observateur. Un grain appelle une méthode sur un observateur en l’appelant comme n’importe quelle méthode d’interface de grain. Le Orleans runtime garantit la remise des demandes et des réponses. Un cas d’usage courant pour les observateurs consiste à inscrire un client pour recevoir des notifications lorsqu’un événement se produit dans l’application Orleans . Un grain publiant ces notifications devrait fournir une API pour ajouter ou supprimer des observateurs. En outre, il est généralement pratique d’exposer une méthode autorisant l’annulation d’un abonnement existant.
Vous pouvez utiliser une classe utilitaire comme ObserverManager<TObserver> pour simplifier le développement de types de grains observés. Contrairement aux grains, qui Orleans se réactivent automatiquement en fonction des besoins après l’échec, les clients ne sont pas tolérants aux pannes : un client qui échoue peut ne jamais récupérer. Pour cette raison, l’utilitaire ObserverManager<T> supprime les abonnements après une durée configurée. Les clients actifs doivent se réinscrire à intervalles réguliers pour que leurs abonnements restent actifs.
Pour vous abonner à une notification, le client doit d’abord créer un objet local implémentant l’interface observateur. Il appelle ensuite la méthode CreateObjectReference de l'usine à grains pour transformer l’objet en référence de grain. Vous pouvez ensuite transmettre cette référence à la méthode d’abonnement du grain de notification. Lorsque l’observateur n’est plus nécessaire, appelez DeleteObjectReference pour nettoyer la référence et éviter une fuite de mémoire dans le processus client.
D’autres grains peuvent également utiliser ce modèle pour recevoir des notifications asynchrones. Les grains peuvent implémenter des IGrainObserver interfaces. Contrairement au cas de l’abonnement client, le grain d’abonnement implémente simplement l’interface observateur et transmet une référence à elle-même (par exemple). this.AsReference<IMyGrainObserverInterface>() Il n’est pas nécessaire d’avoir CreateObjectReference car les grains sont déjà adressables.
Exemple de code
Supposons que vous disposez d’un grain qui envoie régulièrement des messages aux clients. Par souci de simplicité, le message dans notre exemple est une chaîne. Tout d’abord, définissez l’interface sur le client qui reçoit le message.
L’interface ressemble à ceci :
public interface IChat : IGrainObserver
{
Task ReceiveMessage(string message);
}
La seule exigence spéciale est que l’interface doit hériter de IGrainObserver.
À présent, tout client souhaitant observer ces messages doit implémenter une classe qui implémente IChat.
Le cas le plus simple ressemble à ceci :
public class Chat : IChat
{
public Task ReceiveMessage(string message)
{
Console.WriteLine(message);
return Task.CompletedTask;
}
}
Sur le serveur, vous devez ensuite disposer d’un grain qui envoie ces messages de chat aux clients. Le grain doit également fournir un mécanisme permettant aux clients de s’abonner et de se désabonner des notifications. Pour les abonnements, le grain peut utiliser une instance de la classe utilitaire ObserverManager<IChat>.
Remarque
ObserverManager<TObserver> fait partie de Orleans depuis la version 7.0. Pour les versions antérieures, l’implémentation suivante peut être copiée.
class HelloGrain : Grain, IHello
{
private readonly ObserverManager<IChat> _subsManager;
public HelloGrain(ILogger<HelloGrain> logger)
{
_subsManager =
new ObserverManager<IChat>(
TimeSpan.FromMinutes(5), logger);
}
// Clients call this to subscribe.
public Task Subscribe(IChat observer)
{
_subsManager.Subscribe(observer, observer);
return Task.CompletedTask;
}
//Clients use this to unsubscribe and no longer receive messages.
public Task UnSubscribe(IChat observer)
{
_subsManager.Unsubscribe(observer);
return Task.CompletedTask;
}
}
Pour envoyer un message aux clients, utilisez la Notify méthode de l’instance IChat< ObserverManager>. La méthode prend une méthode Action<T> ou une expression lambda (où T est de type IChat ici). Vous pouvez appeler n’importe quelle méthode sur l’interface pour l’envoyer aux clients. Dans notre cas, nous n’avons qu’une seule méthode, ReceiveMessageet notre code d’envoi sur le serveur ressemble à ceci :
public Task SendUpdateMessage(string message)
{
_subsManager.Notify(s => s.ReceiveMessage(message));
return Task.CompletedTask;
}
À présent, notre serveur dispose d’une méthode permettant d’envoyer des messages à des clients observateurs et deux méthodes pour s’abonner/annuler l’inscription. Le client a implémenté une classe capable d’observer les messages de grain. La dernière étape consiste à créer une référence d’observateur sur le client à l’aide de notre classe précédemment implémentée Chat et à lui permettre de recevoir des messages après l’abonnement.
Le code se présente comme suit :
//First create the grain reference
var friend = _grainFactory.GetGrain<IHello>(0);
Chat c = new Chat();
//Create a reference for chat, usable for subscribing to the observable grain.
var obj = _grainFactory.CreateObjectReference<IChat>(c);
//Subscribe the instance to receive messages.
await friend.Subscribe(obj);
À présent, chaque fois que notre grain sur le serveur appelle la SendUpdateMessage méthode, tous les clients abonnés reçoivent le message. Dans notre code client, l’instance Chat dans la variable c reçoit le message et l’affiche à la console.
Lorsque l’observateur n’est plus nécessaire, dédésabonnez-vous du grain et supprimez la référence d'objet.
// Unsubscribe the observer when it's no longer needed.
await friend.Unsubscribe(obj);
// Delete the object reference to free resources and avoid a memory leak.
_grainFactory.DeleteObjectReference<IChat>(obj);
Important
Le client contient les instances d’observateur transmises à CreateObjectReference dans son gestionnaire d’objets interne en utilisant un WeakReference<T>, de sorte que cette même instance d’observateur peut être collectée par le ramasse-miettes si aucune autre référence n’existe, néanmoins, l'enregistrement de la référence d'objet reste actif tant que vous ne l'avez pas supprimé.
Appelez DeleteObjectReference toujours lorsque vous n’avez plus besoin d’un observateur pour supprimer cette inscription. Sans cela, une référence reste dans le gestionnaire d’objets interne du client, provoquant une fuite de mémoire qui peut éventuellement bloquer le processus client.
Vous devez conserver une référence pour chaque observateur que vous ne souhaitez pas collecter.
Remarque
Les observateurs sont intrinsèquement peu fiables, car un client hébergeant un observateur peut échouer, et les observateurs créés après la récupération ont des identités différentes (aléatoires). ObserverManager<TObserver> s’appuie sur une réinscription périodique par des observateurs, comme indiqué ci-dessus, afin qu’il puisse supprimer les observateurs inactifs.
Modèle d’exécution
Les implémentations de IGrainObserver sont inscrites par le biais d’un appel à IGrainFactory.CreateObjectReference. Chaque appel à cette méthode crée une référence pointant vers cette implémentation. Orleans exécute les demandes envoyées à chacune de ces références une par une, jusqu'à leur complétion. Les observateurs ne sont pas reentrants ; par conséquent, Orleans n’interlace pas les demandes simultanées à un observateur. Si plusieurs observateurs reçoivent des demandes simultanément, ces requêtes peuvent s’exécuter en parallèle. Les attributs tels que AlwaysInterleaveAttribute ou ReentrantAttribute n’affectent pas l’exécution des méthodes d’observateur ; vous ne pouvez pas personnaliser le modèle d’exécution.
Prise en charge d’CancellationToken
Orleans À compter de la version 9.0, les méthodes d’interface observateur prennent entièrement en charge les CancellationToken paramètres. Cela permet aux grains de signaler l’annulation aux observateurs, permettant ainsi d'arrêter en douceur les opérations de surveillance de longue durée.
Définir une interface d’observateur avec CancellationToken
Ajoutez un CancellationToken paramètre comme dernier paramètre dans la méthode d’interface de votre observateur :
public interface IDataObserver : IGrainObserver
{
Task OnDataReceivedAsync(DataPayload data, CancellationToken cancellationToken = default);
}
Implémenter l’observateur avec prise en charge de l’annulation
public class DataObserver : IDataObserver
{
public async Task OnDataReceivedAsync(DataPayload data, CancellationToken cancellationToken = default)
{
// Check cancellation before processing
cancellationToken.ThrowIfCancellationRequested();
// Process data with cancellation-aware operations
await ProcessDataAsync(data, cancellationToken);
}
private async Task ProcessDataAsync(DataPayload data, CancellationToken cancellationToken)
{
// Use cancellation token with async operations
await Task.Delay(100, cancellationToken);
Console.WriteLine($"Processed: {data.Id}");
}
}
Notifier les observateurs de l'annulation
Lorsque vous avertissez les observateurs, vous pouvez transmettre un jeton d’annulation pour activer l’annulation coopérative :
public async Task SendDataToObserversAsync(DataPayload data, CancellationToken cancellationToken = default)
{
// Create a linked token source to combine the grain's token with a timeout
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(TimeSpan.FromSeconds(30)); // Timeout for observer notifications
await _subsManager.NotifyAsync(
observer => observer.OnDataReceivedAsync(data, cts.Token));
}
Pour plus d’informations sur l’utilisation des jetons d'annulation dans Orleans, consultez la page Utiliser des jetons d'annulation dans les grains Orleans.
La prise en charge de CancellationToken pour les observateurs a été introduite dans Orleans 9.0. Pour les versions antérieures, vous pouvez utiliser GrainCancellationToken comme solution de contournement, mais la prise en charge directe de CancellationToken pour les méthodes d'observateurs n’est pas possible.
Pour obtenir la prise en charge complète d’CancellationToken, envisagez la mise à niveau vers Orleans la version 9.0 ou ultérieure.
La prise en charge de CancellationToken pour les observateurs est disponible dans Orleans 9.0 et versions ultérieures. Orleans 3.x utilise le mécanisme hérité GrainCancellationToken pour l’annulation.