IObservable<T> 接口

定义

定义基于推送的通知的提供程序。

generic <typename T>
public interface class IObservable
public interface IObservable<out T>
type IObservable<'T> = interface
Public Interface IObservable(Of Out T)

类型参数

T

提供通知信息的对象。

这是协变类型参数。 即,可以使用指定的类型,也可以使用派生程度较高的任何类型。 有关协变和逆变的详细信息,请参阅泛型中的协变和逆变
派生

示例

以下示例演示观察程序设计模式。 它定义一个 Location 包含纬度和经度信息的类。

public struct Location
{
   double lat, lon;

   public Location(double latitude, double longitude)
   {
      this.lat = latitude;
      this.lon = longitude;
   }

   public double Latitude
   { get { return this.lat; } }

   public double Longitude
   { get { return this.lon; } }
}
[<Struct>]
type Location =
    { Latitude: double
      Longitude: double }
Public Structure Location
   Dim lat, lon As Double

   Public Sub New(ByVal latitude As Double, ByVal longitude As Double)
      Me.lat = latitude
      Me.lon = longitude
   End Sub

   Public ReadOnly Property Latitude As Double
      Get
         Return Me.lat
      End Get
   End Property

   Public ReadOnly Property Longitude As Double
      Get
         Return Me.lon
      End Get
   End Property
End Structure

LocationTracker 类提供 IObservable<T> 实现。 其 TrackLocation 方法传递了一个可为 Location null 的对象,该对象包含纬度和经度数据。 Location如果值不是null,该方法TrackLocation将调用OnNext每个观察程序的方法。

public class LocationTracker : IObservable<Location>
{
   public LocationTracker()
   {
      observers = new List<IObserver<Location>>();
   }

   private List<IObserver<Location>> observers;

   public IDisposable Subscribe(IObserver<Location> observer)
   {
      if (! observers.Contains(observer))
         observers.Add(observer);
      return new Unsubscriber(observers, observer);
   }

   private class Unsubscriber : IDisposable
   {
      private List<IObserver<Location>>_observers;
      private IObserver<Location> _observer;

      public Unsubscriber(List<IObserver<Location>> observers, IObserver<Location> observer)
      {
         this._observers = observers;
         this._observer = observer;
      }

      public void Dispose()
      {
         if (_observer != null && _observers.Contains(_observer))
            _observers.Remove(_observer);
      }
   }

   public void TrackLocation(Nullable<Location> loc)
   {
      foreach (var observer in observers) {
         if (! loc.HasValue)
            observer.OnError(new LocationUnknownException());
         else
            observer.OnNext(loc.Value);
      }
   }

   public void EndTransmission()
   {
      foreach (var observer in observers.ToArray())
         if (observers.Contains(observer))
            observer.OnCompleted();

      observers.Clear();
   }
}
type Unsubscriber(observers: ResizeArray<IObserver<Location>>, observer: IObserver<Location>) =
    interface IDisposable with
        member _.Dispose() =
            if observer <> null && observers.Contains observer then
                observers.Remove observer |> ignore

type LocationTracker() =
    let observers = ResizeArray<IObserver<Location>>()

    interface IObservable<Location> with
        member _.Subscribe(observer) =
            if observers.Contains observer |> not then
                observers.Add observer
            new Unsubscriber(observers, observer)

    member _.TrackLocation(loc: Nullable<Location>) =
        for observer in observers do
            if not loc.HasValue then
                observer.OnError LocationUnknownException
            else
                observer.OnNext loc.Value

    member _.EndTransmission() =
        for observer in observers.ToArray() do
            if observers.Contains observer then
                observer.OnCompleted()
        observers.Clear()
Public Class LocationTracker : Implements IObservable(Of Location)

   Public Sub New()
      observers = New List(Of IObserver(Of Location))
   End Sub

   Private observers As List(Of IObserver(Of Location))

   Public Function Subscribe(ByVal observer As System.IObserver(Of Location)) As System.IDisposable _
                            Implements System.IObservable(Of Location).Subscribe
      If Not observers.Contains(observer) Then
         observers.Add(observer)
      End If
      Return New Unsubscriber(observers, observer)
   End Function

   Private Class Unsubscriber : Implements IDisposable
      Private _observers As List(Of IObserver(Of Location))
      Private _observer As IObserver(Of Location)

      Public Sub New(ByVal observers As List(Of IObserver(Of Location)), ByVal observer As IObserver(Of Location))
         Me._observers = observers
         Me._observer = observer
      End Sub

      Public Sub Dispose() Implements IDisposable.Dispose
         If _observer IsNot Nothing AndAlso _observers.Contains(_observer) Then
            _observers.Remove(_observer)
         End If
      End Sub
   End Class

   Public Sub TrackLocation(ByVal loc As Nullable(Of Location))
      For Each observer In observers
         If Not loc.HasValue Then
            observer.OnError(New LocationUnknownException())
         Else
            observer.OnNext(loc.Value)
         End If
      Next
   End Sub

   Public Sub EndTransmission()
      For Each observer In observers.ToArray()
         If observers.Contains(observer) Then observer.OnCompleted()
      Next
      observers.Clear()
   End Sub
