Поделиться через


Модуль Control.Observable (F#)

Основные операции над событием первого класса и другими наблюдаемыми объектами.

Пространство имен/путь к модулю: Microsoft.FSharp.Control

Сборка: FSharp.Core (в FSharp.Core.dll)

module Observable

Значения

Значение

Описание

add : ('T -> unit) -> IObservable<'T> -> unit

Создает наблюдатель, который осуществляет постоянную подписку на заданный наблюдаемый объект и вызывает заданную функцию для каждого наблюдения.

choose : ('T -> 'U option) -> IObservable<'T> -> IObservable<'U>

Возвращает наблюдаемый объект, который выбирает из источника проекцию наблюдений с помощью заданной функции. Возвращаемый объект будет активировать наблюдения, для которых разделитель возвращает значение Some. Кроме того, возвращаемый объект распространяет все ошибки, поступающие из источника, и завершается при завершении источника.

filter : ('T -> bool) -> IObservable<'T> -> IObservable<'T>

Возвращает наблюдаемый объект, который фильтрует наблюдения из источника по указанной функции. Наблюдаемый объект будет видеть только те наблюдения, для которых предикат возвращает значение true. Предикат выполняется один раз для каждого наблюдателя-подписчика. Кроме того, возвращаемый объект распространяет наблюдения ошибок, поступающие из источника, и завершается при завершении источника.

MAP : ('T -> 'U) -> IObservable<'T> -> IObservable<'U>

Возвращает наблюдаемый объект, который преобразует наблюдения из источника с помощью заданной функции. Функция преобразования выполняется один раз для каждого наблюдателя-подписчика. Кроме того, возвращаемый объект распространяет наблюдения ошибок, поступающие из источника, и завершается при завершении источника.

слияние : IObservable<'T> -> IObservable<'T> -> IObservable<'T>

Возвращает наблюдаемый объект для объединенных наблюдений из источников. Возвращаемый объект распространяет значения успеха и ошибок из любого источника и завершается при завершении обоих источников.

pairwise : IObservable<'T> -> IObservable<'T * 'T>

Возвращает новый наблюдаемый объект, который активирует второй и последующие вызовы входного наблюдаемого объекта. Вызов N входного наблюдаемого объекта передает аргументы от вызовов N-1 и N в виде пары. Аргумент, переданный активации N-1, содержится в скрытом внутреннем состоянии до N-ной активации.

раздел : ('T -> bool) -> IObservable<'T> -> IObservable<'T> * IObservable<'T>

Возвращает два наблюдаемых объекта, которые разделяют наблюдения из источника с помощью заданной функции. Первый наблюдаемый объект активирует наблюдения для тех значений, для которых предикат возвращает значение true. Второй наблюдаемый объект активирует наблюдения для тех значений, для которых предикат возвращает значение false. Предикат выполняется один раз для каждого наблюдателя-подписчика. Оба наблюдаемых объекта распространяют все наблюдения ошибок, поступающие из источника, и завершаются при завершении источника.

scan : ('U -> 'T -> 'U) -> 'U -> IObservable<'T> -> IObservable<'T>

Возвращает наблюдаемый объект, который распределяет элемент состояния для каждого наблюдателя и применяет заданную функцию накопления к последовательным значениям, поступающим из входных данных. Возвращаемый объект активирует наблюдения для каждого вычисленного значения состояния, исключая начальное значение. Возвращаемый объект распространяет все ошибки, поступающие из источника, и завершается при завершении источника.

split : ('T -> Choice<'U1,'U2>) -> IObservable<'T> -> IObservable<'U1> * IObservable<'U2>

Возвращает два наблюдаемых объекта, которые разделяют наблюдения из источника с помощью заданной функции. Первый объект будет активировать наблюдения, для которых разделитель возвращает Choice1Of2. Второй объект будет активировать наблюдения y, для которых разделитель возвращает Choice2Of2. Разделитель выполняется один раз для каждого наблюдателя-подписчика. Оба наблюдаемых объекта распространяют наблюдения ошибок, поступающие из источника, и завершаются при завершении источника.

subscribe : ('T -> unit) -> IObservable<'T> -> IDisposable

Создает наблюдателя, который осуществляет подписку на заданный наблюдаемый объект и вызывает заданную функцию при каждом наблюдении.

Пример

В следующем примере показано, как использовать наблюдаемые объекты. Класс ObserverSource, определенный в этом примере — это класс общего назначения многократного использования, который можно использовать в качестве источника наблюдаемых событий. Здесь приведены примеры использования некоторых функций в этом модуле; для функций, не продемонстрированных здесь, см. примеры кода в Модуль Control.Event (F#).

open System
open System.Diagnostics

// Represents a stream of IObserver events.
type ObservableSource<'T>() =

    let protect function1 =
        let mutable ok = false
        try 
            function1()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver method threw an exception.")

    let mutable key = 0

    // Use a Map, not a Dictionary, because callers might unsubscribe in the OnNext
    // method, so thread-safe snapshots of subscribers to iterate over are needed.
    let mutable subscriptions = Map.empty : Map<int, IObserver<'T>>

    let next(obs) = 
        subscriptions |> Seq.iter (fun (KeyValue(_, value)) -> 
            protect (fun () -> value.OnNext(obs)))

    let completed() = 
        subscriptions |> Seq.iter (fun (KeyValue(_, value)) -> 
            protect (fun () -> value.OnCompleted()))

    let error(err) = 
        subscriptions |> Seq.iter (fun (KeyValue(_, value)) -> 
            protect (fun () -> value.OnError(err)))

    let thisLock = new obj()

    let obs = 
        { new IObservable<'T> with
            member this.Subscribe(obs) =
                let key1 =
                    lock thisLock (fun () ->
                        let key1 = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(key1, obs)
                        key1)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(key1)) } }

    let mutable finished = false

    // The source ought to call these methods in serialized fashion (from
    // any thread, but serialized and non-reentrant).
    member this.Next(obs) =
        Debug.Assert(not finished, "IObserver is already finished")
        next obs

    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()

    member this.Error(err) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error err

    // The IObservable object returned is thread-safe; you can subscribe 
    // and unsubscribe (Dispose) concurrently.
    member this.AsObservable = obs

// Create a source.
let source = new ObservableSource<int>()

// Get an IObservable from the source.
let obs = source.AsObservable 

// Add a simple subscriber.
let unsubA = obs |> Observable.subscribe (fun x -> printfn "A: %d" x)

// Send some messages from the source.
// Output: A: 1
source.Next(1)
// Output: A: 2
source.Next(2)

// Add another subscriber. This subscriber has a filter.
let unsubB =
    obs
    |> Observable.filter (fun num -> num % 2 = 0)
    |> Observable.subscribe (fun num -> printfn "B: %d" num)

// Send more messages from the source.
// Output: A: 3
source.Next(3)
// Output: A: 4
//         B: 4
source.Next(4)

// Have subscriber A unsubscribe.
unsubA.Dispose()

// Send more messages from the source.
// No output
source.Next(5)
// Output: B: 6
source.Next(6)

// If you use add, there is no way to unsubscribe from the event.
obs |> Observable.add(fun x -> printfn "C: %d" x)

// Now add a subscriber that only does positive numbers and transforms
// the numbers into another type, here a string.
let unsubD =
    obs |> Observable.choose (fun int1 ->
             if int1 >= 0 then None else Some(int1.ToString()))
        |> Observable.subscribe(fun string1 -> printfn "D: %s" string1)

let unsubE =
    obs |> Observable.filter (fun int1 -> int1 >= 0)
        |> Observable.subscribe(fun int1 -> printfn "E: %d" int1)

let unsubF =
    obs |> Observable.map (fun int1 -> int1.ToString())
        |> Observable.subscribe (fun string1 -> printfn "F: %s" string1)


Платформы

Windows 7, Windows Vista с пакетом обновления 2 (SP2), Windows XP с пакетом обновления 3 (SP3), Windows XP x64 с пакетом обновления 2 (SP2), Windows Server 2008 R2, Windows Server 2008 с пакетом обновления 2 (SP2), Windows Server 2003 с пакетом обновления 2 (SP2)

Сведения о версии

Среда выполнения F#

Поддерживается в версиях 2.0, 4.0

Silverlight

Поддерживается в версии 3

См. также

Ссылки

Пространство имен Microsoft.FSharp.Control (F#)

Журнал изменений

Дата

Журнал

Причина

Октябрь 2010

Добавлен пример кода.

Улучшение информации.