MailboxProcessor.Scan<'Msg,'T> メソッド (F#)
更新 : 2011 年 1 月
指定された関数が Some 値を返すまで、到着順にメッセージを検索して、メッセージをスキャンします。 他のメッセージはキューに残ります。
名前空間/モジュール パス: Microsoft.FSharp.Control
アセンブリ: FSharp.Core (FSharp.Core.dll 内)
// Signature:
member this.Scan : ('Msg -> Async<'T> option) * ?int -> Async<'T>
// Usage:
mailboxProcessor.Scan (scanner)
mailboxProcessor.Scan (scanner, timeout = timeout)
型: 'Msg -> Async<'T> optionメッセージをスキップする場合は None、メッセージを処理してキューから削除する場合は Some を返す関数。
型: intオプションのタイムアウト (ミリ秒単位)。 既定値は Infinite() に対応する -1 です。
例外 |
状態 |
タイムアウト値を超えたときにスローされます。 |
既読メッセージ以外から scanner が構築した非同期計算 (Async オブジェクト)。
このメソッドは、エージェントの本体の内部で使用されます。 同時にアクティブにできるリーダーはエージェントごとに最大 1 つであるため、Receive、TryReceive、Scan、または TryScan の呼び出しは一度に 1 つしかアクティブにできません。 scanner 関数の本体は実行中にロックされますが、ロックは非同期ワークフローの実行前に解放されます。
Scan メソッドを使用する方法の例を次に示します。 このコードでは、メールボックス プロセッサ エージェントが、実行して結果を計算する一連のシミュレートされたジョブを管理しています。
open System
let numProcs = Environment.ProcessorCount
type Job<'Result> = int * Async<'Result>
// Request to run a job, or
// Completed notification (with proc id and jobId
type RequestMessage<'Result> =
| Request of Job<'Result>
| Completed of int * int
// Contains the id of the proc and the job
type RunMessage<'Result> = int * Job<'Result>
let random = System.Random()
// The program computes the Nth prime numbers for various
// values of N.
// This number determines how large values of N are.
let multiplier = 5000
// Generates mock jobs using Async.Sleep.
let createJob(id:int, computation, input:int) =
let job = async {
let result = computation(input)
return result
id, job
let execAgents = Array.zeroCreate<MailboxProcessor<RunMessage<_>>> numProcs
let controllerAgent = new MailboxProcessor<RequestMessage<_>>(fun inbox ->
// First try to identify an idle proc by calling tryFindIndex.
// If there is an idle proc, scan for a request and run it.
// If there is not an idle proc, scan for an idle notification.
// No timeout given, so scan may wait indefinitely either to receive
// a new request, or for a proc to signal that it's idle. Meanwhile,
// messages build up in the queue.
// An array indicating whether each proc is idle.
let idleStatus = Array.create numProcs true
let rec loop (count) =
async {
let idleId = Array.tryFindIndex (fun elem -> elem) idleStatus
match idleId with
| Some id ->
do! inbox.Scan(function | Request((jobId, _) as job) ->
Some(async {
idleStatus.[id] <- false
printfn "Job #%d submitted." jobId
execAgents.[id].Post(id, job) })
| Completed _ -> None)
| None ->
do! inbox.Scan(function | Request _ -> None
| Completed (id, jobId) ->
Some(async { idleStatus.[id] <- true }))
do! loop (count + 1)
loop 0)
for procId in 0 .. numProcs - 1 do
execAgents.[procId] <- new MailboxProcessor<RunMessage<_>>(fun inbox ->
let rec loop (count) =
async {
let! procId, (jobId, job) = inbox.Receive()
// Start the job
// Post to the controller inbox when complete.
// The exception and cancellation continuations are not used.
printfn "Job #%d started on procId %d." jobId procId
Async.Start(async {
let! result = job
printfn "Job #%d completed." jobId
printfn "Nth Prime for N = %d is %s." (multiplier*jobId) (result.ToString())
controllerAgent.Post(Completed(procId, jobId))
do! loop (count + 1)
loop 0)
let numJobs = 10
printfn "Number Of Logical Processors: %d" numProcs
let isprime number = number > 1 && Seq.forall (fun n -> number % n <> 0) { 2 .. number/2 }
let nthPrime n = Seq.initInfinite (fun n -> n)
|> Seq.filter (fun n -> isprime n)
|> Seq.nth (n - 1)
let rec loop (count) =
let jobId = (numJobs - count)
let job = createJob(jobId, (fun n -> nthPrime(n)), multiplier * jobId )
printfn "Requesting job #%d" jobId
// Delay
match count with
| 0 -> ()
| _ -> loop (count - 1)
loop (numJobs - 1)
printfn "Done submitting jobs. Press Enter to exit when ready."
Console.ReadLine() |> ignore
以下にサンプル セッションを示します。
