Udostępnij za pośrednictwem


Obserwatorzy

Istnieją sytuacje, w których prosty wzorzec komunikatu/odpowiedzi nie wystarczy, a klient musi otrzymywać powiadomienia asynchroniczne. Na przykład użytkownik może chcieć otrzymywać powiadomienia o opublikowaniu nowej wiadomości błyskawicznej przez znajomego.

Obserwatorzy klienta to mechanizm, który umożliwia asynchroniczne powiadamianie klientów. Interfejsy obserwatora muszą dziedziczyć z IGrainObservermetody , a wszystkie metody muszą zwracać wartości void, , Task, Task<TResult>, ValueTasklub ValueTask<TResult>. Typ zwracany void nie jest zalecany, ponieważ może zachęcić do użycia async void w implementacji, który jest niebezpiecznym wzorcem, ponieważ może to spowodować awarię aplikacji, jeśli wyjątek zostanie zgłoszony z metody. Zamiast tego w przypadku scenariuszy powiadamiania o najlepszym wysiłku rozważ zastosowanie OneWayAttribute metody interfejsu obserwatora. Spowoduje to, że odbiorca nie wyśle odpowiedzi dla wywołania metody i spowoduje, że metoda zwróci się natychmiast w lokacji wywołania bez oczekiwania na odpowiedź od obserwatora. Ziarno wywołuje metodę na obserwatorze, wywołując ją jak dowolną metodę interfejsu ziarna. Środowisko Orleans uruchomieniowe zapewni dostarczanie żądań i odpowiedzi. Typowym przypadkiem użycia obserwatorów jest zarejestrowanie klienta w celu otrzymywania powiadomień w przypadku wystąpienia zdarzenia w Orleans aplikacji. Ziarno publikujące takie powiadomienia powinno dostarczyć interfejs API do dodawania lub usuwania obserwatorów. Ponadto zwykle wygodne jest uwidocznienie metody, która umożliwia anulowanie istniejącej subskrypcji.

Deweloperzy ziarna mogą używać klasy narzędziowej, takiej jak ObserverManager<TObserver> upraszczanie opracowywania obserwowanych typów ziarna. W przeciwieństwie do ziarna, które są automatycznie ponownie uaktywniane zgodnie z potrzebami po awarii, klienci nie są odporne na błędy: klient, który kończy się niepowodzeniem, może nigdy nie odzyskać. Z tego powodu ObserverManager<T> narzędzie usuwa subskrypcje po skonfigurowanym czasie trwania. Klienci, którzy są aktywni, powinni ponownie przypisać czasomierz, aby zachować aktywną subskrypcję.

Aby zasubskrybować powiadomienie, klient musi najpierw utworzyć obiekt lokalny, który implementuje interfejs obserwatora. Następnie wywołuje metodę w fabryce CreateObjectReferenceobserwatora ", aby przekształcić obiekt w odwołanie ziarna, które następnie można przekazać do metody subskrypcji na ziarno powiadamiania.

Ten model może być również używany przez inne ziarna do odbierania powiadomień asynchronicznych. Ziarna mogą również implementować IGrainObserver interfejsy. W przeciwieństwie do przypadku subskrypcji klienta subskrybowanie ziarna po prostu implementuje interfejs obserwatora i przekazuje odwołanie do samego siebie (np. this.AsReference<IMyGrainObserverInterface>()). Nie ma potrzeby, CreateObjectReference() ponieważ ziarna są już adresowalne.

Przykład kodu

Załóżmy, że mamy ziarno, które okresowo wysyła komunikaty do klientów. Dla uproszczenia komunikat w naszym przykładzie będzie ciągiem. Najpierw zdefiniujemy interfejs na kliencie, który otrzyma komunikat.

Interfejs będzie wyglądać następująco

public interface IChat : IGrainObserver
{
    Task ReceiveMessage(string message);
}

Jedyną wyjątkową rzeczą jest to, że interfejs powinien dziedziczyć z IGrainObserverklasy . Teraz każdy klient, który chce obserwować te komunikaty, powinien zaimplementować klasę, która implementuje IChat.

Najprostszym przypadkiem będzie coś takiego:

public class Chat : IChat
{
    public Task ReceiveMessage(string message)
    {
        Console.WriteLine(message);
        return Task.CompletedTask;
    }
}

