MailboxProcessor.Scan<'Msg,'T>-Methode (F#)

Sucht nach einer Meldung, indem die Meldungen in der Reihenfolge ihres Eintreffens geprüft werden, bis eine angegebene Funktion einen Some-Wert zurückgibt. Andere Meldungen verbleiben in der Warteschlange.

Namespace/Modulpfad: Microsoft.FSharp.Control

Assembly: FSharp.Core (in FSharp.Core.dll)

// Signature:
member this.Scan : ('Msg -> Async<'T> option) * ?int -> Async<'T>

// Usage:
mailboxProcessor.Scan (scanner)
mailboxProcessor.Scan (scanner, timeout = timeout)


  • scanner
    Typ: 'Msg -> Async<'T> option

    Eine Funktion zur Rückgabe von None, wenn die Meldung übersprungen werden soll, oder Some, wenn die Meldung verarbeitet und aus der Warteschlange entfernt werden soll.

  • timeout
    Typ: int

    Ein optionales Timeout in Millisekunden. Wird standardmäßig auf -1 festgelegt, was Infinite() entspricht.





Wird ausgelöst, wenn das Timeout überschritten wird.


Eine asynchrone Berechnung (Async-Objekt), die vom scanner aus der gelesenen Meldung erstellt wurde.


Diese Methode ist für die Verwendung im Coderumpf des Agents vorgesehen. Für jeden Agent darf maximal ein Reader gleichzeitig aktiv sein. Es darf also nur maximal ein Aufruf von Receive, TryReceive, Scan oder TryScan aktiv sein. Der Text der scanner-Funktion wird gesperrt während seiner Ausführung, die Sperre wird jedoch vor der Ausführung des asynchronen Workflows aufgehoben.


Im folgenden Beispiel wird die Verwendung der Scan-Methode gezeigt. In diesem Code verwalten Postfachprozessoragenten eine Reihe von simulierten Aufträgen, die ein Ergebnis ausführen und berechnen.

open System

let numProcs = Environment.ProcessorCount

type Job<'Result> = int  * Async<'Result>

// Request to run a job, or
// Completed notification (with proc id and jobId
type RequestMessage<'Result> = 
   | Request of Job<'Result>
   | Completed of int * int

// Contains the id of the proc and the job
type RunMessage<'Result> = int * Job<'Result>

let random = System.Random()
// The program computes the Nth prime numbers for various
// values of N.
// This number determines how large values of N are.
let multiplier = 5000

// Generates mock jobs using Async.Sleep.
let createJob(id:int, computation, input:int) =
    let job = async {
        let result = computation(input)
        return result
    id, job

let execAgents = Array.zeroCreate<MailboxProcessor<RunMessage<_>>> numProcs

let controllerAgent = new MailboxProcessor<RequestMessage<_>>(fun inbox ->
    // First try to identify an idle proc by calling tryFindIndex.
    // If there is an idle proc, scan for a request and run it.
    // If there is not an idle proc, scan for an idle notification.
    // No timeout given, so scan may wait indefinitely either to receive
    // a new request, or for a proc to signal that it's idle.  Meanwhile,
    // messages build up in the queue.
    // An array indicating whether each proc is idle.
    let idleStatus = Array.create numProcs true

    let rec loop (count) =
        async {
            let idleId = Array.tryFindIndex (fun elem -> elem) idleStatus
            match idleId with
            | Some id ->
                do! inbox.Scan(function | Request((jobId, _) as job) ->
                                            Some(async { 
                                                idleStatus.[id] <- false
                                                printfn "Job #%d submitted." jobId
                                                execAgents.[id].Post(id, job) })
                                        | Completed _ -> None)

            | None ->
                do! inbox.Scan(function | Request _ -> None
                                        | Completed (id, jobId) -> 
                                            Some(async { idleStatus.[id] <- true }))
            do! loop (count + 1)
    loop 0)

for procId in 0 .. numProcs - 1 do
    execAgents.[procId] <- new MailboxProcessor<RunMessage<_>>(fun inbox ->
        let rec loop (count) =
            async {
                let! procId, (jobId, job) = inbox.Receive()
                // Start the job
                // Post to the controller inbox when complete.
                // The exception and cancellation continuations are not used.
                printfn "Job #%d started on procId %d." jobId procId
                Async.Start(async {
                    let! result = job
                    printfn "Job #%d completed." jobId
                    printfn "Nth Prime for N = %d is %s." (multiplier*jobId) (result.ToString())
                    controllerAgent.Post(Completed(procId, jobId))
                do! loop (count + 1)
        loop 0)


let numJobs = 10

printfn "Number Of Logical Processors: %d" numProcs

let isprime number = number > 1 && Seq.forall (fun n -> number % n <> 0) { 2 .. number/2 }

let nthPrime n = Seq.initInfinite (fun n -> n) 
               |> Seq.filter (fun n -> isprime n)
               |> Seq.nth (n - 1)

let rec loop (count) =
    let jobId = (numJobs - count)
    let job = createJob(jobId, (fun n -> nthPrime(n)), multiplier * jobId )
    printfn "Requesting job #%d" jobId
    // Delay
    match count with
    | 0 -> ()
    | _ -> loop (count - 1)
loop (numJobs - 1)

printfn "Done submitting jobs. Press Enter to exit when ready."
Console.ReadLine() |> ignore

Eine Beispielsitzung folgt.



