Observateurs

Il existe certaines situations où un modèle simple de message/réponse ne suffit et où le client a besoin de recevoir des notifications asynchrones. Par exemple, un utilisateur peut souhaiter être averti dès qu’un nouveau message instantané est publié par un ami.

Les observateurs clients sont un mécanisme qui permet de notifier les clients de manière asynchrone. Les interfaces d’observateur doivent hériter de IGrainObserver, et toutes les méthodes doivent retourner void, Task, Task<TResult>, ValueTask ou ValueTask<TResult>. Un type de retour void n’est pas recommandé, car il peut encourager l’utilisation d’async void sur l’implémentation, ce qui est un modèle dangereux, car cela peut entraîner des plantages d’application si une exception est levée à partir de la méthode. Au lieu de cela, pour les scénarios de notification les plus performants, envisagez d’appliquer OneWayAttribute à la méthode d’interface de l’observateur. De cette façon, le destinataire n’envoie pas de réponse pour l’appel de méthode et la méthode retourne immédiatement sur le site d’appel, sans attendre une réponse de l’observateur. Un grain appelle une méthode sur un observateur en l’appelant comme n’importe quelle méthode d’interface de grain. Le runtime Orleans garantira la remise des requêtes et des réponses. Un cas d’usage courant pour les observateurs consiste à inscrire un client afin qu’il reçoive des notifications quand un événement se produit dans l’application Orleans. Un grain qui publie de telles notifications doit fournir une API pour l’ajout ou la suppression d’observateurs. Par ailleurs, il est généralement opportun d’exposer une méthode qui permette d’annuler un abonnement existant.

Les développeurs de grains peuvent utiliser une classe utilitaire comme ObserverManager<TObserver> pour simplifier le développement de types de grains observés. Contrairement aux grains, qui sont automatiquement réactivés si nécessaire après une défaillance, les clients ne sont pas tolérants aux pannes : un client qui connaît une défaillance peut ne jamais être récupéré. Pour cette raison, l’utilitaire ObserverManager<T> supprime les abonnements après une durée configurée. Les clients actifs doivent se réinscrire dans un délai déterminé pour que leur abonnement reste actif.

Pour s’abonner à une notification, le client doit d’abord créer un objet local qui implémente l’interface d’observateur. Il appelle ensuite une méthode dans la fabrique d’observateur (CreateObjectReference) pour transformer l’objet en référence de grain, qui peut ensuite être transmise à la méthode d’abonnement du grain de notification.

Ce modèle peut aussi être utilisé par d’autres grains pour recevoir des notifications asynchrones. Les grains peuvent aussi implémenter des interfaces IGrainObserver. Contrairement au cas d’abonnement de client, le grain qui s’abonne implémente simplement l’interface d’observateur et transmet une référence à elle-même (par exemple this.AsReference<IMyGrainObserverInterface>()). CreateObjectReference() n’est pas nécessaire, car les grains sont déjà adressables.

Exemple de code

Supposons que nous avons un grain qui envoie régulièrement des messages aux clients. Par souci de simplicité, le message de notre exemple est une chaîne (string). Pour commencer, nous définissons l’interface sur le client qui recevra le message.

Voici comment se présente l’interface :

public interface IChat : IGrainObserver
{
    Task ReceiveMessage(string message);
}

La seule particularité est que l’interface doit hériter de IGrainObserver. À présent, tout client souhaitant observer ces messages doit implémenter une classe qui implémente IChat.

Voici comment à quoi ressemblerait le cas le plus simple :

public class Chat : IChat
{
    public Task ReceiveMessage(string message)
    {
        Console.WriteLine(message);
        return Task.CompletedTask;
    }
}

Sur le serveur, nous devons ensuite faire envoyer à un grain ces messages de conversation aux clients. Le grain doit aussi disposer d’un mécanisme qui permette aux clients de s’abonner et de se désabonner aux notifications. Pour les abonnements, le grain peut utiliser une instance de la classe utilitaire ObserverManager<TObserver>.

