Compartir a través de


Observadores

A veces, un patrón de mensaje o respuesta simple no es suficiente y el cliente debe recibir notificaciones asincrónicas. Por ejemplo, un usuario podría querer notificaciones cuando un amigo publica un nuevo mensaje instantáneo.

Los observadores de cliente son un mecanismo que permite la notificación asincrónica de los clientes. Las interfaces de observador deben heredar de IGrainObservery todos los métodos deben devolver void, Task, Task<TResult>, ValueTasko ValueTask<TResult>. No recomendamos un tipo de retorno de void porque podría fomentar el uso de async void en la implementación. Se trata de un patrón peligroso, ya que puede provocar fallos de la aplicación si se produce una excepción en el método. En su lugar, para escenarios de notificación de mejor esfuerzo, considere aplicar el OneWayAttribute al método de la interfaz del observador. Esto hace que el receptor no envíe una respuesta para la invocación del método y hace que el método vuelva inmediatamente en el sitio de llamada, sin esperar una respuesta del observador. Un grain llama a un método en un observador invocándolo como cualquier método de la interfaz de grain. El Orleans tiempo de ejecución garantiza la entrega de solicitudes y respuestas. Un caso de uso común para los observadores es inscribir a un cliente para recibir notificaciones cuando se produce un evento en la Orleans aplicación. Una unidad que publica dichas notificaciones debe proporcionar una API para agregar o quitar observadores. Además, suele ser conveniente exponer un método que permita la cancelación de una suscripción existente.

Puede usar una clase de utilidad como ObserverManager<TObserver> para simplificar el desarrollo de tipos de grano observados. A diferencia de los granos, que Orleans se reactivan automáticamente según sea necesario después del fallo, los clientes no son tolerantes a fallos: un cliente que falla podría no recuperarse nunca. Por este motivo, la ObserverManager<T> utilidad quita las suscripciones después de una duración configurada. Los clientes activos deben renovar su suscripción periódicamente para mantenerla activa.

Para suscribirse a una notificación, el cliente primero debe crear un objeto local que implemente la interfaz de observador. A continuación, llama al método CreateObjectReference de la fábrica de granos para convertir el objeto en una referencia de grano. Luego, puede pasar esta referencia al método de suscripción en el grano de notificación.

Otros granos también pueden usar este modelo para recibir notificaciones asincrónicas. Los granos son capaces de implementar IGrainObserver interfaces. A diferencia del caso de la suscripción de cliente, el grano de suscripción simplemente implementa la interfaz de observador y pasa una referencia a sí misma (por ejemplo, this.AsReference<IMyGrainObserverInterface>()). No es necesario CreateObjectReference porque los granulados ya son direccionables.

Ejemplo de código

Vamos a suponer que tiene una entidad que envía mensajes periódicamente a los clientes. Por motivos de simplicidad, el mensaje de nuestro ejemplo es una cadena. En primer lugar, defina la interfaz en el cliente que recibe el mensaje.

La interfaz tiene este aspecto:

public interface IChat : IGrainObserver
{
    Task ReceiveMessage(string message);
}

El único requisito especial es que la interfaz debe heredar de IGrainObserver. Ahora, cualquier cliente que quiera observar estos mensajes debe implementar una clase que implemente IChat.

El caso más sencillo tiene un aspecto similar al siguiente:

public class Chat : IChat
{
    public Task ReceiveMessage(string message)
    {
        Console.WriteLine(message);
        return Task.CompletedTask;
    }
}

En el servidor, a continuación, debería tener un grano que envíe estos mensajes de chat a los clientes. El sistema también debe proporcionar un mecanismo para que los clientes puedan suscribirse y cancelar su suscripción a las notificaciones. En el caso de las suscripciones, el grano puede usar una instancia de la clase utilitaria ObserverManager<IChat>.

Nota:

ObserverManager<TObserver> forma parte de Orleans desde la versión 7.0. En el caso de las versiones anteriores, se puede copiar la siguiente implementación .

class HelloGrain : Grain, IHello
{
    private readonly ObserverManager<IChat> _subsManager;

    public HelloGrain(ILogger<HelloGrain> logger)
    {
        _subsManager =
            new ObserverManager<IChat>(
                TimeSpan.FromMinutes(5), logger);
    }

    // Clients call this to subscribe.
    public Task Subscribe(IChat observer)
    {
        _subsManager.Subscribe(observer, observer);

        return Task.CompletedTask;
    }

    //Clients use this to unsubscribe and no longer receive messages.
    public Task UnSubscribe(IChat observer)
    {
        _subsManager.Unsubscribe(observer);

        return Task.CompletedTask;
    }
}

Para enviar un mensaje a los clientes, use el Notify método de la instancia de IChat< observerManager>. El método toma un Action<T> método o una expresión lambda (donde T es de tipo IChat aquí). Puede llamar a cualquier método de la interfaz para enviarlo a los clientes. En nuestro caso, solo tenemos un método, ReceiveMessagey nuestro código de envío en el servidor tiene este aspecto:

