Шаблон разработки наблюдателя

Шаблон проектирования наблюдателя позволяет подписчику регистрироваться и получать уведомления от поставщика. Он подходит для любого сценария, требующего push-уведомления. Шаблон определяет поставщика (также известный как субъект или наблюдаемый) и ноль, один или несколько наблюдателей. Наблюдатели регистрируются в поставщике и всякий раз, когда происходит предопределенное условие, событие или изменение состояния, поставщик автоматически уведомляет всех наблюдателей путем вызова делегата. В этом вызове метода поставщик также может предоставить информацию о текущем состоянии наблюдателям. В .NET шаблон конструктора наблюдателя применяется путем реализации универсальных System.IObservable<T> и System.IObserver<T> интерфейсов. Параметр универсального типа представляет тип, предоставляющий сведения о уведомлениях.

Когда следует применить шаблон

Шаблон проектирования наблюдателя подходит для распределенных push-уведомлений, так как он поддерживает чистое разделение между двумя разными компонентами или уровнями приложений, например уровнем источника данных (бизнес-логики) и уровнем пользовательского интерфейса (отображения). Шаблон можно реализовать всякий раз, когда поставщик использует обратные вызовы для предоставления клиентам текущей информации.

Реализация шаблона требует предоставления следующих сведений:

  • Поставщик или субъект, который является объектом, который отправляет уведомления наблюдателям. Поставщик — это класс или структура, реализующая IObservable<T> интерфейс. Поставщик должен реализовать один метод, который вызывается наблюдателями, IObservable<T>.Subscribeкоторые хотят получать уведомления от поставщика.

  • Наблюдатель, являющийся объектом, который получает уведомления от поставщика. Наблюдатель — это класс или структура, реализующая IObserver<T> интерфейс. Наблюдатель должен реализовать три метода, все из которых вызываются поставщиком:

    • IObserver<T>.OnNext, который предоставляет наблюдателю новую или текущую информацию.
    • IObserver<T>.OnError, который сообщает наблюдателю, что произошла ошибка.
    • IObserver<T>.OnCompleted— указывает, что поставщик завершил отправку уведомлений.
  • Механизм, позволяющий поставщику отслеживать наблюдателей. Как правило, поставщик использует объект контейнера, такой как объект System.Collections.Generic.List<T>, для хранения ссылок на реализации IObserver<T>, которые подписаны на уведомления. Использование контейнера хранилища для этой цели позволяет поставщику обрабатывать от нуля до неограниченного количества наблюдателей. Порядок, в котором наблюдатели получают уведомления, не определены; Поставщик может использовать любой метод для определения порядка.

  • Реализация IDisposable, которая позволяет поставщику удалять наблюдателей после завершения уведомления. Наблюдатели получают ссылку на IDisposable реализацию из Subscribe метода, поэтому они также могут вызвать IDisposable.Dispose метод для отмены подписки до завершения отправки уведомлений поставщиком.

  • Объект, содержащий данные, которые поставщик отправляет своим наблюдателям. Тип этого объекта соответствует параметру универсального типа для интерфейсов IObservable<T> и IObserver<T>. Хотя этот объект может совпадать с реализацией IObservable<T> , чаще всего это отдельный тип.

Замечание

Помимо реализации шаблона проектирования «наблюдатель», вам может быть интересно изучить библиотеки, построенные с использованием интерфейсов IObservable<T> и IObserver<T>. Например, реактивные расширения для .NET (Rx) состоят из набора методов расширения и операторов стандартной последовательности LINQ для поддержки асинхронного программирования.

Когда следует рассмотреть альтернативные варианты

Интерфейсы IObservable<T>/IObserver<T> хорошо подходят для сценариев push-уведомлений, но .NET предлагает другие шаблоны, которые могут быть лучше подходят:

  • Стандартные события .NET — для простых сценариев уведомлений в рамках одного приложения события более типичны и их проще реализовать.
  • IAsyncEnumerable<T> — Для асинхронных последовательностей с извлечением по запросу, где потребитель управляет скоростью, используйте асинхронные потоки.
  • System.Threading.Channels — для шаблонов «производитель-потребитель» с поддержкой обратного давления и асинхронности используйте System.Threading.Channels.
  • Reactive Extensions (Rx.NET) — для сложной композиции событий, фильтрации и преобразования используйте пакет System.Reactive вместо реализации IObservable<T> напрямую.

Наиболее заметным использованием IObservable<T> в .NET является DiagnosticListener, что позволяет авторам платформы и библиотеки выдавать структурированные диагностические события, на которые подписываются потребители.

