Control.MailboxProcessor<'Msg>, classe (F#)
Agent de traitement des messages qui exécute un calcul asynchrone.
Espace de noms/Chemin du module : Microsoft.FSharp.Control
Assembly : FSharp.Core (in FSharp.Core.dll)
[<Sealed>]
[<AutoSerializable(false)>]
type MailboxProcessor<'Msg> =
class
interface IDisposable
new MailboxProcessor : (MailboxProcessor<'Msg> -> Async<unit>) * ?CancellationToken -> MailboxProcessor<'Msg>
member this.Post : 'Msg -> unit
member this.PostAndAsyncReply : (AsyncReplyChannel<'Reply> -> 'Msg) * int option -> Async<'Reply>
member this.PostAndReply : (AsyncReplyChannel<'Reply> -> 'Msg) * int option -> 'Reply
member this.PostAndTryAsyncReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> Async<'Reply option>
member this.Receive : ?int -> Async<'Msg>
member this.Scan : ('Msg -> Async<'T> option) * ?int -> Async<'T>
member this.Start : unit -> unit
static member Start : (MailboxProcessor<'Msg> -> Async<unit>) * ?CancellationToken -> MailboxProcessor<'Msg>
member this.TryPostAndReply : (AsyncReplyChannel<'Reply> -> 'Msg) * ?int -> 'Reply option
member this.TryReceive : ?int -> Async<'Msg option>
member this.TryScan : ('Msg -> Async<'T> option) * ?int -> Async<'T option>
member this.add_Error : Handler<Exception> -> unit
member this.CurrentQueueLength : int
member this.DefaultTimeout : int with get, set
member this.Error : IEvent<Exception>
member this.remove_Error : Handler<Exception> -> unit
end
L'agent encapsule une file d'attente de messages qui prend en charge plusieurs writers et un agent de lecteur unique.Les writers envoient des messages à l'agent à l'aide de la méthode Post et de ses variantes.L'agent peut attendre les messages à l'aide des méthodes Receive ou TryReceive ou analyser tous les messages disponibles à l'aide de la méthode Scan ou TryScan.
Ce type se nomme FSharpMailboxProcessor dans l'assembly .NET.Si vous accédez au type à partir d'un langage .NET autre que F#, ou par réflexion, utilisez ce nom.
Membre |
Description |
---|---|
Crée un agent.La fonction body est utilisée pour générer le calcul asynchrone exécuté par l'agent.Cette fonction n'est exécutée qu'une fois Start appelé. |
Membre |
Description |
---|---|
Se produit lorsque l'exécution de l'agent génère une exception. |
|
Retourne le nombre de messages non traités dans la file d'attente des messages de l'agent. |
|
Lève une exception de délai d'attente si aucun message n'est reçu dans cet intervalle.Par défaut, aucun délai d'attente n'est utilisé. |
|
Se produit lorsque l'exécution de l'agent génère une exception. |
|
Publie un message dans la file d'attente de messages de MailboxProcessor, de façon asynchrone. |
|
Publie un message à un agent et attend une réponse sur le canal, de façon asynchrone. |
|
Publie un message à un agent et attend une réponse sur le canal, de façon synchrone. |
|
Semblable à AsyncPostAndReply, mais retourne None en l'absence de réponse pendant le délai d'attente. |
|
Attend un message.Le premier message dans l'ordre d'arrivée sera utilisé. |
|
Se produit lorsque l'exécution de l'agent génère une exception. |
|
Recherche un message en consultant les messages dans l'ordre d'arrivée jusqu'à ce que le scanner retourne une valeur Some.Les autres messages demeurent dans la file d'attente. |
|
Démarre l'agent. |
|
Semblable à PostAndReply, mais retourne None en l'absence de réponse pendant le délai d'attente. |
|
Attend un message.Le premier message dans l'ordre d'arrivée sera utilisé. |
|
Recherche un message en consultant les messages dans l'ordre d'arrivée jusqu'à ce que le scanner retourne une valeur Some.Les autres messages demeurent dans la file d'attente. |
Membre |
Description |
---|---|
Crée et démarre un agent.La fonction body est utilisée pour générer le calcul asynchrone exécuté par l'agent. |
L'exemple suivant illustre l'utilisation de base de la classe MailboxProcessor.
open System
open Microsoft.FSharp.Control
type Message(id, contents) =
static let mutable count = 0
member this.ID = id
member this.Contents = contents
static member CreateMessage(contents) =
count <- count + 1
Message(count, contents)
let mailbox = new MailboxProcessor<Message>(fun inbox ->
let rec loop count =
async { printfn "Message count = %d. Waiting for next message." count
let! msg = inbox.Receive()
printfn "Message received. ID: %d Contents: %s" msg.ID msg.Contents
return! loop( count + 1) }
loop 0)
mailbox.Start()
mailbox.Post(Message.CreateMessage("ABC"))
mailbox.Post(Message.CreateMessage("XYZ"))
Console.WriteLine("Press any key...")
Console.ReadLine() |> ignore
Résultat de l'exemple
L'exemple suivant montre comment utiliser MailboxProcessor pour créer un agent simple qui accepte différents types de messages et retourne les réponses appropriées.Cet agent serveur représente un opérateur en bourse, qui est un agent d'achat et de vente d'une bourse qui définit des enchères et demande les prix des ressources.Les clients peuvent demander des prix ou acheter et vendre des actions.
open System
type AssetCode = string
type Asset(code, bid, ask, initialQuantity) =
let mutable quantity = initialQuantity
member this.AssetCode = code
member this.Bid = bid
member this.Ask = ask
member this.Quantity with get() = quantity and set(value) = quantity <- value
type OrderType =
| Buy of AssetCode * int
| Sell of AssetCode * int
type Message =
| Query of AssetCode * AsyncReplyChannel<Reply>
| Order of OrderType * AsyncReplyChannel<Reply>
and Reply =
| Failure of string
| Info of Asset
| Notify of OrderType
let assets = [| new Asset("AAA", 10.0, 10.05, 1000000);
new Asset("BBB", 20.0, 20.10, 1000000);
new Asset("CCC", 30.0, 30.15, 1000000) |]
let codeAssetMap = assets
|> Array.map (fun asset -> (asset.AssetCode, asset))
|> Map.ofArray
let mutable totalCash = 00.00
let minCash = -1000000000.0
let maxTransaction = 1000000.0
let marketMaker = new MailboxProcessor<Message>(fun inbox ->
let rec Loop() =
async {
let! message = inbox.Receive()
match message with
| Query(assetCode, replyChannel) ->
match (Map.tryFind assetCode codeAssetMap) with
| Some asset ->
printfn "Replying with Info for %s" (asset.AssetCode)
replyChannel.Reply(Info(asset))
| None -> replyChannel.Reply(Failure("Asset code not found."))
| Order(order, replyChannel) ->
match order with
| Buy(assetCode, quantity) ->
match (Map.tryFind assetCode codeAssetMap) with
| Some asset ->
if (quantity < asset.Quantity) then
asset.Quantity <- asset.Quantity - quantity
totalCash <- totalCash + float quantity * asset.Ask
printfn "Replying with Notification:\nBought %d units of %s at price $%f. Total purchase $%f."
quantity asset.AssetCode asset.Ask (asset.Ask * float quantity)
printfn "Marketmaker balance: $%10.2f" totalCash
replyChannel.Reply(Notify(Buy(asset.AssetCode, quantity)))
else
printfn "Insufficient shares to fulfill order for %d units of %s."
quantity asset.AssetCode
replyChannel.Reply(Failure("Insufficient shares to fulfill order."))
| None -> replyChannel.Reply(Failure("Asset code not found."))
| Sell(assetCode, quantity) ->
match (Map.tryFind assetCode codeAssetMap) with
| Some asset ->
if (float quantity * asset.Bid <= maxTransaction && totalCash - float quantity * asset.Bid > minCash) then
asset.Quantity <- asset.Quantity + quantity
totalCash <- totalCash - float quantity * asset.Bid
printfn "Replying with Notification:\nSold %d units of %s at price $%f. Total sale $%f."
quantity asset.AssetCode asset.Bid (asset.Bid * float quantity)
printfn "Marketmaker balance: $%10.2f" totalCash
replyChannel.Reply(Notify(Sell(asset.AssetCode, quantity)))
else
printfn "Insufficient cash to fulfill order for %d units of %s."
quantity asset.AssetCode
replyChannel.Reply(Failure("Insufficient cash to cover order."))
| None -> replyChannel.Reply(Failure("Asset code not found."))
do! Loop()
}
Loop())
marketMaker.Start()
// Query price.
let reply1 = marketMaker.PostAndReply(fun replyChannel ->
printfn "Posting message for AAA"
Query("AAA", replyChannel))
// Test Buy Order.
let reply2 = marketMaker.PostAndReply(fun replyChannel ->
printfn "Posting message for BBB"
Order(Buy("BBB", 100), replyChannel))
// Test Sell Order.
let reply3 = marketMaker.PostAndReply(fun replyChannel ->
printfn "Posting message for CCC"
Order(Sell("CCC", 100), replyChannel))
// Test incorrect code.
let reply4 = marketMaker.PostAndReply(fun replyChannel ->
printfn "Posting message for WrongCode"
Order(Buy("WrongCode", 100), replyChannel))
// Test too large a number of shares.
let reply5 = marketMaker.PostAndReply(fun replyChannel ->
printfn "Posting message with large number of shares of AAA."
Order(Buy("AAA", 1000000000), replyChannel))
// Too large an amount of money for one transaction.
let reply6 = marketMaker.PostAndReply(fun replyChannel ->
printfn "Posting message with too large of a monetary amount."
Order(Sell("AAA", 100000000), replyChannel))
let random = new Random()
let nextTransaction() =
let buyOrSell = random.Next(2)
let asset = assets.[random.Next(3)]
let quantity = Array.init 3 (fun _ -> random.Next(1000)) |> Array.sum
match buyOrSell with
| n when n % 2 = 0 -> Buy(asset.AssetCode, quantity)
| _ -> Sell(asset.AssetCode, quantity)
let simulateOne() =
async {
let! reply = marketMaker.PostAndAsyncReply(fun replyChannel ->
let transaction = nextTransaction()
match transaction with
| Buy(assetCode, quantity) -> printfn "Posting BUY %s %d." assetCode quantity
| Sell(assetCode, quantity) -> printfn "Posting SELL %s %d." assetCode quantity
Order(transaction, replyChannel))
printfn "%s" (reply.ToString())
}
let simulate =
async {
while (true) do
do! simulateOne()
// Insert a delay so that you can see the results more easily.
do! Async.Sleep(1000)
}
Async.Start(simulate)
Console.WriteLine("Press any key...")
Console.ReadLine() |> ignore
Résultat de l'exemple
Windows 8, Windows 7, Windows Server 2012, Windows Server 2008 R2
Versions de bibliothèque principale F#
Prise en charge dans : 2,0, 4,0, portables