Udostępnij za pośrednictwem


Control.MailboxProcessor<'Msg> — Klasa (F#)

Agent przetwarzania wiadomości, która wykonuje asynchronicznego obliczeń.

Ścieżka obszaru nazw/modułu: Microsoft.FSharp.Control

Zgromadzenie: FSharp.Core (w FSharp.Core.dll)

[<Sealed>]
[<AutoSerializable(false)>]
type MailboxProcessor<'Msg> =
 class
  interface IDisposable
  new MailboxProcessor : (MailboxProcessor<'Msg> -> Async<unit>) * ?CancellationToken -> MailboxProcessor<'Msg>
  member this.Post : 'Msg -> unit
  member this.PostAndAsyncReply : (AsyncReplyChannel<'Reply> -> 'Msg) * int option -> Async<'Reply>
  member this.PostAndReply : (AsyncReplyChannel<'Reply> -> 'Msg) * int option -> 'Reply
  member this.PostAndTryAsyncReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> Async<'Reply option>
  member this.Receive : ?int -> Async<'Msg>
  member this.Scan : ('Msg -> Async<'T> option) * ?int -> Async<'T>
  member this.Start : unit -> unit
  static member Start : (MailboxProcessor<'Msg> -> Async<unit>) * ?CancellationToken -> MailboxProcessor<'Msg>
  member this.TryPostAndReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> 'Reply option
  member this.TryReceive : ?int -> Async<'Msg option>
  member this.TryScan : ('Msg -> Async<'T> option) * ?int -> Async<'T option>
  member this.add_Error : Handler<Exception> -> unit
  member this.CurrentQueueLength :  int
  member this.DefaultTimeout :  int with get, set
  member this.Error :  IEvent<Exception>
  member this.remove_Error : Handler<Exception> -> unit
 end

Uwagi

Agent hermetyzuje kolejki komunikatów, która obsługuje wielu pisarzy i agenta pojedynczego czytnika.Autorzy wysyłać wiadomości do agenta przy użyciu metody Post i jego odmiany.Agent może czekać na wiadomości przy użyciu metody Receive lub TryReceive lub przeskanuj wszystkie dostępne wiadomości przy użyciu metody skanowania lub TryScan.

Ten typ o nazwie FSharpMailboxProcessor w zestawie .net.Jeśli dostęp do typu języka .net niż F# lub przez odbicie, należy użyć tej nazwy.

Konstruktory

Członkowskie

Opis

Nowy

Tworzy agenta.body Funkcja jest używana do generowania asynchronicznego obliczenia wykonywane przez agenta.Ta funkcja nie jest wykonywana aż do Start jest wywoływana.

Wystąpienie członków

Członkowskie

Opis

add_Error

Występuje, gdy wykonanie agenta powoduje wygenerowanie wyjątku.

CurrentQueueLength

Zwraca liczbę nieprzetworzonych wiadomości w kolejce wiadomości agenta.

Wartość elementu DefaultTimeout

Jeśli wiadomości nie zostały odebrane w tym czasie, wzbudza wyjątek limitu czasu.Domyślnie używany jest brak limitu czasu.

Błąd

Występuje, gdy wykonanie agenta powoduje wygenerowanie wyjątku.

POST

Ogłoszenia wiadomości do kolejki wiadomości MailboxProcessor asynchronicznie.

PostAndAsyncReply

Ogłoszenia wiadomości agenta i poczekać na odpowiedź na kanale, asynchronicznie.

PostAndReply

Ogłoszenia wiadomości agenta i poczekać na odpowiedź na kanale, synchronicznie.

PostAndTryAsyncReply

Podobnie jak AsyncPostAndReply, ale nie zwraca Jeśli odpowiedzi przed upływem limitu czasu.

Odbierać

Czeka na komunikat.Zajmie to pierwszej wiadomości w kolejności przybycia.

remove_Error

Występuje, gdy wykonanie agenta powoduje wygenerowanie wyjątku.

Skanowanie

Skanowanie wiadomości, przeglądając wiadomości w kolejności przybycia do scanner zwraca pewną wartość.Inne wiadomości pozostają w kolejce.

Start

Uruchamia agenta.

TryPostAndReply

Podobnie jak PostAndReply, ale nie zwraca Jeśli odpowiedzi przed upływem limitu czasu.

TryReceive

Czeka na komunikat.Zajmie to pierwszej wiadomości w kolejności przybycia.

TryScan

Skanowanie wiadomości, przeglądając wiadomości w kolejności przybycia do scanner zwraca pewną wartość.Inne wiadomości pozostają w kolejce.

Elementy statyczne

Członkowskie

Opis

Start

Tworzy i uruchamia agenta.body Funkcja jest używana do generowania asynchronicznego obliczenia wykonywane przez agenta.

Przykład

Poniższy przykład ilustruje wykorzystanie podstawowych MailboxProcessor klasy.

open System
open Microsoft.FSharp.Control

type Message(id, contents) =
    static let mutable count = 0
    member this.ID = id
    member this.Contents = contents
    static member CreateMessage(contents) =
        count <- count + 1
        Message(count, contents)

let mailbox = new MailboxProcessor<Message>(fun inbox ->
    let rec loop count =
        async { printfn "Message count = %d. Waiting for next message." count
                let! msg = inbox.Receive()
                printfn "Message received. ID: %d Contents: %s" msg.ID msg.Contents
                return! loop( count + 1) }
    loop 0)

mailbox.Start()

mailbox.Post(Message.CreateMessage("ABC"))
mailbox.Post(Message.CreateMessage("XYZ"))


Console.WriteLine("Press any key...")
Console.ReadLine() |> ignore

Przykładowe dane wyjściowe

  
  
  
  
  
  
  
  
  

Poniższy przykład pokazuje, jak używać MailboxProcessor do tworzenia prostych agenta, który akceptuje różnych typów wiadomości i zwraca odpowiednie odpowiedzi.Ten agent serwera reprezentuje animator rynku, który jest agent zakupów i sprzedaży, na giełdzie ustawia oferty i zażądać cen aktywów.Klienci mogą kwerendy dla cen, lub kupić i sprzedaży udziałów.

open System

type AssetCode = string

type Asset(code, bid, ask, initialQuantity) =
    let mutable quantity = initialQuantity
    member this.AssetCode = code
    member this.Bid = bid
    member this.Ask = ask
    member this.Quantity with get() = quantity and set(value) = quantity <- value


type OrderType =
    | Buy of AssetCode * int
    | Sell of AssetCode * int

type Message =
    | Query of AssetCode * AsyncReplyChannel<Reply>
    | Order of OrderType * AsyncReplyChannel<Reply>
and Reply =
    | Failure of string
    | Info of Asset
    | Notify of OrderType

let assets = [| new Asset("AAA", 10.0, 10.05, 1000000);
                new Asset("BBB", 20.0, 20.10, 1000000);
                new Asset("CCC", 30.0, 30.15, 1000000) |]

let codeAssetMap = assets
                   |> Array.map (fun asset -> (asset.AssetCode, asset))
                   |> Map.ofArray

let mutable totalCash = 00.00
let minCash = -1000000000.0
let maxTransaction = 1000000.0

let marketMaker = new MailboxProcessor<Message>(fun inbox ->
    let rec Loop() =
        async {
            let! message = inbox.Receive()
            match message with
            | Query(assetCode, replyChannel) ->
                match (Map.tryFind assetCode codeAssetMap) with
                | Some asset ->
                    printfn "Replying with Info for %s" (asset.AssetCode)
                    replyChannel.Reply(Info(asset))
                | None -> replyChannel.Reply(Failure("Asset code not found."))
            | Order(order, replyChannel) ->
                match order with
                | Buy(assetCode, quantity) ->
                    match (Map.tryFind assetCode codeAssetMap) with
                    | Some asset ->
                        if (quantity < asset.Quantity) then
                            asset.Quantity <- asset.Quantity - quantity
                            totalCash <- totalCash + float quantity * asset.Ask
                            printfn "Replying with Notification:\nBought %d units of %s at price $%f. Total purchase $%f."
                                    quantity asset.AssetCode asset.Ask (asset.Ask * float quantity)
                            printfn "Marketmaker balance: $%10.2f" totalCash
                            replyChannel.Reply(Notify(Buy(asset.AssetCode, quantity)))
                        else
                            printfn "Insufficient shares to fulfill order for %d units of %s."
                                    quantity asset.AssetCode
                            replyChannel.Reply(Failure("Insufficient shares to fulfill order."))
                    | None -> replyChannel.Reply(Failure("Asset code not found."))
                | Sell(assetCode, quantity) ->
                    match (Map.tryFind assetCode codeAssetMap) with
                    | Some asset ->
                        if (float quantity * asset.Bid <= maxTransaction && totalCash - float quantity * asset.Bid > minCash) then
                            asset.Quantity <- asset.Quantity + quantity
                            totalCash <- totalCash - float quantity * asset.Bid
                            printfn "Replying with Notification:\nSold %d units of %s at price $%f. Total sale $%f."
                                    quantity asset.AssetCode asset.Bid (asset.Bid * float quantity)
                            printfn "Marketmaker balance: $%10.2f" totalCash
                            replyChannel.Reply(Notify(Sell(asset.AssetCode, quantity)))
                        else
                            printfn "Insufficient cash to fulfill order for %d units of %s."
                                    quantity asset.AssetCode
                            replyChannel.Reply(Failure("Insufficient cash to cover order."))
                    | None -> replyChannel.Reply(Failure("Asset code not found."))
            do! Loop()
        }
    Loop())

marketMaker.Start()

// Query price. 
let reply1 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for AAA"
    Query("AAA", replyChannel))

// Test Buy Order. 
let reply2 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for BBB"
    Order(Buy("BBB", 100), replyChannel))

