Control.MailboxProcessor<'Msg> 类 (F#)

执行异步计算的消息处理代理。

命名空间/模块路径:Microsoft.FSharp.Control

程序集:FSharp.Core(在 FSharp.Core.dll 中)

[<Sealed>]
[<AutoSerializable(false)>]
type MailboxProcessor<'Msg> =
 class
  interface IDisposable
  new MailboxProcessor : (MailboxProcessor<'Msg> -> Async<unit>) * ?CancellationToken -> MailboxProcessor<'Msg>
  member this.Post : 'Msg -> unit
  member this.PostAndAsyncReply : (AsyncReplyChannel<'Reply> -> 'Msg) * int option -> Async<'Reply>
  member this.PostAndReply : (AsyncReplyChannel<'Reply> -> 'Msg) * int option -> 'Reply
  member this.PostAndTryAsyncReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> Async<'Reply option>
  member this.Receive : ?int -> Async<'Msg>
  member this.Scan : ('Msg -> Async<'T> option) * ?int -> Async<'T>
  member this.Start : unit -> unit
  static member Start : (MailboxProcessor<'Msg> -> Async<unit>) * ?CancellationToken -> MailboxProcessor<'Msg>
  member this.TryPostAndReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> 'Reply option
  member this.TryReceive : ?int -> Async<'Msg option>
  member this.TryScan : ('Msg -> Async<'T> option) * ?int -> Async<'T option>
  member this.add_Error : Handler<Exception> -> unit
  member this.CurrentQueueLength :  int
  member this.DefaultTimeout :  int with get, set
  member this.Error :  IEvent<Exception>
  member this.remove_Error : Handler<Exception> -> unit
 end

备注

该代理将封装支持多个编写器和单个读取器代理的消息队列。 编写器通过使用 Post 方法及其变体向该代理发送消息。 该代理可能会使用 Receive 或 TryReceive 方法等待消息,也可能使用 Scan 或 TryScan 方法扫描所有可用的消息。

此类型在 .NET 程序集中名为 FSharpMailboxProcessor。 如果从 F# 以外的 .NET 语言中访问类型,或通过反射访问类型,请使用此名称。

构造函数

成员

说明

new

创建一个代理。 body 函数用于生成由代理执行的异步计算。 直到调用 Start 后才会执行此函数。

实例成员

成员

说明

add_Error

在代理的执行导致异常时发生。

CurrentQueueLength

返回代理的消息队列中未处理消息的数目。

DefaultTimeout

如果在此时间量内未收到消息,则引发超时异常。 默认情况下,不使用超时。

错误

在代理的执行导致异常时发生。

Post

以异步方式将消息发送到 MailboxProcessor 的消息队列。

PostAndAsyncReply

以异步方式将消息发送到代理并等待通道答复。

PostAndReply

以同步方式将消息发送到代理并在通道上等待答复。

PostAndTryAsyncReply

与 AsyncPostAndReply 类似,但如果在超时时间内没有答复,则返回 None。

Receive

等待消息。 这将按到达顺序使用第一条消息。

remove_Error

在代理的执行导致异常时发生。

Scan

通过按到达顺序浏览消息来扫描消息,直至 scanner 返回 Some 值为止。 其他消息保留在队列中。

Start

启动代理。

TryPostAndReply

与 PostAndReply 类似,但如果在超时时段内没有答复,则会返回 None。

TryReceive

等待消息。 这将按到达顺序使用第一条消息。

TryScan

通过按到达顺序浏览消息来扫描消息,直至 scanner 返回 Some 值为止。 其他消息保留在队列中。

静态成员

成员

说明

Start

创建并启动一个代理。 body 函数用于生成由代理执行的异步计算。

示例

下面的示例演示 MailboxProcessor 类的基本用法。

open System
open Microsoft.FSharp.Control

type Message(id, contents) =
    static let mutable count = 0
    member this.ID = id
    member this.Contents = contents
    static member CreateMessage(contents) =
        count <- count + 1
        Message(count, contents)

let mailbox = new MailboxProcessor<Message>(fun inbox ->
    let rec loop count =
        async { printfn "Message count = %d. Waiting for next message." count
                let! msg = inbox.Receive()
                printfn "Message received. ID: %d Contents: %s" msg.ID msg.Contents
                return! loop( count + 1) }
    loop 0)

mailbox.Start()

mailbox.Post(Message.CreateMessage("ABC"))
mailbox.Post(Message.CreateMessage("XYZ"))


Console.WriteLine("Press any key...")
Console.ReadLine() |> ignore

示例输出

  
  
  
  
  
  
  
  
  

以下示例演示如何使用 MailboxProcessor 创建一个接受各种类型邮件并返回适当答复的简单代理。 此服务器代理代表市场经纪人,即股票交易所的买卖代理,其职能时设定投标以及对资产进行询价。 客户端可以查询价格,或购买和出售共享。

open System

type AssetCode = string

type Asset(code, bid, ask, initialQuantity) =
    let mutable quantity = initialQuantity
    member this.AssetCode = code
    member this.Bid = bid
    member this.Ask = ask
    member this.Quantity with get() = quantity and set(value) = quantity <- value


type OrderType =
    | Buy of AssetCode * int
    | Sell of AssetCode * int

type Message =
    | Query of AssetCode * AsyncReplyChannel<Reply>
    | Order of OrderType * AsyncReplyChannel<Reply>
and Reply =
    | Failure of string
    | Info of Asset
    | Notify of OrderType

let assets = [| new Asset("AAA", 10.0, 10.05, 1000000);
                new Asset("BBB", 20.0, 20.10, 1000000);
                new Asset("CCC", 30.0, 30.15, 1000000) |]

let codeAssetMap = assets
                   |> Array.map (fun asset -> (asset.AssetCode, asset))
                   |> Map.ofArray

let mutable totalCash = 00.00
let minCash = -1000000000.0
let maxTransaction = 1000000.0

let marketMaker = new MailboxProcessor<Message>(fun inbox ->
    let rec Loop() =
        async {
            let! message = inbox.Receive()
            match message with
            | Query(assetCode, replyChannel) ->
                match (Map.tryFind assetCode codeAssetMap) with
                | Some asset ->
                    printfn "Replying with Info for %s" (asset.AssetCode)
                    replyChannel.Reply(Info(asset))
                | None -> replyChannel.Reply(Failure("Asset code not found."))
            | Order(order, replyChannel) ->
                match order with
                | Buy(assetCode, quantity) ->
                    match (Map.tryFind assetCode codeAssetMap) with
                    | Some asset ->
                        if (quantity < asset.Quantity) then
                            asset.Quantity <- asset.Quantity - quantity
                            totalCash <- totalCash + float quantity * asset.Ask
                            printfn "Replying with Notification:\nBought %d units of %s at price $%f. Total purchase $%f."
                                    quantity asset.AssetCode asset.Ask (asset.Ask * float quantity)
                            printfn "Marketmaker balance: $%10.2f" totalCash
                            replyChannel.Reply(Notify(Buy(asset.AssetCode, quantity)))
                        else
                            printfn "Insufficient shares to fulfill order for %d units of %s."
                                    quantity asset.AssetCode
                            replyChannel.Reply(Failure("Insufficient shares to fulfill order."))
                    | None -> replyChannel.Reply(Failure("Asset code not found."))
                | Sell(assetCode, quantity) ->
                    match (Map.tryFind assetCode codeAssetMap) with
                    | Some asset ->
                        if (float quantity * asset.Bid <= maxTransaction && totalCash - float quantity * asset.Bid > minCash) then
                            asset.Quantity <- asset.Quantity + quantity
                            totalCash <- totalCash - float quantity * asset.Bid
                            printfn "Replying with Notification:\nSold %d units of %s at price $%f. Total sale $%f."
                                    quantity asset.AssetCode asset.Bid (asset.Bid * float quantity)
                            printfn "Marketmaker balance: $%10.2f" totalCash
                            replyChannel.Reply(Notify(Sell(asset.AssetCode, quantity)))
                        else
                            printfn "Insufficient cash to fulfill order for %d units of %s."
                                    quantity asset.AssetCode
                            replyChannel.Reply(Failure("Insufficient cash to cover order."))
                    | None -> replyChannel.Reply(Failure("Asset code not found."))
            do! Loop()
        }
    Loop())

marketMaker.Start()

// Query price.
let reply1 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for AAA"
    Query("AAA", replyChannel))

// Test Buy Order.
let reply2 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for BBB"
    Order(Buy("BBB", 100), replyChannel))