Внедрение шаблона

В следующем примере используется шаблон проектирования наблюдателя для реализации информационной системы получения багажа в аэропорту. Класс BaggageInfo предоставляет сведения о прибывающих рейсах и каруселях, где можно получить багаж из каждого рейса. Он показан в следующем примере.

namespace Observables.Example;

public readonly record struct BaggageInfo(
    int FlightNumber,
    string From,
    int Carousel);
Namespace Example

    Public Structure BaggageInfo
        Implements IEquatable(Of BaggageInfo)

        Public ReadOnly Property FlightNumber As Integer
        Public ReadOnly Property From As String
        Public ReadOnly Property Carousel As Integer

        Public Sub New(flightNumber As Integer, from As String, carousel As Integer)
            Me.FlightNumber = flightNumber
            Me.From = from
            Me.Carousel = carousel
        End Sub

        Public Overloads Function Equals(other As BaggageInfo) As Boolean Implements IEquatable(Of BaggageInfo).Equals
            Return FlightNumber = other.FlightNumber AndAlso
                   From = other.From AndAlso
                   Carousel = other.Carousel
        End Function

        Public Overrides Function Equals(obj As Object) As Boolean
            If TypeOf obj Is BaggageInfo Then
                Return Equals(DirectCast(obj, BaggageInfo))
            End If
            Return False
        End Function

        Public Overrides Function GetHashCode() As Integer
            Return HashCode.Combine(FlightNumber, From, Carousel)
        End Function

        Public Shared Operator =(left As BaggageInfo, right As BaggageInfo) As Boolean
            Return left.Equals(right)
        End Operator

        Public Shared Operator <>(left As BaggageInfo, right As BaggageInfo) As Boolean
            Return Not left.Equals(right)
        End Operator
    End Structure

End Namespace

Класс BaggageHandler отвечает за получение информации о прибывающих рейсах и каруселях выдачи багажа. Внутри системы она поддерживает две коллекции:

  • _observers: коллекция клиентов, наблюдающих за обновленными сведениями.
  • _flights: коллекция рейсов и назначенных каруселей.

Исходный BaggageHandler код для класса показан в следующем примере.

namespace Observables.Example;

public sealed class BaggageHandler : IObservable<BaggageInfo>
{
    private readonly Lock _lock = new();
    private readonly HashSet<IObserver<BaggageInfo>> _observers = [];
    private readonly HashSet<BaggageInfo> _flights = [];

    public IDisposable Subscribe(IObserver<BaggageInfo> observer)
    {
        BaggageInfo[] snapshot;

        lock (_lock)
        {
            // Check whether observer is already registered. If not, add it.
            if (!_observers.Add(observer))
            {
                return new Unsubscriber<BaggageInfo>(_lock, _observers, observer);
            }

            // Snapshot existing data while holding the lock.
            snapshot = [.. _flights];
        }

        // Provide observer with existing data outside the lock.
        foreach (BaggageInfo item in snapshot)
        {
            observer.OnNext(item);
        }

        return new Unsubscriber<BaggageInfo>(_lock, _observers, observer);
    }

    // Called to indicate all baggage is now unloaded.
    public void BaggageStatus(int flightNumber) =>
        BaggageStatus(flightNumber, string.Empty, 0);

    public void BaggageStatus(int flightNumber, string from, int carousel)
    {
        var info = new BaggageInfo(flightNumber, from, carousel);
        IObserver<BaggageInfo>[] snapshot;

        // Carousel is assigned, so add new info object to list.
        if (carousel > 0)
        {
            lock (_lock)
            {
                if (!_flights.Add(info))
                {
                    return;
                }

                snapshot = [.. _observers];
            }

            foreach (IObserver<BaggageInfo> observer in snapshot)
            {
                observer.OnNext(info);
            }
        }
        else if (carousel is 0)
        {
            // Baggage claim for flight is done.
            lock (_lock)
            {
                if (_flights.RemoveWhere(
                    flight => flight.FlightNumber == info.FlightNumber) == 0)
                {
                    return;
                }

                snapshot = [.. _observers];
            }

            foreach (IObserver<BaggageInfo> observer in snapshot)
            {
                observer.OnNext(info);
            }
        }
    }

