Sdílet prostřednictvím


Control.MailboxProcessor<'Msg> – třída (F#)

Agent zpracování zprávy, která zpracuje asynchronní výpočtu.

Cesta k oboru názvů nebo modul: Microsoft.FSharp.Control

Sestavení: FSharp.Core (v FSharp.Core.dll)

[<Sealed>]
[<AutoSerializable(false)>]
type MailboxProcessor<'Msg> =
 class
  interface IDisposable
  new MailboxProcessor : (MailboxProcessor<'Msg> -> Async<unit>) * ?CancellationToken -> MailboxProcessor<'Msg>
  member this.Post : 'Msg -> unit
  member this.PostAndAsyncReply : (AsyncReplyChannel<'Reply> -> 'Msg) * int option -> Async<'Reply>
  member this.PostAndReply : (AsyncReplyChannel<'Reply> -> 'Msg) * int option -> 'Reply
  member this.PostAndTryAsyncReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> Async<'Reply option>
  member this.Receive : ?int -> Async<'Msg>
  member this.Scan : ('Msg -> Async<'T> option) * ?int -> Async<'T>
  member this.Start : unit -> unit
  static member Start : (MailboxProcessor<'Msg> -> Async<unit>) * ?CancellationToken -> MailboxProcessor<'Msg>
  member this.TryPostAndReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> 'Reply option
  member this.TryReceive : ?int -> Async<'Msg option>
  member this.TryScan : ('Msg -> Async<'T> option) * ?int -> Async<'T option>
  member this.add_Error : Handler<Exception> -> unit
  member this.CurrentQueueLength :  int
  member this.DefaultTimeout :  int with get, set
  member this.Error :  IEvent<Exception>
  member this.remove_Error : Handler<Exception> -> unit
 end

Poznámky

Agent zapouzdří frontou zpráv podporující více autory a jeden Čtenář agenta.Autoři agenta odesílat zprávy pomocí metody Post a jeho varianty.Agent může čekat na zprávy pomocí metody Receive nebo TryReceive nebo procházet všechny dostupné zprávy pomocí metody skenování nebo TryScan.

Tento typ s názvem FSharpMailboxProcessor v sestavení .net.Pokud typ přístupu z .net jazyka než F# nebo prostřednictvím reflexe se tento název použijte.

Konstruktory

Člen

Description

Nový

Vytvoří zástupce.body Funkce slouží ke generování asynchronní výpočtu provedeny Agent.Tato funkce není spuštěn, dokud Start se nazývá.

Členové instance

Člen

Description

add_Error

Vyvolá se při spuštění agenta výsledkem výjimka.

CurrentQueueLength

Vrátí počet nezpracovaných zpráv ve frontě zpráv Agent.

Hodnota DefaultTimeout

Časový limit výjimku zvýší, pokud není v tomto množství času přijatých zpráv.Ve výchozím nastavení se používá bez časového limitu.

Chyba

Vyvolá se při spuštění agenta výsledkem výjimka.

POST

Zprávy do fronty zpráv MailboxProcessor, účtuje asynchronně.

PostAndAsyncReply

Odešle zprávu do agenta a očekávat odpověď na kanálu asynchronně.

PostAndReply

Odešle zprávu do agenta a očekávat odpověď na kanálu synchronně.

PostAndTryAsyncReply

Jako AsyncPostAndReply, ale nevrátí žádné Pokud odpověď v časovém limitu.

Příjem

Čeká na zprávu.Tato první zpráva v pořadí doručení spotřebuje.

remove_Error

Vyvolá se při spuštění agenta výsledkem výjimka.

Skenování

Vyhledává zprávy pohledem prostřednictvím zpráv v pořadí příchodu do scanner vrátí hodnotu některé.Ostatní zprávy zůstávají ve frontě.

Start

Spuštění agenta.

TryPostAndReply

Jako PostAndReply, ale nevrátí žádné Pokud odpověď v časovém limitu.

TryReceive

Čeká na zprávu.Tato první zpráva v pořadí doručení spotřebuje.

TryScan

Vyhledává zprávy pohledem prostřednictvím zpráv v pořadí příchodu do scanner vrátí hodnotu některé.Ostatní zprávy zůstávají ve frontě.

Statické členy

Člen

Description

Start

Vytvoří a spustí agent.body Funkce slouží ke generování asynchronní výpočtu provedeny Agent.

Příklad

Následující příklad ukazuje základní použití MailboxProcessor třídy.

open System
open Microsoft.FSharp.Control

type Message(id, contents) =
    static let mutable count = 0
    member this.ID = id
    member this.Contents = contents
    static member CreateMessage(contents) =
        count <- count + 1
        Message(count, contents)

let mailbox = new MailboxProcessor<Message>(fun inbox ->
    let rec loop count =
        async { printfn "Message count = %d. Waiting for next message." count
                let! msg = inbox.Receive()
                printfn "Message received. ID: %d Contents: %s" msg.ID msg.Contents
                return! loop( count + 1) }
    loop 0)

mailbox.Start()

mailbox.Post(Message.CreateMessage("ABC"))
mailbox.Post(Message.CreateMessage("XYZ"))


Console.WriteLine("Press any key...")
Console.ReadLine() |> ignore

Vzorový výstup

  
  
  
  
  
  
  
  
  

Následující příklad ukazuje použití MailboxProcessor k vytvoření jednoduchého agenta, který přijímá různé typy zpráv a vrátí odpovídající odpovědi.Tento agent serveru představuje trh maker, který je kupní a prodejní agent na burze, nastaví nabídka a požádejte ceny majetku.Klientům můžete dotaz pro ceny, nebo koupit a prodat akcie.

open System

type AssetCode = string

type Asset(code, bid, ask, initialQuantity) =
    let mutable quantity = initialQuantity
    member this.AssetCode = code
    member this.Bid = bid
    member this.Ask = ask
    member this.Quantity with get() = quantity and set(value) = quantity <- value


type OrderType =
    | Buy of AssetCode * int
    | Sell of AssetCode * int

type Message =
    | Query of AssetCode * AsyncReplyChannel<Reply>
    | Order of OrderType * AsyncReplyChannel<Reply>
and Reply =
    | Failure of string
    | Info of Asset
    | Notify of OrderType

let assets = [| new Asset("AAA", 10.0, 10.05, 1000000);
                new Asset("BBB", 20.0, 20.10, 1000000);
                new Asset("CCC", 30.0, 30.15, 1000000) |]

let codeAssetMap = assets
                   |> Array.map (fun asset -> (asset.AssetCode, asset))
                   |> Map.ofArray

let mutable totalCash = 00.00
let minCash = -1000000000.0
let maxTransaction = 1000000.0

let marketMaker = new MailboxProcessor<Message>(fun inbox ->
    let rec Loop() =
        async {
            let! message = inbox.Receive()
            match message with
            | Query(assetCode, replyChannel) ->
                match (Map.tryFind assetCode codeAssetMap) with
                | Some asset ->
                    printfn "Replying with Info for %s" (asset.AssetCode)
                    replyChannel.Reply(Info(asset))
                | None -> replyChannel.Reply(Failure("Asset code not found."))
            | Order(order, replyChannel) ->
                match order with
                | Buy(assetCode, quantity) ->
                    match (Map.tryFind assetCode codeAssetMap) with
                    | Some asset ->
                        if (quantity < asset.Quantity) then
                            asset.Quantity <- asset.Quantity - quantity
                            totalCash <- totalCash + float quantity * asset.Ask
                            printfn "Replying with Notification:\nBought %d units of %s at price $%f. Total purchase $%f."
                                    quantity asset.AssetCode asset.Ask (asset.Ask * float quantity)
                            printfn "Marketmaker balance: $%10.2f" totalCash
                            replyChannel.Reply(Notify(Buy(asset.AssetCode, quantity)))
                        else
                            printfn "Insufficient shares to fulfill order for %d units of %s."
                                    quantity asset.AssetCode
                            replyChannel.Reply(Failure("Insufficient shares to fulfill order."))
                    | None -> replyChannel.Reply(Failure("Asset code not found."))
                | Sell(assetCode, quantity) ->
                    match (Map.tryFind assetCode codeAssetMap) with
                    | Some asset ->
                        if (float quantity * asset.Bid <= maxTransaction && totalCash - float quantity * asset.Bid > minCash) then
                            asset.Quantity <- asset.Quantity + quantity
                            totalCash <- totalCash - float quantity * asset.Bid
                            printfn "Replying with Notification:\nSold %d units of %s at price $%f. Total sale $%f."
                                    quantity asset.AssetCode asset.Bid (asset.Bid * float quantity)
                            printfn "Marketmaker balance: $%10.2f" totalCash
                            replyChannel.Reply(Notify(Sell(asset.AssetCode, quantity)))
                        else
                            printfn "Insufficient cash to fulfill order for %d units of %s."
                                    quantity asset.AssetCode
                            replyChannel.Reply(Failure("Insufficient cash to cover order."))
                    | None -> replyChannel.Reply(Failure("Asset code not found."))
            do! Loop()
        }
    Loop())

marketMaker.Start()

// Query price. 
let reply1 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for AAA"
    Query("AAA", replyChannel))

// Test Buy Order. 
let reply2 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for BBB"
    Order(Buy("BBB", 100), replyChannel))