// Test Sell Order.
let reply3 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for CCC"
    Order(Sell("CCC", 100), replyChannel))

// Test incorrect code.
let reply4 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for WrongCode"
    Order(Buy("WrongCode", 100), replyChannel))

// Test too large a number of shares.

let reply5 = marketMaker.PostAndReply(fun replyChannel ->
    printfn "Posting message with large number of shares of AAA."
    Order(Buy("AAA", 1000000000), replyChannel))

// Too large an amount of money for one transaction.

let reply6 = marketMaker.PostAndReply(fun replyChannel ->
    printfn "Posting message with too large of a monetary amount."
    Order(Sell("AAA", 100000000), replyChannel))

let random = new Random()
let nextTransaction() =
    let buyOrSell = random.Next(2)
    let asset = assets.[random.Next(3)]
    let quantity = Array.init 3 (fun _ -> random.Next(1000)) |> Array.sum
    match buyOrSell with
    | n when n % 2 = 0 -> Buy(asset.AssetCode, quantity)
    | _ -> Sell(asset.AssetCode, quantity)

let simulateOne() =
   async {
       let! reply = marketMaker.PostAndAsyncReply(fun replyChannel ->
           let transaction = nextTransaction()
           match transaction with
           | Buy(assetCode, quantity) -> printfn "Posting BUY %s %d." assetCode quantity
           | Sell(assetCode, quantity) -> printfn "Posting SELL %s %d." assetCode quantity
           Order(transaction, replyChannel))
       printfn "%s" (reply.ToString())
    }

let simulate =
    async {
        while (true) do
            do! simulateOne()
            // Insert a delay so that you can see the results more easily.
            do! Async.Sleep(1000)
    }

Async.Start(simulate)

Console.WriteLine("Press any key...")
Console.ReadLine() |> ignore

示例输出

  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  

平台

Windows 8,Windows 7,Windows server 2012中,Windows server 2008 R2

版本信息

F#核心库版本

支持:2.0,4.0,可移植

请参见

参考

Microsoft.FSharp.Control 命名空间 (F#)