    public void LastBaggageClaimed()
    {
        IObserver<BaggageInfo>[] snapshot;

        lock (_lock)
        {
            snapshot = [.. _observers];
            _observers.Clear();
        }

        foreach (IObserver<BaggageInfo> observer in snapshot)
        {
            observer.OnCompleted();
        }
    }
}
Namespace Example

    Public NotInheritable Class BaggageHandler
        Implements IObservable(Of BaggageInfo)

        Private ReadOnly _lock As New Object()
        Private ReadOnly _observers As New HashSet(Of IObserver(Of BaggageInfo))()
        Private ReadOnly _flights As New HashSet(Of BaggageInfo)()

        Public Function Subscribe(observer As IObserver(Of BaggageInfo)) As IDisposable Implements IObservable(Of BaggageInfo).Subscribe
            Dim snapshot As BaggageInfo()

            SyncLock _lock
                ' Check whether observer is already registered. If not, add it.
                If Not _observers.Add(observer) Then
                    Return New Unsubscriber(Of BaggageInfo)(_lock, _observers, observer)
                End If

                ' Snapshot existing data while holding the lock.
                snapshot = _flights.ToArray()
            End SyncLock

            ' Provide observer with existing data outside the lock.
            For Each item As BaggageInfo In snapshot
                observer.OnNext(item)
            Next

            Return New Unsubscriber(Of BaggageInfo)(_lock, _observers, observer)
        End Function

        ' Called to indicate all baggage is now unloaded.
        Public Sub BaggageStatus(flightNumber As Integer)
            BaggageStatus(flightNumber, String.Empty, 0)
        End Sub

        Public Sub BaggageStatus(flightNumber As Integer, from As String, carousel As Integer)
            Dim info As New BaggageInfo(flightNumber, from, carousel)
            Dim snapshot As IObserver(Of BaggageInfo)()

            ' Carousel is assigned, so add new info object to list.
            If carousel > 0 Then
                SyncLock _lock
                    If Not _flights.Add(info) Then
                        Return
                    End If

                    snapshot = _observers.ToArray()
                End SyncLock

                For Each observer As IObserver(Of BaggageInfo) In snapshot
                    observer.OnNext(info)
                Next
            ElseIf carousel = 0 Then
                ' Baggage claim for flight is done.
                SyncLock _lock
                    If _flights.RemoveWhere(
                        Function(flight) flight.FlightNumber = info.FlightNumber) = 0 Then
                        Return
                    End If

                    snapshot = _observers.ToArray()
                End SyncLock

                For Each observer As IObserver(Of BaggageInfo) In snapshot
                    observer.OnNext(info)
                Next
            End If
        End Sub

        Public Sub LastBaggageClaimed()
            Dim snapshot As IObserver(Of BaggageInfo)()

            SyncLock _lock
                snapshot = _observers.ToArray()
                _observers.Clear()
            End SyncLock

            For Each observer As IObserver(Of BaggageInfo) In snapshot
                observer.OnCompleted()
            Next
        End Sub
    End Class

End Namespace

Клиенты, которые хотят получать обновленную информацию, вызывают BaggageHandler.Subscribe метод. Если клиент ранее не подписался на уведомления, в коллекцию IObserver<T> добавляется ссылка на реализацию клиента _observers.

Перегруженный BaggageHandler.BaggageStatus метод можно вызвать, чтобы указать, что багаж из рейса либо выгружается, либо больше не выгружается. В первом случае методу передаются номер рейса, аэропорт, из которого отправляется рейс, и карусель, где выгружают багаж. Во втором случае методу передается только номер рейса. Когда багаж выгружается, метод проверяет, существует ли информация BaggageInfo, переданная в метод, в коллекции _flights. Если это не так, метод добавляет информацию и вызывает метод OnNext каждого наблюдателя. Для рейсов, багаж которых больше не выгружается, метод проверяет, хранится ли информация об этом рейсе в коллекции _flights. Если это так, метод вызывает метод каждого наблюдателя OnNext и удаляет BaggageInfo объект из _flights коллекции.

После приземления последнего рейса дня и обработки его багажа вызывается метод BaggageHandler.LastBaggageClaimed. Этот метод вызывает метод каждого наблюдателя OnCompleted , чтобы указать, что все уведомления завершены, а затем очищает коллекцию _observers .

Метод поставщика Subscribe возвращает IDisposable реализацию, которая позволяет наблюдателям перестать получать уведомления перед вызовом OnCompleted метода. Исходный код для этого Unsubscriber класса показан в следующем примере. Когда класс создается в BaggageHandler.Subscribe методе, он передает ссылку _lock на объект, _observers коллекцию и ссылку на наблюдателя, добавляемого в коллекцию. Эти ссылки назначаются локальным переменным. При вызове метода объекта Dispose он удаляет наблюдателя из _observers коллекции в пределах блокировки.

namespace Observables.Example;

