Observadores

Hay situaciones en las que un patrón simple de mensaje/respuesta no es suficiente y el cliente debe recibir notificaciones asincrónicas. Por ejemplo, es posible que un usuario quiera recibir una notificación cuando un amigo haya publicado un nuevo mensaje instantáneo.

Los observadores de cliente son un mecanismo que permite enviar notificaciones a los clientes de forma asincrónica. Las interfaces de observador deben heredar de IGrainObserver y todos los métodos deben devolver void, Task, Task<TResult>, ValueTask o ValueTask<TResult>. No se recomienda un tipo de valor devuelto void, ya que puede fomentar el uso de async void en la implementación, que es un patrón peligroso porque puede provocar bloqueos de aplicación si se produce una excepción desde el método. En su lugar, para escenarios de notificación del mejor esfuerzo, considere la posibilidad de aplicar OneWayAttribute al método de interfaz del observador. Esto hará que el receptor no envíe una respuesta para la invocación del método y hará que el método se devuelva inmediatamente en el sitio de la llamada, sin esperar una respuesta del observador. Un grano llama a un método en un observador invocándolo como cualquier método de interfaz de grano. El runtime de Orleans garantizará la entrega de solicitudes y respuestas. Un caso de uso común para los observadores consiste en inscribir a un cliente para recibir notificaciones cuando se produce un evento en la aplicación de Orleans. Un grano que publique estas notificaciones debe proporcionar una API para agregar o quitar observadores. Además, normalmente resulta conveniente exponer un método que permita cancelar una suscripción existente.

Los desarrolladores de granos pueden usar una clase de utilidad como ObserverManager<TObserver> para simplificar el desarrollo de tipos de grano observados. A diferencia de los granos, que se reactivan automáticamente según sea necesario después de un error, los clientes no son tolerantes a errores; si un cliente genera un error, podría no recuperarse nunca. Por este motivo, la utilidad ObserverManager<T> quita las suscripciones cuando ha transcurrido la duración que se haya configurado. Los clientes que están activos deben volver a suscribirse en función de un temporizador para mantener activa su suscripción.

Para suscribirse a una notificación, el cliente debe crear primero un objeto local que implemente la interfaz del observador. Después, llama a un método en el generador del observador (CreateObjectReference) para convertir el objeto en una referencia de grano, que luego se puede pasar al método de suscripción en el grano de notificación.

Este modelo también pueden usarlo otros granos para recibir notificaciones asincrónicas. Los granos también pueden implementar interfaces IGrainObserver. A diferencia de lo que sucede en el caso de la suscripción de cliente, el grano de suscripción simplemente implementa la interfaz del observador y pasa una referencia a sí mismo (por ejemplo, this.AsReference<IMyGrainObserverInterface>()). No es necesario usar CreateObjectReference() porque los granos ya son direccionables.

Ejemplo de código

Supongamos que tenemos un grano que envía mensajes periódicamente a los clientes. Por motivos de simplicidad, el mensaje de nuestro ejemplo será una cadena. Primero definimos la interfaz en el cliente que recibirá el mensaje.

La interfaz tendrá un aspecto similar al siguiente:

public interface IChat : IGrainObserver
{
    Task ReceiveMessage(string message);
}

Lo único especial es que la interfaz debe heredar de IGrainObserver. Ahora, cualquier cliente que quiera observar esos mensajes debe implementar una clase que implemente IChat.

El caso más simple será similar al siguiente:

public class Chat : IChat
{
    public Task ReceiveMessage(string message)
    {
        Console.WriteLine(message);
        return Task.CompletedTask;
    }
}

En el servidor, deberíamos tener un grano que envíe estos mensajes de chat a los clientes. El grano también debe tener un mecanismo para que los clientes puedan suscribirse y cancelar la suscripción a las notificaciones por su cuenta. En el caso de las suscripciones, el grano puede usar una instancia de la clase de utilidad ObserverManager<TObserver>.

Nota:

ObserverManager<TObserver> forma parte de Orleans desde la versión 7.0. En las versiones anteriores, se puede copiar la siguiente implementación.

class HelloGrain : Grain, IHello
{
    private readonly ObserverManager<IChat> _subsManager;

    public HelloGrain(ILogger<HelloGrain> logger)
    {
        _subsManager =
            new ObserverManager<IChat>(
                TimeSpan.FromMinutes(5), logger);
    }

    // Clients call this to subscribe.
    public Task Subscribe(IChat observer)
    {
        _subsManager.Subscribe(observer, observer);

        return Task.CompletedTask;
    }

    //Clients use this to unsubscribe and no longer receive messages.
    public Task UnSubscribe(IChat observer)
    {
        _subsManager.Unsubscribe(observer);

        return Task.CompletedTask;
    }
}

Para enviar un mensaje a los clientes, se puede usar el método Notify de la instancia de ObserverManager<IChat>. El método toma un método Action<T> o una expresión lambda (en este caso, T es de tipo IChat). Puede llamar a cualquier método de la interfaz para enviarlo a los clientes. En nuestro caso, solo tenemos un método (ReceiveMessage) y nuestro código de envío en el servidor tendría un aspecto similar al siguiente:

public Task SendUpdateMessage(string message)
{
    _subsManager.Notify(s => s.ReceiveMessage(message));

    return Task.CompletedTask;
}

Ahora, el servidor tiene un método para enviar mensajes a los clientes del observador, así como dos métodos para suscribirse o cancelar la suscripción, y el cliente ha implementado una clase capaz de observar los mensajes de grano. El último paso es crear una referencia de observador en el cliente mediante la clase Chat implementada anteriormente y dejar que reciba los mensajes después de suscribirse.

El código tendrá un aspecto ser similar al siguiente:

//First create the grain reference
var friend = _grainFactory.GetGrain<IHello>(0);
Chat c = new Chat();

//Create a reference for chat, usable for subscribing to the observable grain.
var obj = _grainFactory.CreateObjectReference<IChat>(c);

//Subscribe the instance to receive messages.
await friend.Subscribe(obj);

Ahora, cada vez que el grano del servidor llame al método SendUpdateMessage, todos los clientes suscritos recibirán el mensaje. En el código de cliente, la instancia de Chat en la variable c recibirá el mensaje y lo mostrará en la consola.

Importante

Los objetos que se pasan a CreateObjectReference se mantienen mediante WeakReference<T> y, por tanto, se recopilarán como elementos no utilizados si no existen otras referencias.

Los usuarios deben mantener una referencia para cada observador que no quieran que se recopile.

Nota

Los observadores son poco confiables de manera inherente, ya que un cliente que hospeda un observador puede producir un error y los observadores creados después de la recuperación tienen identidades diferentes (aleatorias). ObserverManager<TObserver> se basa en la suscripción periódica de los observadores, como se explicó anteriormente, para que se puedan quitar observadores inactivos.

Modelo de ejecución

Las implementaciones de IGrainObserver se registran mediante una llamada a IGrainFactory.CreateObjectReference y cada llamada a ese método crea una referencia que apunta a esa implementación. Orleans ejecutará una a una las solicitudes enviadas a cada una de estas referencias, hasta su finalización. Los observadores no son reentrantes y, por lo tanto, Orleans no intercalará las solicitudes simultáneas a un observador. Si hay varios observadores que reciben solicitudes simultáneamente, dichas solicitudes pueden ejecutarse en paralelo. La ejecución de métodos del observador no se ve afectada por atributos como AlwaysInterleaveAttribute o ReentrantAttribute. Los desarrolladores no pueden personalizar el modelo de ejecución.