Поделиться через


MailboxProcessor.Scan<'Msg,'T> Method (F#)

Scans for a message by looking through messages in arrival order until a provided function returns a Some value. Other messages remain in the queue.

Namespace/Module Path: Microsoft.FSharp.Control

Assembly: FSharp.Core (in FSharp.Core.dll)

// Signature:
member this.Scan : ('Msg -> Async<'T> option) * ?int -> Async<'T>

// Usage:
mailboxProcessor.Scan (scanner)
mailboxProcessor.Scan (scanner, timeout = timeout)

Parameters

  • scanner
    Type: 'Msg -> Async<'T> option

    A function that returns None if the message is to be skipped, or Some if the message is to be processed and removed from the queue.

  • timeout
    Type: int

    An optional timeout in milliseconds. Defaults to -1 which corresponds to Infinite().

Exceptions

Exception

Condition

TimeoutException

Thrown when the timeout is exceeded.

Return Value

An asynchronous computation (Async object) that scanner built off the read message.

Remarks

This method is for use within the body of the agent. For each agent, at most one concurrent reader may be active, so no more than one concurrent call to Receive, TryReceive, Scan or TryScan may be active. The body of the scanner function is locked during its execution, but the lock is released before the execution of the asynchronous workflow.

Example

The following example shows how to use the Scan method. In this code, mailbox processor agents manage a series of simulated jobs that run and compute a result.

open System

let numProcs = Environment.ProcessorCount

type Job<'Result> = int  * Async<'Result>

// Request to run a job, or
// Completed notification (with proc id and jobId
type RequestMessage<'Result> = 
   | Request of Job<'Result>
   | Completed of int * int

// Contains the id of the proc and the job
type RunMessage<'Result> = int * Job<'Result>

let random = System.Random()
// The program computes the Nth prime numbers for various
// values of N.
// This number determines how large values of N are.
let multiplier = 5000

// Generates mock jobs using Async.Sleep.
let createJob(id:int, computation, input:int) =
    let job = async {
        let result = computation(input)
        return result
        }
    id, job

let execAgents = Array.zeroCreate<MailboxProcessor<RunMessage<_>>> numProcs

let controllerAgent = new MailboxProcessor<RequestMessage<_>>(fun inbox ->
    // First try to identify an idle proc by calling tryFindIndex.
    // If there is an idle proc, scan for a request and run it.
    // If there is not an idle proc, scan for an idle notification.
    // No timeout given, so scan may wait indefinitely either to receive
    // a new request, or for a proc to signal that it's idle.  Meanwhile,
    // messages build up in the queue.
    // An array indicating whether each proc is idle.
    let idleStatus = Array.create numProcs true

    let rec loop (count) =
        async {
            let idleId = Array.tryFindIndex (fun elem -> elem) idleStatus
            match idleId with
            | Some id ->
                do! inbox.Scan(function | Request((jobId, _) as job) ->
                                            Some(async { 
                                                idleStatus.[id] <- false
                                                printfn "Job #%d submitted." jobId
                                                execAgents.[id].Post(id, job) })
                                        | Completed _ -> None)

            | None ->
                do! inbox.Scan(function | Request _ -> None
                                        | Completed (id, jobId) -> 
                                            Some(async { idleStatus.[id] <- true }))
            do! loop (count + 1)
        }
    loop 0)

for procId in 0 .. numProcs - 1 do
    execAgents.[procId] <- new MailboxProcessor<RunMessage<_>>(fun inbox ->
        let rec loop (count) =
            async {
                let! procId, (jobId, job) = inbox.Receive()
                // Start the job
                // Post to the controller inbox when complete.
                // The exception and cancellation continuations are not used.
                printfn "Job #%d started on procId %d." jobId procId
                Async.Start(async {
                    let! result = job
                    printfn "Job #%d completed." jobId
                    printfn "Nth Prime for N = %d is %s." (multiplier*jobId) (result.ToString())
                    controllerAgent.Post(Completed(procId, jobId))
                    })
                do! loop (count + 1)
                }
        loop 0)
    execAgents.[procId].Start()

controllerAgent.Start()

let numJobs = 10

printfn "Number Of Logical Processors: %d" numProcs

let isprime number = number > 1 && Seq.forall (fun n -> number % n <> 0) { 2 .. number/2 }

let nthPrime n = Seq.initInfinite (fun n -> n) 
               |> Seq.filter (fun n -> isprime n)
               |> Seq.nth (n - 1)

let rec loop (count) =
    let jobId = (numJobs - count)
    let job = createJob(jobId, (fun n -> nthPrime(n)), multiplier * jobId )
    printfn "Requesting job #%d" jobId
    controllerAgent.Post(Request(job))
    // Delay
    System.Threading.Thread.Sleep(1000);
    match count with
    | 0 -> ()
    | _ -> loop (count - 1)
loop (numJobs - 1)


printfn "Done submitting jobs. Press Enter to exit when ready."
Console.ReadLine() |> ignore

A sample session follows.

Number Of Logical Processors: 2
Requesting job #1
Job #1 submitted.
Job #1 started on procId 0.
Requesting job #2
Job #2 submitted.
Job #2 started on procId 1.
Requesting job #3
Requesting job #4
Requesting job #5
Requesting job #6
Requesting job #7
Requesting job #8
Requesting job #9
Requesting job #10
Job #1 completed.
Nth Prime for N = 5000 is 48611.
Job #3 submitted.
Job #3 started on procId 0.
Done submitting jobs. Press Enter to exit when ready.
Job #2 completed.
Nth Prime for N = 10000 is 104729.
Job #4 submitted.
Job #4 started on procId 1.
Job #3 completed.
Nth Prime for N = 15000 is 163841.
Job #5 submitted.
Job #5 started on procId 0.
Job #4 completed.
Nth Prime for N = 20000 is 224737.
Job #6 submitted.
Job #6 started on procId 1.
Job #5 completed.
Nth Prime for N = 25000 is 287117.






















Platforms

Windows 7, Windows Vista SP2, Windows XP SP3, Windows XP x64 SP2, Windows Server 2008 R2, Windows Server 2008 SP2, Windows Server 2003 SP2

Version Information

F# Runtime

Supported in: 2.0, 4.0

Silverlight

Supported in: 3

See Also

Reference

Control.MailboxProcessor<'Msg> Class (F#)

Microsoft.FSharp.Control Namespace (F#)

Change History

Date

History

Reason

January 2011

Added code example.

Information enhancement.

April 2011

Corrected information about timeout behavior.

Content bug fix.