Na serwerze powinniśmy mieć ziarno, które wysyła te wiadomości czatu do klientów. Ziarno powinno również mieć mechanizm, aby klienci subskrybowali i anulowali subskrypcję powiadomień. W przypadku subskrypcji ziarno może używać wystąpienia klasy ObserverManager<TObserver>narzędziowej .

Uwaga

ObserverManager<TObserver> jest częścią wersji Orleans 7.0. W przypadku starszych wersji można skopiować następującą implementację .

class HelloGrain : Grain, IHello
{
    private readonly ObserverManager<IChat> _subsManager;

    public HelloGrain(ILogger<HelloGrain> logger)
    {
        _subsManager =
            new ObserverManager<IChat>(
                TimeSpan.FromMinutes(5), logger);
    }

    // Clients call this to subscribe.
    public Task Subscribe(IChat observer)
    {
        _subsManager.Subscribe(observer, observer);

        return Task.CompletedTask;
    }

    //Clients use this to unsubscribe and no longer receive messages.
    public Task UnSubscribe(IChat observer)
    {
        _subsManager.Unsubscribe(observer);

        return Task.CompletedTask;
    }
}

Aby wysłać komunikat do klientów, Notify można użyć metody ObserverManager<IChat> wystąpienia. Metoda przyjmuje metodę Action<T> lub wyrażenie lambda (gdzie T jest typu IChat tutaj). Możesz wywołać dowolną metodę w interfejsie, aby wysłać ją do klientów. W naszym przypadku mamy tylko jedną metodę , ReceiveMessagea nasz kod wysyłający na serwerze wygląda następująco:

public Task SendUpdateMessage(string message)
{
    _subsManager.Notify(s => s.ReceiveMessage(message));

    return Task.CompletedTask;
}

Teraz nasz serwer ma metodę wysyłania komunikatów do klientów obserwatorów, dwie metody subskrybowania/anulowania subskrypcji, a klient zaimplementował klasę umożliwiającą obserwowanie komunikatów ziarna. Ostatnim krokiem jest utworzenie odwołania obserwatora na kliencie przy użyciu wcześniej zaimplementowanego Chat klasy i umożliwianie odbierania komunikatów po zasubskrybowaniu.

Kod będzie wyglądać następująco:

//First create the grain reference
var friend = _grainFactory.GetGrain<IHello>(0);
Chat c = new Chat();

//Create a reference for chat, usable for subscribing to the observable grain.
var obj = _grainFactory.CreateObjectReference<IChat>(c);

//Subscribe the instance to receive messages.
await friend.Subscribe(obj);

Teraz, gdy nasze ziarno na serwerze wywołuje metodę SendUpdateMessage , wszyscy subskrybucyjni klienci otrzymają komunikat. W naszym kodzie Chat klienta wystąpienie w zmiennej c otrzyma komunikat i wyświetli go w konsoli.

Ważne

Obiekty przekazywane do CreateObjectReference są przechowywane za pośrednictwem elementu WeakReference<T> i dlatego będą wyrzucane śmieci, jeśli nie istnieją żadne inne odwołania.

Użytkownicy powinni zachować odwołanie dla każdego obserwatora, którego nie chcą zbierać.

Uwaga

Obserwatorzy są z natury zawodni, ponieważ klient, który hostuje obserwatora, może zakończyć się niepowodzeniem, a obserwatorzy utworzone po odzyskiwaniu mają różne (losowe) tożsamości. ObserverManager<TObserver> opiera się na okresowej ponownej subskrypcji przez obserwatorów, zgodnie z powyższym opisem, tak aby nieaktywni obserwatorzy mogli zostać usunięci.

Model wykonania

Implementacje IGrainObserver są rejestrowane za pośrednictwem wywołania metody IGrainFactory.CreateObjectReference i każde wywołanie tej metody tworzy nowe odwołanie wskazujące tę implementację. Orleans spowoduje wykonanie żądań wysyłanych do każdego z tych odwołań jeden po jednym do ukończenia. Obserwatorzy nie są reentrantami i dlatego współbieżne żądania do obserwatora nie będą przeplatane przez Orleanselement . Jeśli istnieje wiele obserwatorów, którzy odbierają żądania jednocześnie, te żądania mogą być wykonywane równolegle. Wykonywanie metod obserwatora nie ma wpływu na atrybuty, takie jak AlwaysInterleaveAttribute lub ReentrantAttribute: model wykonywania nie może być dostosowany przez dewelopera.