Наблюдатели

Существуют ситуации, в которых недостаточно простого шаблона сообщения и ответа, и клиенту необходимо получать асинхронные уведомления. Например, пользователю может потребоваться получать уведомления о публикации нового мгновенного сообщения другом.

Клиентские наблюдатели — это механизм, позволяющий асинхронно уведомлять клиентов. Интерфейсы наблюдателя должны наследоваться от IGrainObserver, и все методы должны возвращать либо void, , ValueTaskTaskTask<TResult>ValueTask<TResult>либо . Возвращаемый тип void не рекомендуется, так как он может поощрять использование async void в реализации, что является опасным шаблоном, так как это может привести к сбою приложения, если исключение создается из метода. Вместо этого для сценариев уведомления о лучших усилиях рассмотрите возможность применения OneWayAttribute метода интерфейса наблюдателя. Это приведет к тому, что получатель не отправляет ответ на вызов метода и приведет к тому, что метод немедленно возвращается на сайте вызова, не ожидая ответа от наблюдателя. Зерно вызывает метод наблюдателя, вызывая его как любой метод интерфейса зерна. Среда Orleans выполнения обеспечит доставку запросов и ответов. Распространенный вариант использования для наблюдателей заключается в том, чтобы заручиться клиентом получать уведомления при возникновении события в Orleans приложении. Зерно, которое публикует такие уведомления, должно предоставить API для добавления или удаления наблюдателей. Кроме того, обычно удобно предоставлять метод, позволяющий отменить существующую подписку.

Разработчики зерна могут использовать служебный класс, такой как ObserverManager<TObserver> упрощение разработки наблюдаемых типов зерна. В отличие от зерна, которые автоматически активируются по мере необходимости после сбоя, клиенты не являются отказоустойчивыми: клиент, который не может восстановиться. По этой причине ObserverManager<T> программа удаляет подписки после заданной длительности. Клиенты, которые активны, должны повторно подписаться на таймер, чтобы сохранить свою подписку активной.

Чтобы подписаться на уведомление, клиент должен сначала создать локальный объект, реализующий интерфейс наблюдателя. Затем он вызывает метод на фабрике наблюдателей, CreateObjectReferenceчтобы превратить объект в ссылку на зерно, которая затем может быть передана методу подписки на уведомляющее зерно.

Эта модель также может использоваться другими зернами для получения асинхронных уведомлений. Зерна также могут реализовывать IGrainObserver интерфейсы. В отличие от клиентской подписки, подписка просто реализует интерфейс наблюдателя и передает ссылку на себя (например, ). this.AsReference<IMyGrainObserverInterface>() Нет необходимости, CreateObjectReference() потому что зерна уже доступны.

Пример кода

Предположим, что у нас есть зерно, которое периодически отправляет сообщения клиентам. Для простоты сообщение в нашем примере будет строкой. Сначала мы определим интерфейс на клиенте, который получит сообщение.

Интерфейс будет выглядеть следующим образом.

public interface IChat : IGrainObserver
{
    Task ReceiveMessage(string message);
}

Единственное особенное заключается в том, что интерфейс должен наследоваться от IGrainObserver. Теперь любой клиент, который хочет наблюдать за этими сообщениями, должен реализовать класс, реализующий IChat.

Самый простой случай будет примерно таким:

public class Chat : IChat
{
    public Task ReceiveMessage(string message)
    {
        Console.WriteLine(message);
        return Task.CompletedTask;
    }
}

На сервере у нас должен быть рядом с зерном, который отправляет эти сообщения чата клиентам. Кроме того, у клиентов должен быть механизм подписки и отмены подписки на уведомления. Для подписок зерно может использовать экземпляр служебного класса ObserverManager<TObserver>.

Примечание.

ObserverManager<TObserver> входит в Orleans состав версии 7.0. Для более старых версий можно скопировать следующую реализацию .

class HelloGrain : Grain, IHello
{
    private readonly ObserverManager<IChat> _subsManager;

    public HelloGrain(ILogger<HelloGrain> logger)
    {
        _subsManager =
            new ObserverManager<IChat>(
                TimeSpan.FromMinutes(5), logger);
    }

    // Clients call this to subscribe.
    public Task Subscribe(IChat observer)
    {
        _subsManager.Subscribe(observer, observer);

        return Task.CompletedTask;
    }

    //Clients use this to unsubscribe and no longer receive messages.
    public Task UnSubscribe(IChat observer)
    {
        _subsManager.Unsubscribe(observer);

        return Task.CompletedTask;
    }
}

Чтобы отправить сообщение клиентам, Notify можно использовать метод экземпляра ObserverManager<IChat> . Метод принимает Action<T> метод или лямбда-выражение (где T имеет тип IChat здесь). Вы можете вызвать любой метод в интерфейсе, чтобы отправить его клиентам. В нашем случае у нас есть только один метод, ReceiveMessageи наш код отправки на сервере будет выглядеть следующим образом:

public Task SendUpdateMessage(string message)
{
    _subsManager.Notify(s => s.ReceiveMessage(message));

    return Task.CompletedTask;
}

Теперь у нашего сервера есть метод отправки сообщений клиентам наблюдателей, два метода для подписки или отмены подписки, а клиент реализовал класс, способный наблюдать за сообщениями зерна. Последний шаг — создать ссылку наблюдателя на клиент с помощью ранее реализованного Chat класса и позволить ему получать сообщения после подписки на него.

Код будет выглядеть следующим образом:

//First create the grain reference
var friend = _grainFactory.GetGrain<IHello>(0);
Chat c = new Chat();

//Create a reference for chat, usable for subscribing to the observable grain.
var obj = _grainFactory.CreateObjectReference<IChat>(c);

//Subscribe the instance to receive messages.
await friend.Subscribe(obj);

Теперь, когда наше зерно на сервере вызывает SendUpdateMessage метод, все подписанные клиенты получат сообщение. В нашем клиентском коде Chat экземпляр в переменной c получит сообщение и выводит его в консоль.

Внимание

Объекты, передаваемые CreateObjectReference через объект, будут собираться с помощью мусора WeakReference<T> , если другие ссылки не существуют.

Пользователи должны поддерживать ссылку для каждого наблюдателя, который они не хотят собирать.

Примечание.

Наблюдатели по сути ненадежны, так как клиент, на котором размещается наблюдатель, может завершиться ошибкой, и наблюдатели, созданные после восстановления, имеют разные (случайные) удостоверения. ObserverManager<TObserver> использует периодическое повторное описание наблюдателями, как описано выше, чтобы неактивные наблюдатели могли быть удалены.

Модель выполнения

IGrainObserver Реализации регистрируются через вызов IGrainFactory.CreateObjectReference и каждый вызов этого метода создает новую ссылку, указывающую на реализацию. Orleans будет выполнять запросы, отправленные каждому из этих ссылок по одному, к завершению. Наблюдатели не являются повторными и поэтому одновременные запросы наблюдателя не будут перемечены Orleans. Если есть несколько наблюдателей, которые одновременно получают запросы, эти запросы могут выполняться параллельно. Выполнение методов наблюдателя не влияет на такие атрибуты, как AlwaysInterleaveAttribute или ReentrantAttribute: модель выполнения не может быть настроена разработчиком.