internal sealed class Unsubscriber<T> : IDisposable
{
    private readonly Lock _lock;
    private readonly ISet<IObserver<T>> _observers;
    private readonly IObserver<T> _observer;

    internal Unsubscriber(
        Lock @lock,
        ISet<IObserver<T>> observers,
        IObserver<T> observer) => (_lock, _observers, _observer) = (@lock, observers, observer);

    public void Dispose()
    {
        lock (_lock)
        {
            _observers.Remove(_observer);
        }
    }
}
Namespace Example

    Friend NotInheritable Class Unsubscriber(Of T)
        Implements IDisposable

        Private ReadOnly _lock As Object
        Private ReadOnly _observers As ISet(Of IObserver(Of T))
        Private ReadOnly _observer As IObserver(Of T)

        Friend Sub New(lock As Object, observers As ISet(Of IObserver(Of T)), observer As IObserver(Of T))
            _lock = lock
            _observers = observers
            _observer = observer
        End Sub

        Public Sub Dispose() Implements IDisposable.Dispose
            SyncLock _lock
                _observers.Remove(_observer)
            End SyncLock
        End Sub
    End Class

End Namespace

В следующем примере предоставлена реализация IObserver<T> с именем ArrivalsMonitor, которая является базовым классом, отображающим информацию о выдаче багажа. Сведения отображаются в алфавитном порядке по имени исходного города. Методы ArrivalsMonitor помечены как overridable (в Visual Basic) или virtual (в C#), поэтому их можно переопределить в производном классе.

namespace Observables.Example;

public class ArrivalsMonitor : IObserver<BaggageInfo>
{
    private readonly string _name;
    private readonly Lock _lock = new();
    private readonly List<string> _flights = [];
    private readonly string _format = "{0,-20} {1,5}  {2, 3}";
    private IDisposable? _cancellation;

    public ArrivalsMonitor(string name)
    {
        ArgumentException.ThrowIfNullOrEmpty(name);
        _name = name;
    }

    public virtual void Subscribe(BaggageHandler provider) =>
        _cancellation = provider.Subscribe(this);

    public virtual void Unsubscribe()
    {
        Interlocked.Exchange(ref _cancellation, null)?.Dispose();

        lock (_lock)
        {
            _flights.Clear();
        }
    }

    public virtual void OnCompleted()
    {
        lock (_lock)
        {
            _flights.Clear();
        }
    }

    // No implementation needed: Method is not called by the BaggageHandler class.
    public virtual void OnError(Exception e)
    {
        // No implementation.
    }

    // Update information.
    public virtual void OnNext(BaggageInfo info)
    {
        bool updated = false;

        lock (_lock)
        {
            // Flight has unloaded its baggage; remove from the monitor.
            if (info.Carousel is 0)
            {
                string flightNumber = $"{info.FlightNumber,5}";
                for (int index = _flights.Count - 1; index >= 0; index--)
                {
                    string flightInfo = _flights[index];
                    if (flightInfo.Substring(21, 5).Equals(flightNumber))
                    {
                        updated = true;
                        _flights.RemoveAt(index);
                    }
                }
            }
            else
            {
                // Add flight if it doesn't exist in the collection.
                string flightInfo = string.Format(_format, info.From, info.FlightNumber, info.Carousel);
                if (_flights.Contains(flightInfo) is false)
                {
                    _flights.Add(flightInfo);
                    updated = true;
                }
            }

            if (updated)
            {
                _flights.Sort();
                Console.WriteLine($"Arrivals information from {_name}");
                foreach (string flightInfo in _flights)
                {
                    Console.WriteLine(flightInfo);
                }

                Console.WriteLine();
            }
        }
    }
}
Imports System.Threading

Namespace Example

    Public Class ArrivalsMonitor
        Implements IObserver(Of BaggageInfo)

        Private ReadOnly _name As String
        Private ReadOnly _lock As New Object()
        Private ReadOnly _flights As New List(Of String)()
        Private ReadOnly _format As String = "{0,-20} {1,5}  {2, 3}"
        Private _cancellation As IDisposable

        Public Sub New(name As String)
            If String.IsNullOrEmpty(name) Then
                Throw New ArgumentException("Value cannot be null or empty.", NameOf(name))
            End If
            _name = name
        End Sub

        Public Overridable Sub Subscribe(provider As BaggageHandler)
            _cancellation = provider.Subscribe(Me)
        End Sub

        Public Overridable Sub Unsubscribe()
            Dim previous = Interlocked.Exchange(_cancellation, Nothing)
            previous?.Dispose()

            SyncLock _lock
                _flights.Clear()
            End SyncLock
        End Sub

        Public Overridable Sub OnCompleted() Implements IObserver(Of BaggageInfo).OnCompleted
            SyncLock _lock
                _flights.Clear()
            End SyncLock
        End Sub

        ' No implementation needed: Method is not called by the BaggageHandler class.
        Public Overridable Sub OnError([error] As Exception) Implements IObserver(Of BaggageInfo).OnError
            ' No implementation.
        End Sub

        ' Update information.
        Public Overridable Sub OnNext(info As BaggageInfo) Implements IObserver(Of BaggageInfo).OnNext
            Dim updated As Boolean = False

            SyncLock _lock
                ' Flight has unloaded its baggage; remove from the monitor.
                If info.Carousel = 0 Then
                    Dim flightNumber As String = String.Format("{0,5}", info.FlightNumber)
                    For index As Integer = _flights.Count - 1 To 0 Step -1
                        Dim flightInfo As String = _flights(index)
                        If flightInfo.Substring(21, 5).Equals(flightNumber) Then
                            updated = True
                            _flights.RemoveAt(index)
                        End If
                    Next
                Else
                    ' Add flight if it doesn't exist in the collection.
                    Dim flightInfo As String = String.Format(_format, info.From, info.FlightNumber, info.Carousel)
                    If Not _flights.Contains(flightInfo) Then
                        _flights.Add(flightInfo)
                        updated = True
                    End If
                End If

                If updated Then
                    _flights.Sort()
                    Console.WriteLine($"Arrivals information from {_name}")
                    For Each flightInfo As String In _flights
                        Console.WriteLine(flightInfo)
                    Next

                    Console.WriteLine()
                End If
            End SyncLock
        End Sub
    End Class

End Namespace

Класс ArrivalsMonitor включает Subscribe методы и Unsubscribe методы. Метод Subscribe позволяет классу сохранять IDisposable реализацию, возвращаемую вызовом Subscribe к частной переменной. Метод Unsubscribe позволяет классу отменять подписку из уведомлений путем вызова реализации поставщика Dispose . ArrivalsMonitor также предоставляет реализации методов OnNext, OnError и OnCompleted. Только реализация OnNext содержит значительный объем кода. Метод работает с частным, отсортированным, универсальным List<T> объектом, который хранит информацию об аэропортах отправления для прибывающих рейсов и каруселей, на которых доступен их багаж. Если класс BaggageHandler сообщает о поступлении нового рейса, то реализация метода OnNext добавляет сведения об этом рейсе в список. Если класс BaggageHandler сообщает, что багаж рейса был выгружен, метод OnNext удаляет этот рейс из списка. При каждом изменении список отсортирован и отображается в консоли.

В следующем примере содержится точка входа приложения: создается экземпляр класса BaggageHandler и два экземпляра класса ArrivalsMonitor, а метод BaggageHandler.BaggageStatus используется для добавления и удаления сведений о прибывающих рейсах. В каждом случае мониторы получают обновления и правильно отображают информацию о получении багажа.

using Observables.Example;

BaggageHandler provider = new();
ArrivalsMonitor observer1 = new("BaggageClaimMonitor1");
ArrivalsMonitor observer2 = new("SecurityExit");

provider.BaggageStatus(712, "Detroit", 3);
observer1.Subscribe(provider);

provider.BaggageStatus(712, "Kalamazoo", 3);
provider.BaggageStatus(400, "New York-Kennedy", 1);
provider.BaggageStatus(712, "Detroit", 3);
observer2.Subscribe(provider);

provider.BaggageStatus(511, "San Francisco", 2);
provider.BaggageStatus(712);
observer2.Unsubscribe();

provider.BaggageStatus(400);
provider.LastBaggageClaimed();
Imports Observables.Example
Imports System.Threading

Module Program
    Sub Main(args As String())
        Dim provider As New BaggageHandler()
        Dim observer1 As New ArrivalsMonitor("BaggageClaimMonitor1")
        Dim observer2 As New ArrivalsMonitor("SecurityExit")

        provider.BaggageStatus(712, "Detroit", 3)
        observer1.Subscribe(provider)

        provider.BaggageStatus(712, "Kalamazoo", 3)
        provider.BaggageStatus(400, "New York-Kennedy", 1)
        provider.BaggageStatus(712, "Detroit", 3)
        observer2.Subscribe(provider)

        provider.BaggageStatus(511, "San Francisco", 2)
        provider.BaggageStatus(712)
        observer2.Unsubscribe()

        provider.BaggageStatus(400)
        provider.LastBaggageClaimed()
    End Sub
End Module