次の方法で共有


MailboxProcessor.Scan<'Msg,'T> メソッド (F#)

更新 : 2011 年 1 月

指定された関数が Some 値を返すまで、到着順にメッセージを検索して、メッセージをスキャンします。 他のメッセージはキューに残ります。

名前空間/モジュール パス: Microsoft.FSharp.Control

アセンブリ: FSharp.Core (FSharp.Core.dll 内)

// Signature:
member this.Scan : ('Msg -> Async<'T> option) * ?int -> Async<'T>

// Usage:
mailboxProcessor.Scan (scanner)
mailboxProcessor.Scan (scanner, timeout = timeout)

パラメーター

  • scanner
    型: 'Msg -> Async<'T> option

    メッセージをスキップする場合は None、メッセージを処理してキューから削除する場合は Some を返す関数。

  • timeout
    型: int

    オプションのタイムアウト (ミリ秒単位)。 既定値は Infinite() に対応する -1 です。

例外

例外

状態

TimeoutException

タイムアウト値を超えたときにスローされます。

戻り値

既読メッセージ以外から scanner が構築した非同期計算 (Async オブジェクト)。

解説

このメソッドは、エージェントの本体の内部で使用されます。 同時にアクティブにできるリーダーはエージェントごとに最大 1 つであるため、ReceiveTryReceiveScan、または TryScan の呼び出しは一度に 1 つしかアクティブにできません。 scanner 関数の本体は実行中にロックされますが、ロックは非同期ワークフローの実行前に解放されます。

使用例

Scan メソッドを使用する方法の例を次に示します。 このコードでは、メールボックス プロセッサ エージェントが、実行して結果を計算する一連のシミュレートされたジョブを管理しています。

open System

let numProcs = Environment.ProcessorCount

type Job<'Result> = int  * Async<'Result>

// Request to run a job, or
// Completed notification (with proc id and jobId
type RequestMessage<'Result> = 
   | Request of Job<'Result>
   | Completed of int * int

// Contains the id of the proc and the job
type RunMessage<'Result> = int * Job<'Result>

let random = System.Random()
// The program computes the Nth prime numbers for various
// values of N.
// This number determines how large values of N are.
let multiplier = 5000

// Generates mock jobs using Async.Sleep.
let createJob(id:int, computation, input:int) =
    let job = async {
        let result = computation(input)
        return result
        }
    id, job

let execAgents = Array.zeroCreate<MailboxProcessor<RunMessage<_>>> numProcs

let controllerAgent = new MailboxProcessor<RequestMessage<_>>(fun inbox ->
    // First try to identify an idle proc by calling tryFindIndex.
    // If there is an idle proc, scan for a request and run it.
    // If there is not an idle proc, scan for an idle notification.
    // No timeout given, so scan may wait indefinitely either to receive
    // a new request, or for a proc to signal that it's idle.  Meanwhile,
    // messages build up in the queue.
    // An array indicating whether each proc is idle.
    let idleStatus = Array.create numProcs true

    let rec loop (count) =
        async {
            let idleId = Array.tryFindIndex (fun elem -> elem) idleStatus
            match idleId with
            | Some id ->
                do! inbox.Scan(function | Request((jobId, _) as job) ->
                                            Some(async { 
                                                idleStatus.[id] <- false
                                                printfn "Job #%d submitted." jobId
                                                execAgents.[id].Post(id, job) })
                                        | Completed _ -> None)

            | None ->
                do! inbox.Scan(function | Request _ -> None
                                        | Completed (id, jobId) -> 
                                            Some(async { idleStatus.[id] <- true }))
            do! loop (count + 1)
        }
    loop 0)

for procId in 0 .. numProcs - 1 do
    execAgents.[procId] <- new MailboxProcessor<RunMessage<_>>(fun inbox ->
        let rec loop (count) =
            async {
                let! procId, (jobId, job) = inbox.Receive()
                // Start the job
                // Post to the controller inbox when complete.
                // The exception and cancellation continuations are not used.
                printfn "Job #%d started on procId %d." jobId procId
                Async.Start(async {
                    let! result = job
                    printfn "Job #%d completed." jobId
                    printfn "Nth Prime for N = %d is %s." (multiplier*jobId) (result.ToString())
                    controllerAgent.Post(Completed(procId, jobId))
                    })
                do! loop (count + 1)
                }
        loop 0)
    execAgents.[procId].Start()

controllerAgent.Start()

let numJobs = 10

printfn "Number Of Logical Processors: %d" numProcs

let isprime number = number > 1 && Seq.forall (fun n -> number % n <> 0) { 2 .. number/2 }

let nthPrime n = Seq.initInfinite (fun n -> n) 
               |> Seq.filter (fun n -> isprime n)
               |> Seq.nth (n - 1)

let rec loop (count) =
    let jobId = (numJobs - count)
    let job = createJob(jobId, (fun n -> nthPrime(n)), multiplier * jobId )
    printfn "Requesting job #%d" jobId
    controllerAgent.Post(Request(job))
    // Delay
    System.Threading.Thread.Sleep(1000);
    match count with
    | 0 -> ()
    | _ -> loop (count - 1)
loop (numJobs - 1)


printfn "Done submitting jobs. Press Enter to exit when ready."
Console.ReadLine() |> ignore

以下にサンプル セッションを示します。

                                                  

プラットフォーム

Windows 7、Windows Vista SP2、Windows XP SP3、Windows XP x64 SP2、Windows Server 2008 R2、Windows Server 2008 SP2、Windows Server 2003 SP2

バージョン情報

F# ランタイム

サポート対象: 2.0、4.0

Silverlight

サポート: 3

参照

その他の技術情報

Control.MailboxProcessor<'Msg> クラス (F#)

Microsoft.FSharp.Control 名前空間 (F#)

履歴の変更

日付

履歴

理由

2011 年 1 月

コード例を追加。

情報の拡充

2011 年 4 月

タイムアウトの動作に関する情報を修正。

コンテンツ バグ修正