// Test Sell Order. 
let reply3 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for CCC"
    Order(Sell("CCC", 100), replyChannel))

// Test incorrect code. 
let reply4 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for WrongCode"
    Order(Buy("WrongCode", 100), replyChannel))

// Test too large a number of shares. 

let reply5 = marketMaker.PostAndReply(fun replyChannel ->
    printfn "Posting message with large number of shares of AAA."
    Order(Buy("AAA", 1000000000), replyChannel))

// Too large an amount of money for one transaction. 

let reply6 = marketMaker.PostAndReply(fun replyChannel ->
    printfn "Posting message with too large of a monetary amount."
    Order(Sell("AAA", 100000000), replyChannel))

let random = new Random()
let nextTransaction() =
    let buyOrSell = random.Next(2)
    let asset = assets.[random.Next(3)]
    let quantity = Array.init 3 (fun _ -> random.Next(1000)) |> Array.sum
    match buyOrSell with
    | n when n % 2 = 0 -> Buy(asset.AssetCode, quantity)
    | _ -> Sell(asset.AssetCode, quantity)

let simulateOne() =
   async {
       let! reply = marketMaker.PostAndAsyncReply(fun replyChannel ->
           let transaction = nextTransaction()
           match transaction with
           | Buy(assetCode, quantity) -> printfn "Posting BUY %s %d." assetCode quantity
           | Sell(assetCode, quantity) -> printfn "Posting SELL %s %d." assetCode quantity
           Order(transaction, replyChannel))
       printfn "%s" (reply.ToString())
    }

let simulate =
    async {
        while (true) do 
            do! simulateOne()
            // Insert a delay so that you can see the results more easily. 
            do! Async.Sleep(1000)
    }

Async.Start(simulate)

Console.WriteLine("Press any key...")
Console.ReadLine() |> ignore

Vzorový výstup

  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  

Platformy

Windows 8, Windows 7, Windows Server 2012 Windows Server 2008 R2

Informace o verzi

F# základní verze knihovny

Podporovány: 2.0, 4.0, přenosné

Viz také

Referenční dokumentace

Microsoft.FSharp.Control – obor názvů (F#)