End Class

Location如果值为值null,该方法TrackLocation将实例化一个LocationUnknownException对象,如以下示例所示。 然后,它会调用每个观察程序 OnError 的方法,并传递该 LocationUnknownException 对象。 请注意, LocationUnknownException 派生自 Exception,但不添加任何新成员。

public class LocationUnknownException : Exception
{
   internal LocationUnknownException()
   { }
}
exception LocationUnknownException
Public Class LocationUnknownException : Inherits Exception
   Friend Sub New()
   End Sub
End Class

观察程序通过调用对象IObservable<T>.Subscribe的方法注册以接收通知TrackLocation,该方法将对观察程序对象的引用分配给专用泛型List<T>对象。 该方法返回一个对象,该对象是一个UnsubscriberIDisposable实现,使观察程序能够停止接收通知。 该 LocationTracker 类还包括一个 EndTransmission 方法。 如果没有其他位置数据可用,该方法将调用每个观察程序的方法,然后清除观察程序 OnCompleted 的内部列表。

在此示例中,类 LocationReporter 提供 IObserver<T> 实现。 它显示有关控制台当前位置的信息。 它的构造函数包括一个 name 参数,该参数使 LocationReporter 实例能够在其字符串输出中标识自身。 它还包括一个 Subscribe 方法,该方法包装对提供程序方法的 Subscribe 调用。 这样,该方法就可以向私有变量分配返回 IDisposable 的引用。 该 LocationReporter 类还包括一个 Unsubscribe 方法,该方法调用 IDisposable.Dispose 该方法 IObservable<T>.Subscribe 返回的对象的方法。 以下代码定义 LocationReporter 类。

using System;

public class LocationReporter : IObserver<Location>
{
   private IDisposable unsubscriber;
   private string instName;

   public LocationReporter(string name)
   {
      this.instName = name;
   }

   public string Name
   {  get{ return this.instName; } }

   public virtual void Subscribe(IObservable<Location> provider)
   {
      if (provider != null)
         unsubscriber = provider.Subscribe(this);
   }

   public virtual void OnCompleted()
   {
      Console.WriteLine("The Location Tracker has completed transmitting data to {0}.", this.Name);
      this.Unsubscribe();
   }

   public virtual void OnError(Exception e)
   {
      Console.WriteLine("{0}: The location cannot be determined.", this.Name);
   }

   public virtual void OnNext(Location value)
   {
      Console.WriteLine("{2}: The current location is {0}, {1}", value.Latitude, value.Longitude, this.Name);
   }

   public virtual void Unsubscribe()
   {
      unsubscriber.Dispose();
   }
}
open System

type LocationReporter(name) =
    let mutable unsubscriber = Unchecked.defaultof<IDisposable>

    member _.Name = name

    member this.Subscribe(provider: IObservable<Location>) =
        if provider <> null then
            unsubscriber <- provider.Subscribe this

    member _.Unsubscribe() =
        unsubscriber.Dispose()

    interface IObserver<Location> with
        member this.OnCompleted() =
            printfn $"The Location Tracker has completed transmitting data to {name}."
            this.Unsubscribe()

        member _.OnError(_) =
            printfn $"{name}: The location cannot be determined."

        member _.OnNext(value) =
            printfn $"{name}: The current location is {value.Latitude}, {value.Longitude}"
Public Class LocationReporter : Implements IObserver(Of Location)
   Dim unsubscriber As IDisposable
   Dim instName As String

   Public Sub New(ByVal name As String)
      Me.instName = name
   End Sub

   Public ReadOnly Property Name As String
      Get
         Return instName
      End Get
   End Property

   Public Overridable Sub Subscribe(ByVal provider As IObservable(Of Location))
      If provider Is Nothing Then Exit Sub
      unsubscriber = provider.Subscribe(Me)
   End Sub

   Public Overridable Sub OnCompleted() Implements System.IObserver(Of Location).OnCompleted
      Console.WriteLine("The Location Tracker has completed transmitting data to {0}.", Me.Name)
      Me.Unsubscribe()
   End Sub

   Public Overridable Sub OnError(ByVal e As System.Exception) Implements System.IObserver(Of Location).OnError
      Console.WriteLine("{0}: The location cannot be determined.", Me.Name)
   End Sub

   Public Overridable Sub OnNext(ByVal value As Location) Implements System.IObserver(Of Location).OnNext
      Console.WriteLine("{2}: The current location is {0}, {1}", value.Latitude, value.Longitude, Me.Name)
   End Sub

   Public Overridable Sub Unsubscribe()
      unsubscriber.Dispose()
   End Sub
