Control.Observable 模块 (F#)

针对第一类事件和其他可观测对象的基本操作。

命名空间/模块路径: Microsoft.FSharp.Control

程序集:FSharp.Core(在 FSharp.Core.dll 中)

module Observable

说明

添加 : ('T -> unit) -> IObservable<'T> -> unit

创建一个观察器,用于永久性订阅给定的可观察对象并为每个观察值调用给定函数。

choose : ('T -> 'U option) -> IObservable<'T> -> IObservable<'U>

返回一个可观测对象,该对象使用给定函数从源中选择观测值的投影。返回的对象将触发拆分器为其返回 Some 的观测值。返回的对象还会传播从源中生成的所有错误,并在源完成时完成。

filter : ('T -> bool) -> IObservable<'T> -> IObservable<'T>

返回一个可观测对象,该对象利用给定函数对源的观测值进行筛选。该可观测对象将只会看到谓词为其返回 true 的那些观测值。将为每个已订阅的观测者执行一次该谓词。返回的对象还会传播从源中生成的错误观测值,并在源完成时完成。

map : ('T -> 'U) -> IObservable<'T> -> IObservable<'U>

返回一个可观察对象,该对象利用给定函数转换源的观察值。会为每个订阅的观察器执行一次转换函数。返回的对象还会传播从源中生成的错误观测值,并在源完成时完成。

merge : IObservable<'T> -> IObservable<'T> -> IObservable<'T>

为源中合并的观测值返回一个可观测对象。返回的对象将传播从其中一个源生成的成功和错误值,并在两个源都完成时完成。

pairwise : IObservable<'T> -> IObservable<'T * 'T>

返回新的可观测对象,在第二次以及后续触发输入可观测对象时,将会触发该对象。输入可观测对象的第 N 次触发可将第 N-1 次和第 N 次触发中的参数作为一个值对进行传递。传递给第 N-1 个触发的参数将保持为隐藏内部状态,直至第 N 个触发发生为止。

Partition — 分区 : ('T -> bool) -> IObservable<'T> -> IObservable<'T> * IObservable<'T>

返回两个可观测对象,这两个对象利用给定函数对源的观测值进行分区。第一个对象将对该谓词为其返回 true 的那些值触发观测值。第二个对象将对该谓词为其返回 false 的那些值触发观测值。将为每个已订阅的观测者执行一次该谓词。这两个对象还会传播从源中生成的所有错误观测值,并且每个对象都在源完成时完成。

scan : ('U -> 'T -> 'U) -> 'U -> IObservable<'T> -> IObservable<'T>

返回一个可观察对象,该对象为每个观察器分配状态项,并将给定累积函数应用于输入产生的连续值。返回的对象将为每个计算的状态值(初始值除外)触发观察值。返回的对象将传播从源中生成的所有错误,并在源完成时完成。

split : ('T -> Choice<'U1,'U2>) -> IObservable<'T> -> IObservable<'U1> * IObservable<'U2>

返回两个可观察对象,这两个对象利用给定函数对源的观察值进行拆分。第一个对象将触发拆分器为其返回 Choice1Of2 的观测值。第二个对象将触发拆分器为其返回 Choice2Of2 的观测值 y。将为每个已订阅的观测者执行一次拆分器。这两个对象还会传播从源中生成的错误观察值,并且每个对象都在源完成时完成。

subscribe : ('T -> unit) -> IObservable<'T> -> IDisposable

创建一个订阅给定的可观测对象并为每个观测值调用给定函数的观测者。

示例

以下代码示例演示如何使用可观察对象。此示例中定义的 ObserverSource 是一个通用的可重用类,可以作为观察事件的源来使用。在模块中使用某些功能的示例如下所示;对不在这里展示出的函数可以引用 Control.Event 模块 (F#) 中的代码示例。

open System
open System.Diagnostics

// Represents a stream of IObserver events.
type ObservableSource<'T>() =

    let protect function1 =
        let mutable ok = false
        try 
            function1()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver method threw an exception.")

    let mutable key = 0

    // Use a Map, not a Dictionary, because callers might unsubscribe in the OnNext
    // method, so thread-safe snapshots of subscribers to iterate over are needed.
    let mutable subscriptions = Map.empty : Map<int, IObserver<'T>>

    let next(obs) = 
        subscriptions |> Seq.iter (fun (KeyValue(_, value)) -> 
            protect (fun () -> value.OnNext(obs)))

    let completed() = 
        subscriptions |> Seq.iter (fun (KeyValue(_, value)) -> 
            protect (fun () -> value.OnCompleted()))

    let error(err) = 
        subscriptions |> Seq.iter (fun (KeyValue(_, value)) -> 
            protect (fun () -> value.OnError(err)))

    let thisLock = new obj()

    let obs = 
        { new IObservable<'T> with
            member this.Subscribe(obs) =
                let key1 =
                    lock thisLock (fun () ->
                        let key1 = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(key1, obs)
                        key1)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(key1)) } }

    let mutable finished = false

    // The source ought to call these methods in serialized fashion (from
    // any thread, but serialized and non-reentrant).
    member this.Next(obs) =
        Debug.Assert(not finished, "IObserver is already finished")
        next obs

    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()

    member this.Error(err) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error err

    // The IObservable object returned is thread-safe; you can subscribe 
    // and unsubscribe (Dispose) concurrently.
    member this.AsObservable = obs

// Create a source.
let source = new ObservableSource<int>()

// Get an IObservable from the source.
let obs = source.AsObservable 

// Add a simple subscriber.
let unsubA = obs |> Observable.subscribe (fun x -> printfn "A: %d" x)

// Send some messages from the source.
// Output: A: 1
source.Next(1)
// Output: A: 2
source.Next(2)

// Add another subscriber. This subscriber has a filter.
let unsubB =
    obs
    |> Observable.filter (fun num -> num % 2 = 0)
    |> Observable.subscribe (fun num -> printfn "B: %d" num)

// Send more messages from the source.
// Output: A: 3
source.Next(3)
// Output: A: 4
//         B: 4
source.Next(4)

// Have subscriber A unsubscribe.
unsubA.Dispose()

// Send more messages from the source.
// No output
source.Next(5)
// Output: B: 6
source.Next(6)

// If you use add, there is no way to unsubscribe from the event.
obs |> Observable.add(fun x -> printfn "C: %d" x)

// Now add a subscriber that only does positive numbers and transforms
// the numbers into another type, here a string.
let unsubD =
    obs |> Observable.choose (fun int1 ->
             if int1 >= 0 then None else Some(int1.ToString()))
        |> Observable.subscribe(fun string1 -> printfn "D: %s" string1)

let unsubE =
    obs |> Observable.filter (fun int1 -> int1 >= 0)
        |> Observable.subscribe(fun int1 -> printfn "E: %d" int1)

let unsubF =
    obs |> Observable.map (fun int1 -> int1.ToString())
        |> Observable.subscribe (fun string1 -> printfn "F: %s" string1)


平台

Windows 8,Windows 7,Windows server 2012中,Windows server 2008 R2

版本信息

F#核心库版本

支持:2.0,4.0,可移植

请参见

参考

Microsoft.FSharp.Control 命名空间 (F#)