オブザーバー 設計パターンを使用すると、サブスクライバーはプロバイダーに登録し、プロバイダーから通知を受信できます。 これは、プッシュベースの通知を必要とする任意のシナリオに適しています。 このパターンでは、 プロバイダー ( サブジェクト または 観測可能とも呼ばれます) と、0 個、1 つ以上の オブザーバーを定義します。 オブザーバーはプロバイダーに登録し、定義済みの条件、イベント、または状態の変更が発生するたびに、プロバイダーはデリゲートを呼び出してすべてのオブザーバーに自動的に通知します。 このメソッド呼び出しでは、プロバイダーはオブザーバーに現在の状態情報を提供することもできます。 .NET では、汎用の System.IObservable<T> インターフェイスと System.IObserver<T> インターフェイスを実装することで、オブザーバー デザイン パターンが適用されます。 ジェネリック型パラメーターは、通知情報を提供する型を表します。
パターンを適用するタイミング
オブザーバーの設計パターンは、データ ソース (ビジネス ロジック) レイヤーやユーザー インターフェイス (表示) レイヤーなど、2 つの異なるコンポーネントまたはアプリケーション レイヤー間のクリーンな分離をサポートするため、分散プッシュベースの通知に適しています。 このパターンは、プロバイダーがコールバックを使用してクライアントに現在の情報を提供するたびに実装できます。
パターンを実装するには、次の詳細を指定する必要があります。
プロバイダーまたは件名。オブザーバーに通知を送信するオブジェクトです。 プロバイダーは、 IObservable<T> インターフェイスを実装するクラスまたは構造体です。 プロバイダーは、プロバイダーから通知を受信するオブザーバーによって呼び出される 1 つのメソッド ( IObservable<T>.Subscribe) を実装する必要があります。
オブザーバー。プロバイダーから通知を受け取るオブジェクトです。 オブザーバーは、 IObserver<T> インターフェイスを実装するクラスまたは構造体です。 オブザーバーは、プロバイダーによって呼び出される 3 つのメソッドを実装する必要があります。
- IObserver<T>.OnNext新しい情報または現在の情報をオブザーバーに提供します。
- IObserver<T>.OnError:エラーが発生したことをオブザーバーに通知します。
- IObserver<T>.OnCompleted: プロバイダーが通知の送信を完了したことを示します。
プロバイダーがオブザーバーを追跡できるようにするメカニズム。 通常、プロバイダーは、 System.Collections.Generic.List<T> オブジェクトなどのコンテナー オブジェクトを使用して、通知をサブスクライブした IObserver<T> 実装への参照を保持します。 この目的でストレージ コンテナーを使用すると、プロバイダーは 0 から無制限のオブザーバーを処理できます。 オブザーバーが通知を受信する順序は定義されていません。プロバイダーは、任意のメソッドを自由に使用して順序を決定できます。
通知が完了したときにプロバイダーがオブザーバーを削除できるようにする IDisposable 実装。 オブザーバーは、IDisposable メソッドからSubscribe実装への参照を受け取るので、プロバイダーが通知の送信を完了する前に、IDisposable.Dispose メソッドを呼び出してサブスクライブを解除することもできます。
プロバイダーがオブザーバーに送信するデータを格納しているオブジェクト。 このオブジェクトの型は、 IObservable<T> インターフェイスと IObserver<T> インターフェイスのジェネリック型パラメーターに対応します。 このオブジェクトは IObservable<T> 実装と同じにすることができますが、通常は別の型です。
注
オブザーバー デザイン パターンの実装に加えて、 IObservable<T> インターフェイスと IObserver<T> インターフェイスを使用して構築されたライブラリの探索にも関心がある場合があります。 たとえば、 .NET (Rx) のリアクティブ拡張 は、一連の拡張メソッドと、非同期プログラミングをサポートする LINQ 標準シーケンス演算子で構成されます。
代替手段を検討する場合
IObservable<T>
/
IObserver<T> インターフェイスはプッシュベースの通知シナリオに適していますが、.NETには、より適した他のパターンが用意されています。
- Standard .NET イベント — 1 つのアプリケーション内の単純な通知シナリオでは、events はより慣用的で実装が簡単です。
-
IAsyncEnumerable<T>— コンシューマーがペースを制御する非同期プルベースのシーケンスの場合は、 非同期ストリームを使用します。 -
System.Threading.Channels— バックプレッシャと非同期サポートを備えたプロデューサー/コンシューマー パターンの場合は、 System.Threading.Channelsを使用します。 -
Reactive Extensions (Rx.NET) — 複雑なイベントの構成、フィルター処理、および変換の場合は、
System.Reactiveを直接実装する代わりに、IObservable<T>パッケージを使用します。
.NETでの IObservable<T> の最も顕著な用途は DiagnosticListener です。これにより、フレームワークおよびライブラリの作成者は、コンシューマーがサブスクライブする構造化された診断イベントを生成できます。
パターンを実装する
次の例では、オブザーバーの設計パターンを使用して、空港手荷物クレーム情報システムを実装します。
BaggageInfoクラスでは、到着する便とその便からの手荷物を受け取るカルーセルに関する情報を提供します。 次の例に示します。
namespace Observables.Example;
public readonly record struct BaggageInfo(
int FlightNumber,
string From,
int Carousel);
Namespace Example
Public Structure BaggageInfo
Implements IEquatable(Of BaggageInfo)
Public ReadOnly Property FlightNumber As Integer
Public ReadOnly Property From As String
Public ReadOnly Property Carousel As Integer
Public Sub New(flightNumber As Integer, from As String, carousel As Integer)
Me.FlightNumber = flightNumber
Me.From = from
Me.Carousel = carousel
End Sub
Public Overloads Function Equals(other As BaggageInfo) As Boolean Implements IEquatable(Of BaggageInfo).Equals
Return FlightNumber = other.FlightNumber AndAlso
From = other.From AndAlso
Carousel = other.Carousel
End Function
Public Overrides Function Equals(obj As Object) As Boolean
If TypeOf obj Is BaggageInfo Then
Return Equals(DirectCast(obj, BaggageInfo))
End If
Return False
End Function
Public Overrides Function GetHashCode() As Integer
Return HashCode.Combine(FlightNumber, From, Carousel)
End Function
Public Shared Operator =(left As BaggageInfo, right As BaggageInfo) As Boolean
Return left.Equals(right)
End Operator
Public Shared Operator <>(left As BaggageInfo, right As BaggageInfo) As Boolean
Return Not left.Equals(right)
End Operator
End Structure
End Namespace
BaggageHandlerクラスは、到着便と手荷物受取用カルーセルに関する情報を受け取る役割を担っています。 内部的には、次の 2 つのコレクションが保持されます。
-
_observers: 更新された情報を監視するクライアントのコレクション。 -
_flights: 飛行機の便と、各便に割り当てられた手荷物の受け取り場所のコレクション。
BaggageHandler クラスのソース コードを次の例に示します。
namespace Observables.Example;
public sealed class BaggageHandler : IObservable<BaggageInfo>
{
private readonly Lock _lock = new();
private readonly HashSet<IObserver<BaggageInfo>> _observers = [];
private readonly HashSet<BaggageInfo> _flights = [];
public IDisposable Subscribe(IObserver<BaggageInfo> observer)
{
BaggageInfo[] snapshot;
lock (_lock)
{
// Check whether observer is already registered. If not, add it.
if (!_observers.Add(observer))
{
return new Unsubscriber<BaggageInfo>(_lock, _observers, observer);
}
// Snapshot existing data while holding the lock.
snapshot = [.. _flights];
}
// Provide observer with existing data outside the lock.
foreach (BaggageInfo item in snapshot)
{
observer.OnNext(item);
}
return new Unsubscriber<BaggageInfo>(_lock, _observers, observer);
}
// Called to indicate all baggage is now unloaded.
public void BaggageStatus(int flightNumber) =>
BaggageStatus(flightNumber, string.Empty, 0);
public void BaggageStatus(int flightNumber, string from, int carousel)
{
var info = new BaggageInfo(flightNumber, from, carousel);
IObserver<BaggageInfo>[] snapshot;
// Carousel is assigned, so add new info object to list.
if (carousel > 0)
{
lock (_lock)
{
if (!_flights.Add(info))
{
return;
}
snapshot = [.. _observers];
}
foreach (IObserver<BaggageInfo> observer in snapshot)
{
observer.OnNext(info);
}
}
else if (carousel is 0)
{
// Baggage claim for flight is done.
lock (_lock)
{
if (_flights.RemoveWhere(
flight => flight.FlightNumber == info.FlightNumber) == 0)
{
return;
}
snapshot = [.. _observers];
}
foreach (IObserver<BaggageInfo> observer in snapshot)
{
observer.OnNext(info);
}
}
}
public void LastBaggageClaimed()
{
IObserver<BaggageInfo>[] snapshot;
lock (_lock)
{
snapshot = [.. _observers];
_observers.Clear();
}
foreach (IObserver<BaggageInfo> observer in snapshot)
{
observer.OnCompleted();
}
}
}
Namespace Example
Public NotInheritable Class BaggageHandler
Implements IObservable(Of BaggageInfo)
Private ReadOnly _lock As New Object()
Private ReadOnly _observers As New HashSet(Of IObserver(Of BaggageInfo))()
Private ReadOnly _flights As New HashSet(Of BaggageInfo)()
Public Function Subscribe(observer As IObserver(Of BaggageInfo)) As IDisposable Implements IObservable(Of BaggageInfo).Subscribe
Dim snapshot As BaggageInfo()
SyncLock _lock
' Check whether observer is already registered. If not, add it.
If Not _observers.Add(observer) Then
Return New Unsubscriber(Of BaggageInfo)(_lock, _observers, observer)
End If
' Snapshot existing data while holding the lock.
snapshot = _flights.ToArray()
End SyncLock
' Provide observer with existing data outside the lock.
For Each item As BaggageInfo In snapshot
observer.OnNext(item)
Next
Return New Unsubscriber(Of BaggageInfo)(_lock, _observers, observer)
End Function
' Called to indicate all baggage is now unloaded.
Public Sub BaggageStatus(flightNumber As Integer)
BaggageStatus(flightNumber, String.Empty, 0)
End Sub
Public Sub BaggageStatus(flightNumber As Integer, from As String, carousel As Integer)
Dim info As New BaggageInfo(flightNumber, from, carousel)
Dim snapshot As IObserver(Of BaggageInfo)()
' Carousel is assigned, so add new info object to list.
If carousel > 0 Then
SyncLock _lock
If Not _flights.Add(info) Then
Return
End If
snapshot = _observers.ToArray()
End SyncLock
For Each observer As IObserver(Of BaggageInfo) In snapshot
observer.OnNext(info)
Next
ElseIf carousel = 0 Then
' Baggage claim for flight is done.
SyncLock _lock
If _flights.RemoveWhere(
Function(flight) flight.FlightNumber = info.FlightNumber) = 0 Then
Return
End If
snapshot = _observers.ToArray()
End SyncLock
For Each observer As IObserver(Of BaggageInfo) In snapshot
observer.OnNext(info)
Next
End If
End Sub
Public Sub LastBaggageClaimed()
Dim snapshot As IObserver(Of BaggageInfo)()
SyncLock _lock
snapshot = _observers.ToArray()
_observers.Clear()
End SyncLock
For Each observer As IObserver(Of BaggageInfo) In snapshot
observer.OnCompleted()
Next
End Sub
End Class
End Namespace
更新された情報を受信するクライアントは、 BaggageHandler.Subscribe メソッドを呼び出します。 クライアントが以前に通知をサブスクライブしていない場合は、クライアントの IObserver<T> 実装への参照が _observers コレクションに追加されます。
オーバーロードされた BaggageHandler.BaggageStatus メソッドを呼び出して、フライトからの手荷物がアンロードされているか、またはアンロードされていないことを示すことができます。 最初の場合、このメソッドにはフライト番号、フライトが出発した空港、荷物が降ろされているカルーセルが渡されます。 2 番目のケースでは、メソッドにはフライト番号のみが渡されます。 アンロード中の手荷物については、メソッドに渡された BaggageInfo 情報が _flights コレクションに存在するかどうかを確認します。 そうでない場合、メソッドは情報を追加し、各オブザーバーの OnNext メソッドを呼び出します。 手荷物がアンロードされなくなったフライトの場合、このメソッドは、そのフライトに関する情報が _flights コレクションに保存されているかどうかを確認します。 その場合、メソッドは各オブザーバーのOnNext メソッドを呼び出し、BaggageInfo コレクションから_flights オブジェクトを削除します。
その日の最後のフライトが着陸し、その手荷物が処理されると、 BaggageHandler.LastBaggageClaimed メソッドが呼び出されます。 このメソッドは、各オブザーバーの OnCompleted メソッドを呼び出して、すべての通知が完了したことを示し、 _observers コレクションをクリアします。
プロバイダーの Subscribe メソッドは、IDisposable メソッドが呼び出される前にオブザーバーが通知の受信を停止できるようにするOnCompleted実装を返します。 この Unsubscriber クラスのソース コードを次の例に示します。 クラスが BaggageHandler.Subscribe メソッドでインスタンス化されると、 _lock オブジェクトへの参照、 _observers コレクション、およびコレクションに追加されるオブザーバーへの参照が渡されます。 これらの参照はローカル変数に割り当てられます。 オブジェクトの Dispose メソッドが呼び出されると、ロック内の _observers コレクションからオブザーバーが削除されます。
namespace Observables.Example;
internal sealed class Unsubscriber<T> : IDisposable
{
private readonly Lock _lock;
private readonly ISet<IObserver<T>> _observers;
private readonly IObserver<T> _observer;
internal Unsubscriber(
Lock @lock,
ISet<IObserver<T>> observers,
IObserver<T> observer) => (_lock, _observers, _observer) = (@lock, observers, observer);
public void Dispose()
{
lock (_lock)
{
_observers.Remove(_observer);
}
}
}
Namespace Example
Friend NotInheritable Class Unsubscriber(Of T)
Implements IDisposable
Private ReadOnly _lock As Object
Private ReadOnly _observers As ISet(Of IObserver(Of T))
Private ReadOnly _observer As IObserver(Of T)
Friend Sub New(lock As Object, observers As ISet(Of IObserver(Of T)), observer As IObserver(Of T))
_lock = lock
_observers = observers
_observer = observer
End Sub
Public Sub Dispose() Implements IDisposable.Dispose
SyncLock _lock
_observers.Remove(_observer)
End SyncLock
End Sub
End Class
End Namespace
次の例では、IObserver<T>という名前のArrivalsMonitor実装を示します。これは、手荷物の請求情報を表示する基本クラスです。 情報は、発信都市の名前でアルファベット順に表示されます。
ArrivalsMonitorのメソッドはoverridable (Visual Basic の場合) または virtual (C# の場合) としてマークされるため、派生クラスでオーバーライドできます。
namespace Observables.Example;
public class ArrivalsMonitor : IObserver<BaggageInfo>
{
private readonly string _name;
private readonly Lock _lock = new();
private readonly List<string> _flights = [];
private readonly string _format = "{0,-20} {1,5} {2, 3}";
private IDisposable? _cancellation;
public ArrivalsMonitor(string name)
{
ArgumentException.ThrowIfNullOrEmpty(name);
_name = name;
}
public virtual void Subscribe(BaggageHandler provider) =>
_cancellation = provider.Subscribe(this);
public virtual void Unsubscribe()
{
Interlocked.Exchange(ref _cancellation, null)?.Dispose();
lock (_lock)
{
_flights.Clear();
}
}
public virtual void OnCompleted()
{
lock (_lock)
{
_flights.Clear();
}
}
// No implementation needed: Method is not called by the BaggageHandler class.
public virtual void OnError(Exception e)
{
// No implementation.
}
// Update information.
public virtual void OnNext(BaggageInfo info)
{
bool updated = false;
lock (_lock)
{
// Flight has unloaded its baggage; remove from the monitor.
if (info.Carousel is 0)
{
string flightNumber = $"{info.FlightNumber,5}";
for (int index = _flights.Count - 1; index >= 0; index--)
{
string flightInfo = _flights[index];
if (flightInfo.Substring(21, 5).Equals(flightNumber))
{
updated = true;
_flights.RemoveAt(index);
}
}
}
else
{
// Add flight if it doesn't exist in the collection.
string flightInfo = string.Format(_format, info.From, info.FlightNumber, info.Carousel);
if (_flights.Contains(flightInfo) is false)
{
_flights.Add(flightInfo);
updated = true;
}
}
if (updated)
{
_flights.Sort();
Console.WriteLine($"Arrivals information from {_name}");
foreach (string flightInfo in _flights)
{
Console.WriteLine(flightInfo);
}
Console.WriteLine();
}
}
}
}
Imports System.Threading
Namespace Example
Public Class ArrivalsMonitor
Implements IObserver(Of BaggageInfo)
Private ReadOnly _name As String
Private ReadOnly _lock As New Object()
Private ReadOnly _flights As New List(Of String)()
Private ReadOnly _format As String = "{0,-20} {1,5} {2, 3}"
Private _cancellation As IDisposable
Public Sub New(name As String)
If String.IsNullOrEmpty(name) Then
Throw New ArgumentException("Value cannot be null or empty.", NameOf(name))
End If
_name = name
End Sub
Public Overridable Sub Subscribe(provider As BaggageHandler)
_cancellation = provider.Subscribe(Me)
End Sub
Public Overridable Sub Unsubscribe()
Dim previous = Interlocked.Exchange(_cancellation, Nothing)
previous?.Dispose()
SyncLock _lock
_flights.Clear()
End SyncLock
End Sub
Public Overridable Sub OnCompleted() Implements IObserver(Of BaggageInfo).OnCompleted
SyncLock _lock
_flights.Clear()
End SyncLock
End Sub
' No implementation needed: Method is not called by the BaggageHandler class.
Public Overridable Sub OnError([error] As Exception) Implements IObserver(Of BaggageInfo).OnError
' No implementation.
End Sub
' Update information.
Public Overridable Sub OnNext(info As BaggageInfo) Implements IObserver(Of BaggageInfo).OnNext
Dim updated As Boolean = False
SyncLock _lock
' Flight has unloaded its baggage; remove from the monitor.
If info.Carousel = 0 Then
Dim flightNumber As String = String.Format("{0,5}", info.FlightNumber)
For index As Integer = _flights.Count - 1 To 0 Step -1
Dim flightInfo As String = _flights(index)
If flightInfo.Substring(21, 5).Equals(flightNumber) Then
updated = True
_flights.RemoveAt(index)
End If
Next
Else
' Add flight if it doesn't exist in the collection.
Dim flightInfo As String = String.Format(_format, info.From, info.FlightNumber, info.Carousel)
If Not _flights.Contains(flightInfo) Then
_flights.Add(flightInfo)
updated = True
End If
End If
If updated Then
_flights.Sort()
Console.WriteLine($"Arrivals information from {_name}")
For Each flightInfo As String In _flights
Console.WriteLine(flightInfo)
Next
Console.WriteLine()
End If
End SyncLock
End Sub
End Class
End Namespace
ArrivalsMonitor クラスには、SubscribeメソッドとUnsubscribeメソッドが含まれます。
Subscribe メソッドを使用すると、クラスはプライベート変数にIDisposableする呼び出しによって返されるSubscribe実装を保存できます。
Unsubscribe メソッドを使用すると、クラスはプロバイダーのDispose実装を呼び出すことによって、通知のサブスクライブを解除できます。
ArrivalsMonitor には、 OnNext、 OnError、および OnCompleted メソッドの実装も用意されています。 大量のコードが含まれているのは、 OnNext 実装だけです。 このメソッドは、到着便の出発地空港と手荷物が利用可能なカルーセルに関する情報を保持する、プライベートで並べ替えられた汎用の List<T> オブジェクトで動作します。
BaggageHandler クラスが新しいフライト到着を報告する場合、OnNext メソッドの実装は、そのフライトに関する情報を一覧に追加します。
BaggageHandlerクラスがフライトの手荷物がアンロードされたことを報告した場合、OnNextメソッドはそのフライトをリストから削除します。 変更が行われるたびに、リストが並べ替えられてコンソールに表示されます。
次の例には、 BaggageHandler クラスと ArrivalsMonitor クラスの 2 つのインスタンスをインスタンス化し、 BaggageHandler.BaggageStatus メソッドを使用して到着フライトに関する情報を追加および削除するアプリケーション エントリ ポイントが含まれています。 いずれの場合も、オブザーバーは更新プログラムを受け取り、手荷物受取情報を正しく表示します。
using Observables.Example;
BaggageHandler provider = new();
ArrivalsMonitor observer1 = new("BaggageClaimMonitor1");
ArrivalsMonitor observer2 = new("SecurityExit");
provider.BaggageStatus(712, "Detroit", 3);
observer1.Subscribe(provider);
provider.BaggageStatus(712, "Kalamazoo", 3);
provider.BaggageStatus(400, "New York-Kennedy", 1);
provider.BaggageStatus(712, "Detroit", 3);
observer2.Subscribe(provider);
provider.BaggageStatus(511, "San Francisco", 2);
provider.BaggageStatus(712);
observer2.Unsubscribe();
provider.BaggageStatus(400);
provider.LastBaggageClaimed();
Imports Observables.Example
Imports System.Threading
Module Program
Sub Main(args As String())
Dim provider As New BaggageHandler()
Dim observer1 As New ArrivalsMonitor("BaggageClaimMonitor1")
Dim observer2 As New ArrivalsMonitor("SecurityExit")
provider.BaggageStatus(712, "Detroit", 3)
observer1.Subscribe(provider)
provider.BaggageStatus(712, "Kalamazoo", 3)
provider.BaggageStatus(400, "New York-Kennedy", 1)
provider.BaggageStatus(712, "Detroit", 3)
observer2.Subscribe(provider)
provider.BaggageStatus(511, "San Francisco", 2)
provider.BaggageStatus(712)
observer2.Unsubscribe()
provider.BaggageStatus(400)
provider.LastBaggageClaimed()
End Sub
End Module
関連するコンテンツ
.NET