End Class

然后,以下代码实例化提供程序和观察程序。

using System;

class Program
{
   static void Main(string[] args)
   {
      // Define a provider and two observers.
      LocationTracker provider = new LocationTracker();
      LocationReporter reporter1 = new LocationReporter("FixedGPS");
      reporter1.Subscribe(provider);
      LocationReporter reporter2 = new LocationReporter("MobileGPS");
      reporter2.Subscribe(provider);

      provider.TrackLocation(new Location(47.6456, -122.1312));
      reporter1.Unsubscribe();
      provider.TrackLocation(new Location(47.6677, -122.1199));
      provider.TrackLocation(null);
      provider.EndTransmission();
   }
}
// The example displays output similar to the following:
//      FixedGPS: The current location is 47.6456, -122.1312
//      MobileGPS: The current location is 47.6456, -122.1312
//      MobileGPS: The current location is 47.6677, -122.1199
//      MobileGPS: The location cannot be determined.
//      The Location Tracker has completed transmitting data to MobileGPS.
open System

// Define a provider and two observers.
let provider = LocationTracker()
let reporter1 = LocationReporter "FixedGPS"
reporter1.Subscribe provider
let reporter2 = LocationReporter "MobileGPS"
reporter2.Subscribe provider

provider.TrackLocation { Latitude = 47.6456; Longitude = -122.1312 }
reporter1.Unsubscribe()
provider.TrackLocation { Latitude = 47.6677; Longitude = -122.1199 }
provider.TrackLocation(Nullable())
provider.EndTransmission()
// The example displays output similar to the following:
//      FixedGPS: The current location is 47.6456, -122.1312
//      MobileGPS: The current location is 47.6456, -122.1312
//      MobileGPS: The current location is 47.6677, -122.1199
//      MobileGPS: The location cannot be determined.
//      The Location Tracker has completed transmitting data to MobileGPS.
Module Module1
   Dim provider As LocationTracker

   Sub Main()
      ' Define a provider and two observers.
      provider = New LocationTracker()
      Dim reporter1 As New LocationReporter("FixedGPS")
      reporter1.Subscribe(provider)
      Dim reporter2 As New LocationReporter("MobileGPS")
      reporter2.Subscribe(provider)

      provider.TrackLocation(New Location(47.6456, -122.1312))
      reporter1.Unsubscribe()
      provider.TrackLocation(New Location(47.6677, -122.1199))
      provider.TrackLocation(Nothing)
      provider.EndTransmission()
   End Sub
End Module
' The example displays output similar to the following:
'       FixedGPS: The current location is 47.6456, -122.1312
'       MobileGPS: The current location is 47.6456, -122.1312
'       MobileGPS: The current location is 47.6677, -122.1199
'       MobileGPS: The location cannot be determined.
'       The Location Tracker has completed transmitting data to MobileGPS.

注解

IObserver<T>IObservable<T>接口为基于推送的通知提供了通用机制,也称为观察程序设计模式。 接口 IObservable<T> 表示 (提供程序) 发送通知的类; IObserver<T> 接口表示 (观察程序) 接收通知的类。 T 表示提供通知信息的类。 在某些基于推送的通知中 IObserver<T> ,实现并 T 可以表示同一类型。

提供程序必须实现单个方法, Subscribe该方法指示观察者希望接收基于推送的通知。 方法的调用方传递观察者的实例。 该方法返回一个 IDisposable 实现,使观察者能够在提供程序停止发送通知之前随时取消通知。

在任何给定时间,给定的提供程序可能具有零、一个或多个观察程序。 提供程序负责存储对观察程序的引用,并确保它们在发送通知之前有效。 接口 IObservable<T> 不会对观察程序数或发送通知的顺序做出任何假设。

提供程序通过调用 IObserver<T> 方法向观察者发送以下三种类型的通知:

  • 当前数据。 提供程序可以调用 IObserver<T>.OnNext 该方法来传递具有 T 当前数据、已更改数据或新数据的对象。

  • 错误条件。 提供程序可以调用 IObserver<T>.OnError 该方法,以通知观察者发生了一些错误情况。

  • 没有进一步的数据。 提供程序可以调用 IObserver<T>.OnCompleted 该方法,以通知观察者已完成发送通知。

方法

Subscribe(IObserver<T>)

通知提供程序观察程序将接收通知。

适用于

另请参阅