public Task SendUpdateMessage(string message)
{
    _subsManager.Notify(s => s.ReceiveMessage(message));

    return Task.CompletedTask;
}

Ahora, nuestro servidor tiene un método para enviar mensajes a los clientes de observador y dos métodos para suscribirse o anular la suscripción. El cliente ha implementado una clase que puede observar los mensajes de grano. El último paso consiste en crear una referencia de observador en el cliente mediante nuestra clase implementada Chat anteriormente y dejar que reciba mensajes después de suscribirse.

El código tiene este aspecto:

//First create the grain reference
var friend = _grainFactory.GetGrain<IHello>(0);
Chat c = new Chat();

//Create a reference for chat, usable for subscribing to the observable grain.
var obj = _grainFactory.CreateObjectReference<IChat>(c);

//Subscribe the instance to receive messages.
await friend.Subscribe(obj);

Ahora, cada vez que nuestra instancia en el servidor llama al método SendUpdateMessage, todos los clientes suscritos reciben el mensaje. En nuestro código de cliente, la Chat instancia de la variable c recibe el mensaje y la envía a la consola.

Importante

Los objetos pasados a CreateObjectReference se mantienen a través de WeakReference<T> y, por tanto, son recogidos por el recolector de basura si no existen otras referencias.

Debe mantener una referencia para cada observador que no quiera recopilar.

Nota:

Los observadores son intrínsecamente poco confiables porque un cliente que hospeda un observador podría producir un error y los observadores creados después de la recuperación tienen identidades diferentes (aleatorias). ObserverManager<TObserver> se basa en la suscripción periódica por parte de los observadores, como se ha descrito anteriormente, para que pueda quitar observadores inactivos.

Modelo de ejecución

Las implementaciones de IGrainObserver se registran mediante una llamada a IGrainFactory.CreateObjectReference. Cada llamada a ese método crea una nueva referencia que apunta a esa implementación. Orleans ejecuta las solicitudes enviadas a cada una de estas referencias una por una, hasta su finalización. Los observadores no son reentrantes; por lo tanto, Orleans no intercala las solicitudes simultáneas a un observador. Si varios observadores reciben solicitudes simultáneamente, esas solicitudes se pueden ejecutar en paralelo. Atributos como AlwaysInterleaveAttribute o ReentrantAttribute no afectan a la ejecución de métodos de observador; no se puede personalizar el modelo de ejecución.

Compatibilidad con CancellationToken

A partir de la versión Orleans 9.0, los métodos de la interfaz de observador admiten completamente los parámetros CancellationToken. Esto permite que los granos señalen la cancelación a los observadores, lo que posibilita detener de forma adecuada las operaciones prolongadas de observación.

Definición de una interfaz de observador con CancellationToken

Agregue un CancellationToken parámetro como último parámetro en el método de interfaz de observador:

public interface IDataObserver : IGrainObserver
{
    Task OnDataReceivedAsync(DataPayload data, CancellationToken cancellationToken = default);
}

Implementar el observador con soporte para cancelación

public class DataObserver : IDataObserver
{
    public async Task OnDataReceivedAsync(DataPayload data, CancellationToken cancellationToken = default)
    {
        // Check cancellation before processing
        cancellationToken.ThrowIfCancellationRequested();

        // Process data with cancellation-aware operations
        await ProcessDataAsync(data, cancellationToken);
    }

    private async Task ProcessDataAsync(DataPayload data, CancellationToken cancellationToken)
    {
        // Use cancellation token with async operations
        await Task.Delay(100, cancellationToken);
        Console.WriteLine($"Processed: {data.Id}");
    }
}

Notificar a los observadores de la cancelación

Al notificar a los observadores, puede pasar un token de cancelación para habilitar la cancelación cooperativa:

public async Task SendDataToObserversAsync(DataPayload data, CancellationToken cancellationToken = default)
{
    // Create a linked token source to combine the grain's token with a timeout
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    cts.CancelAfter(TimeSpan.FromSeconds(30)); // Timeout for observer notifications

    await _subsManager.NotifyAsync(
        observer => observer.OnDataReceivedAsync(data, cts.Token));
}

Para obtener más información sobre el uso de tokens de cancelación en Orleans, consulte Uso de tokens de cancelación en Orleans granos.

La compatibilidad con CancellationToken para observadores se introdujo en Orleans la versión 9.0. En versiones anteriores, puede usar GrainCancellationToken como solución alternativa, pero la compatibilidad directa de CancellationToken en los métodos de observador no está disponible.

Para obtener compatibilidad completa con CancellationToken, considere la posibilidad de actualizar a Orleans la versión 9.0 o posterior.

La compatibilidad con CancellationToken para observadores está disponible a partir de la versión Orleans 9.0. Orleans 3.x usa el mecanismo heredado GrainCancellationToken para la cancelación.