// Test Sell Order. 
let reply3 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for CCC"
    Order(Sell("CCC", 100), replyChannel))

// Test incorrect code. 
let reply4 = marketMaker.PostAndReply(fun replyChannel -> 
    printfn "Posting message for WrongCode"
    Order(Buy("WrongCode", 100), replyChannel))

// Test too large a number of shares. 

let reply5 = marketMaker.PostAndReply(fun replyChannel ->
    printfn "Posting message with large number of shares of AAA."
    Order(Buy("AAA", 1000000000), replyChannel))

// Too large an amount of money for one transaction. 

let reply6 = marketMaker.PostAndReply(fun replyChannel ->
    printfn "Posting message with too large of a monetary amount."
    Order(Sell("AAA", 100000000), replyChannel))

let random = new Random()
let nextTransaction() =
    let buyOrSell = random.Next(2)
    let asset = assets.[random.Next(3)]
    let quantity = Array.init 3 (fun _ -> random.Next(1000)) |> Array.sum
    match buyOrSell with
    | n when n % 2 = 0 -> Buy(asset.AssetCode, quantity)
    | _ -> Sell(asset.AssetCode, quantity)

let simulateOne() =
   async {
       let! reply = marketMaker.PostAndAsyncReply(fun replyChannel ->
           let transaction = nextTransaction()
           match transaction with
           | Buy(assetCode, quantity) -> printfn "Posting BUY %s %d." assetCode quantity
           | Sell(assetCode, quantity) -> printfn "Posting SELL %s %d." assetCode quantity
           Order(transaction, replyChannel))
       printfn "%s" (reply.ToString())
    }

let simulate =
    async {
        while (true) do 
            do! simulateOne()
            // Insert a delay so that you can see the results more easily. 
            do! Async.Sleep(1000)
    }

Async.Start(simulate)

Console.WriteLine("Press any key...")
Console.ReadLine() |> ignore

Przykładowe dane wyjściowe

  
  
  
  
  
  
  
  
  
  
  
  
  
  
  
  

Platformy

Windows 8, Windows 7, Windows Server 2012 Windows Server 2008 R2

Informacje o wersji

F# Core wersji biblioteki

Obsługiwane: 2.0, 4.0, przenośne

Zobacz też

Informacje

Microsoft.FSharp.Control — Przestrzeń nazw (F#)