Partager via


MailboxProcessor.Scan<'Msg,'T>, méthode (F#)

Recherche un message en consultant les messages dans l'ordre d'arrivée jusqu'à ce qu'une fonction fournie retourne une valeur Some. Les autres messages demeurent dans la file d'attente.

Espace de noms/Chemin du module : Microsoft.FSharp.Control

Assembly : FSharp.Core (in FSharp.Core.dll)

// Signature:
member this.Scan : ('Msg -> Async<'T> option) * ?int -> Async<'T>

// Usage:
mailboxProcessor.Scan (scanner)
mailboxProcessor.Scan (scanner, timeout = timeout)

Paramètres

  • scanner
    Type : 'Msg -> Async<'T> option

    Fonction qui retourne None si le message doit être ignoré ou Some si le message doit être traité et supprimé de la file d'attente.

  • timeout
    Type : int

    Délai d'attente facultatif en millisecondes. -1 est la valeur par défaut, qui correspond à Infinite().

Exceptions

Exception

Condition

TimeoutException

Levée lorsque le délai d'attente est dépassé.

Valeur de retour

Un calcul asynchrone (objet Async) qui scanner a créé à partir du message lu.

Notes

Cette méthode doit être utilisée dans le corps de l'agent. Pour chaque agent, un lecteur simultané au maximum pouvant être actif, un seul appel simultané à Receive, TryReceive, Scan ou TryScan peut être actif. Le corps de la fonction scanner est verrouillé pendant son exécution, mais le verrou est libéré avant l'exécution du flux de travail asynchrone.

Exemple

L'exemple suivant présente l'utilisation de la méthode Scan. Dans ce code, les agents de processeur de boîte aux lettres gèrent une série de travaux simulés qui exécutent et calculent un résultat.

open System

let numProcs = Environment.ProcessorCount

type Job<'Result> = int  * Async<'Result>

// Request to run a job, or
// Completed notification (with proc id and jobId
type RequestMessage<'Result> = 
   | Request of Job<'Result>
   | Completed of int * int

// Contains the id of the proc and the job
type RunMessage<'Result> = int * Job<'Result>

let random = System.Random()
// The program computes the Nth prime numbers for various
// values of N.
// This number determines how large values of N are.
let multiplier = 5000

// Generates mock jobs using Async.Sleep.
let createJob(id:int, computation, input:int) =
    let job = async {
        let result = computation(input)
        return result
        }
    id, job

let execAgents = Array.zeroCreate<MailboxProcessor<RunMessage<_>>> numProcs

let controllerAgent = new MailboxProcessor<RequestMessage<_>>(fun inbox ->
    // First try to identify an idle proc by calling tryFindIndex.
    // If there is an idle proc, scan for a request and run it.
    // If there is not an idle proc, scan for an idle notification.
    // No timeout given, so scan may wait indefinitely either to receive
    // a new request, or for a proc to signal that it's idle.  Meanwhile,
    // messages build up in the queue.
    // An array indicating whether each proc is idle.
    let idleStatus = Array.create numProcs true

    let rec loop (count) =
        async {
            let idleId = Array.tryFindIndex (fun elem -> elem) idleStatus
            match idleId with
            | Some id ->
                do! inbox.Scan(function | Request((jobId, _) as job) ->
                                            Some(async { 
                                                idleStatus.[id] <- false
                                                printfn "Job #%d submitted." jobId
                                                execAgents.[id].Post(id, job) })
                                        | Completed _ -> None)

            | None ->
                do! inbox.Scan(function | Request _ -> None
                                        | Completed (id, jobId) -> 
                                            Some(async { idleStatus.[id] <- true }))
            do! loop (count + 1)
        }
    loop 0)

for procId in 0 .. numProcs - 1 do
    execAgents.[procId] <- new MailboxProcessor<RunMessage<_>>(fun inbox ->
        let rec loop (count) =
            async {
                let! procId, (jobId, job) = inbox.Receive()
                // Start the job
                // Post to the controller inbox when complete.
                // The exception and cancellation continuations are not used.
                printfn "Job #%d started on procId %d." jobId procId
                Async.Start(async {
                    let! result = job
                    printfn "Job #%d completed." jobId
                    printfn "Nth Prime for N = %d is %s." (multiplier*jobId) (result.ToString())
                    controllerAgent.Post(Completed(procId, jobId))
                    })
                do! loop (count + 1)
                }
        loop 0)
    execAgents.[procId].Start()

controllerAgent.Start()

let numJobs = 10

printfn "Number Of Logical Processors: %d" numProcs

let isprime number = number > 1 && Seq.forall (fun n -> number % n <> 0) { 2 .. number/2 }

let nthPrime n = Seq.initInfinite (fun n -> n) 
               |> Seq.filter (fun n -> isprime n)
               |> Seq.nth (n - 1)

let rec loop (count) =
    let jobId = (numJobs - count)
    let job = createJob(jobId, (fun n -> nthPrime(n)), multiplier * jobId )
    printfn "Requesting job #%d" jobId
    controllerAgent.Post(Request(job))
    // Delay
    System.Threading.Thread.Sleep(1000);
    match count with
    | 0 -> ()
    | _ -> loop (count - 1)
loop (numJobs - 1)


printfn "Done submitting jobs. Press Enter to exit when ready."
Console.ReadLine() |> ignore

Un exemple de session suit.

                                                  

Plateformes

Windows 7, Windows Vista SP2, Windows XP SP3, Windows XP x64 SP2, Windows Server 2008 R2, Windows Server 2008 SP2, Windows Server 2003 SP2

Informations de version

Runtime F#

Pris en charge dans : 2.0, 4.0

Silverlight

Prise en charge dans : 3

Voir aussi

Référence

Control.MailboxProcessor<'Msg>, classe (F#)

Microsoft.FSharp.Control, espace de noms (F#)

Historique des modifications

Date

Historique

Motif

Janvier 2011

Ajout d'un exemple de code

Améliorations apportées aux informations.

Avril 2011

A corrigé des informations sur le comportement du délai d'attente.

Résolution des bogues de contenu.