Notes

ObserverManager<TObserver> fait partie de Orleans depuis la version 7.0. Pour les versions antérieures, l’implémentation suivante peut être copiée.

class HelloGrain : Grain, IHello
{
    private readonly ObserverManager<IChat> _subsManager;

    public HelloGrain(ILogger<HelloGrain> logger)
    {
        _subsManager =
            new ObserverManager<IChat>(
                TimeSpan.FromMinutes(5), logger);
    }

    // Clients call this to subscribe.
    public Task Subscribe(IChat observer)
    {
        _subsManager.Subscribe(observer, observer);

        return Task.CompletedTask;
    }

    //Clients use this to unsubscribe and no longer receive messages.
    public Task UnSubscribe(IChat observer)
    {
        _subsManager.Unsubscribe(observer);

        return Task.CompletedTask;
    }
}

Pour envoyer un message aux clients, la méthode Notify de l’instance ObserverManager<IChat> peut être utilisée. La méthode prend une méthode Action<T> ou une expression lambda (où T est ici de type IChat). Vous pouvez appeler n’importe quelle méthode sur l’interface pour l’envoyer aux clients. Dans notre cas, nous n’avons qu’une seule méthode, ReceiveMessage, et notre code d’envoi sur le serveur se présente comme suit :

public Task SendUpdateMessage(string message)
{
    _subsManager.Notify(s => s.ReceiveMessage(message));

    return Task.CompletedTask;
}

Maintenant, notre serveur dispose d’une méthode pour envoyer des messages aux clients observateurs, de deux méthodes pour l’abonnement et le désabonnement, et le client a implémenté une classe capable d’observer les messages de grain. La dernière étape consiste à créer une référence d’observateur sur le client à l’aide de la classe Chat que nous avons implémentée précédemment, et à lui permettre de recevoir les messages après l’avoir abonné à ceux-ci.

Le code se présente comme ceci :

//First create the grain reference
var friend = _grainFactory.GetGrain<IHello>(0);
Chat c = new Chat();

//Create a reference for chat, usable for subscribing to the observable grain.
var obj = _grainFactory.CreateObjectReference<IChat>(c);

//Subscribe the instance to receive messages.
await friend.Subscribe(obj);

Désormais, chaque fois que notre grain sur le serveur appellera la méthode SendUpdateMessage, tous les clients abonnés recevront le message. Dans notre code client, l’instance Chat dans la variable c recevra le message et le sortira vers la console.

Important

Les objets transmis à CreateObjectReference sont conservés via une référence WeakReference<T> et font donc l’objet d’une opération de garbage collection s’il n’existe aucune autre référence.

Les utilisateurs doivent conserver une référence pour chaque observateur qu’ils ne souhaitent pas voir collectés.

Notes

Les observateurs sont intrinsèquement peu fiables, car un client qui héberge un observateur peut être défaillant et les observateurs créés après la récupération ont des identités différentes (aléatoires). ObserverManager<TObserver> s’appuie sur le réabonnement périodique des observateurs, comme indiqué ci-dessus, afin que les observateurs inactifs puissent être supprimés.

Modèle d’exécution

Les implémentations de IGrainObserver sont inscrites via un appel à IGrainFactory.CreateObjectReference, et chaque appel à cette méthode crée une nouvelle référence qui pointe vers cette implémentation. Orleans exécutera les requêtes envoyées à chacune de ces références, l’une après l’autre, jusqu’à leur achèvement. Les observateurs n’étant pas réentrants, les requêtes simultanées adressées à un observateur ne seront pas entrelacées par Orleans. Si plusieurs observateurs reçoivent des demandes de manière simultanée, elles peuvent s’exécuter en parallèle. L’exécution des méthodes d’observateur n’est pas affectée par les attributs tels que AlwaysInterleaveAttribute ou ReentrantAttribute : le modèle d’exécution ne peut pas être personnalisé par un développeur.