

Using MailboxProcessor.Scan for communication between F# agents

In a previous post, I presented a background job manager in F# using the MailboxProcessor. In this post, I'll try to keep the number of background jobs equal to the number of processors on the machine. The number that matters here is the number of logical processors: the number of CPUs in the machine times the number of cores in each CPU. On my system there are only two, but newer or higher-end machines have more, perhaps 4, 8, or 16.

With these advances in hardware, you want to use all the cores on the machine as efficiently as possible. Let's imagine that you have a series of long computations to do, and you want to maximize the use of the CPU. One way to do this is to use F# agents, namely the MailboxProcessor class, to manage these computations as jobs. An F# agent is something that processes messages in a queue.
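For readers new to agents, here is a minimal sketch of one that processes messages from its queue. It is not part of the job manager; the names `Msg`, `counter`, and `total` are made up for illustration.

```fsharp
// Hypothetical message type: add to a running total, or ask for the total.
type Msg =
    | Add of int
    | Get of AsyncReplyChannel<int>

// The agent handles one message at a time, in the order they were posted.
let counter =
    MailboxProcessor<Msg>.Start(fun inbox ->
        let rec loop total =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Add n -> return! loop (total + n)
                | Get reply ->
                    reply.Reply total
                    return! loop total
            }
        loop 0)

counter.Post(Add 2)
counter.Post(Add 3)
// PostAndReply blocks until the agent answers on the reply channel.
let total = counter.PostAndReply(fun channel -> Get channel)
printfn "Total: %d" total
```

The agent's state lives in the argument of the recursive loop, so no locks are needed even when several threads post to it.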

To manage these background jobs, I created an F# agent called controllerAgent that accepts requests for jobs to be run. I then created an array of agents, one for each processor on the machine. The .NET Framework provides the property Environment.ProcessorCount, which conveniently gives you this number.

In this example, the computation is one I used in a previous blog entry, computing the Nth prime number.  You will no doubt be able to come up with a more efficient algorithm for doing this (such as this one by Jon Harrop), but since I'm just trying to show how to run a computation and view its result, this will do fine. 

In this example, the array of agents that represent each processor is called execAgents. Initially, I created it as an array of nulls. This is because its initialization code uses the controllerAgent, while the controllerAgent in turn posts to the execAgents. What I have here are mutually referential entities. In F#, the definition of a value must precede its use. By defining the execAgents array first, I can reference it in the initialization code for controllerAgent and fill in the array elements afterwards.
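The declare-first, fill-later pattern can be sketched in isolation as follows. This is a simplified illustration, not the job manager itself; `CoordinatorMsg`, `workers`, `coordinator`, and `acked` are hypothetical names.

```fsharp
open System.Threading

type CoordinatorMsg =
    | Dispatch of int * string
    | Ack of int

// Declared first so the coordinator can close over it; every slot is null for now.
let workers : MailboxProcessor<string>[] = Array.zeroCreate 2

// Counts acknowledgements so the main thread can wait for both workers.
let acked = new CountdownEvent(2)

let coordinator =
    MailboxProcessor<CoordinatorMsg>.Start(fun inbox ->
        let rec loop () =
            async {
                let! msg = inbox.Receive()
                match msg with
                // The slot is read only when a message arrives, after it has been filled.
                | Dispatch (index, text) -> workers.[index].Post text
                | Ack _ -> acked.Signal() |> ignore
                return! loop ()
            }
        loop ())

// Fill the slots afterwards; each worker can now refer to the coordinator.
for index in 0 .. workers.Length - 1 do
    workers.[index] <-
        MailboxProcessor<string>.Start(fun inbox ->
            let rec loop () =
                async {
                    let! text = inbox.Receive()
                    printfn "Worker %d received %s" index text
                    coordinator.Post(Ack index)
                    return! loop ()
                }
            loop ())

coordinator.Post(Dispatch(0, "first"))
coordinator.Post(Dispatch(1, "second"))
let allAcked = acked.Wait(5000)
```

The sketch relies on no Dispatch message being posted before the loop has filled every slot; posting earlier would hit a null entry.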

In previous blog posts, I used Receive to process each message. In this example, I use Scan. Scan allows me to look through the queue for the appropriate type of message; messages that do not match stay in the queue for later. The standard use of Scan is with cooperative agents: agent A is processing messages but needs some help from agent B, so it sends a message to agent B. Agent B replies, and agent A uses Scan to find the reply. In this example, the controllerAgent processes the request messages and signals an execAgent to start a job, but if all the execAgents are busy, it waits for a message from an execAgent saying it has completed a job.
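To make the difference between Receive and Scan concrete, here is a small sketch that is separate from the job manager. The names `Msg`, `seen`, `finished`, and `agent` are made up for illustration. Three messages are queued before the agent starts, so the order in which they are handled depends only on Scan and Receive.

```fsharp
open System.Threading

type Msg =
    | Work of string
    | Done of int

// Records the order in which the agent handles messages.
let seen = System.Collections.Generic.List<string>()
let finished = new ManualResetEvent(false)

let agent =
    new MailboxProcessor<Msg>(fun inbox ->
        async {
            // Scan passes over the queued Work messages and handles the first Done.
            do! inbox.Scan(fun msg ->
                    match msg with
                    | Done id -> Some(async { seen.Add(sprintf "Done %d" id) })
                    | Work _ -> None)
            // The skipped messages are still queued; Receive returns them in arrival order.
            for i in 1 .. 2 do
                let! msg = inbox.Receive()
                match msg with
                | Work name -> seen.Add(sprintf "Work %s" name)
                | Done id -> seen.Add(sprintf "Done %d" id)
            finished.Set() |> ignore
        })

// Queue the messages before starting the agent so that Done is last in the queue.
agent.Post(Work "a")
agent.Post(Work "b")
agent.Post(Done 1)
agent.Start()

finished.WaitOne(5000) |> ignore
seen |> Seq.iter (printfn "%s")
```

Although Done 1 was posted last, it is handled first because the scanner function returns None for the Work messages. Those stay in the queue until the later Receive calls pick them up.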

// Assume we have n processors on a machine and always want 1 job running per processor.
// A set of agents represent processors, which can be either busy or idle.
// The controller accepts requests and assigns the job to one of the processor queues.
// The main processor keeps track of whether each node is busy or idle. If one is idle,
// the job is assigned immediately. If there is no idle processor, the message box is scanned
// until a message is found indicating a job was completed.

 

open System

let numProcs = Environment.ProcessorCount

// A job is a job ID paired with the asynchronous computation to run.
type Job<'Result> = int * Async<'Result>

// Request to run a job, or
// completed notification (with proc ID and jobId)
type RequestMessage<'Result> =
    | Request of Job<'Result>
    | Completed of int * int

// Contains the ID of the processor and the job.
type RunMessage<'Result> = int * Job<'Result>

let random = System.Random()

// The program computes the Nth prime number for various
// values of N.
// This number determines how large values of N are.
let multiplier = 5000

 

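// Generates a job: pairs an ID with an async that runs the computation on the input.
let createJob(id:int, computation, input:int) =
    let job = async {
        let result = computation(input)
        return result
        }
    id, job

// One agent per processor. The slots start out null and are filled in
// after controllerAgent is defined.
let execAgents = Array.zeroCreate<MailboxProcessor<RunMessage<_>>> numProcs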

 

let controllerAgent = new MailboxProcessor<RequestMessage<_>>(fun inbox ->
        // First try to identify an idle proc by calling tryFindIndex.
        // If there is an idle processor, scan for a request and run it.
        // If there is not an idle processor, scan for an idle notification.
        // No timeout given, so scan may wait indefinitely either to receive
        // a new request, or for a processor to signal that it's idle. Meanwhile,
        // messages build up in the queue.

        // An array indicating whether each processor is idle.
        let idleStatus = Array.create numProcs true

        let rec loop (count) =
            async {
                let idleId = Array.tryFindIndex (fun elem -> elem) idleStatus
                match idleId with
                | Some id ->
                    do! inbox.Scan(function | Request((jobId, _) as job) ->
                                                Some(async {
                                                    idleStatus.[id] <- false
                                                    printfn "Job #%d submitted." jobId
                                                    execAgents.[id].Post(id, job) })
                                            | Completed _ -> None)

                | None ->
                    do! inbox.Scan(function | Request _ -> None
                                            | Completed (id, jobId) ->
                                                Some(async { idleStatus.[id] <- true }))
                // return! makes the recursive call a tail call.
                return! loop (count + 1)
            }
        loop 0)

 

for procId in 0 .. numProcs - 1 do
    execAgents.[procId] <- new MailboxProcessor<RunMessage<_>>(fun inbox ->
        let rec loop (count) =
            async {
                let! procId, (jobId, job) = inbox.Receive()
                // Start the job
                // Post to the controller inbox when complete.
                printfn "Job #%d started on procId %d." jobId procId
                Async.Start(async {
                    let! result = job
                    printfn "Job #%d completed." jobId
                    // %A formats the result whatever its type turns out to be.
                    printfn "Nth Prime for N = %d is %A." (multiplier*jobId) result
                    controllerAgent.Post(Completed(procId, jobId))
                    })
                // return! makes the recursive call a tail call.
                return! loop (count + 1)
                }
        loop 0)
    execAgents.[procId].Start()

 

controllerAgent.Start()

 

let numJobs = 10

printfn "Number Of Logical Processors: %d" numProcs

// Trial division: a number is prime if nothing in 2 .. number/2 divides it.
let isprime number = number > 1 && Seq.forall (fun n -> number % n <> 0) (seq { 2 .. number/2 })

// Returns the nth prime (1-based) by filtering the natural numbers.
// Seq.item replaces the deprecated Seq.nth.
let nthPrime n =
    Seq.initInfinite (fun i -> i)
    |> Seq.filter isprime
    |> Seq.item (n - 1)

// Submits jobs #1 through #numJobs, one per second.
let rec loop (count) =
    let jobId = (numJobs - count)
    let job = createJob(jobId, (fun n -> nthPrime(n)), multiplier * jobId )
    printfn "Requesting job #%d" jobId
    controllerAgent.Post(Request(job))
    // Delay
    System.Threading.Thread.Sleep(1000)
    match count with
    | 0 -> ()
    | _ -> loop (count - 1)
loop (numJobs - 1)

printfn "Done submitting jobs. Press Enter to exit when ready."
Console.ReadLine() |> ignore

 

The output shows that the number of jobs running at any given time is limited to two, the number of logical processors on my machine.

Number Of Logical Processors: 2
Requesting job #1
Job #1 submitted.
Job #1 started on procId 0.
Job #1 completed.
Nth Prime for N = 5000 is 48611.
Requesting job #2
Job #2 submitted.
Job #2 started on procId 1.
Requesting job #3
Job #3 submitted.
Job #3 started on procId 0.
Job #2 completed.
Nth Prime for N = 10000 is 104729.
Requesting job #4
Job #4 submitted.
Job #4 started on procId 1.
Requesting job #5
Requesting job #6
Job #3 completed.
Nth Prime for N = 15000 is 163841.
Job #5 submitted.
Job #5 started on procId 0.
Requesting job #7
Requesting job #8
Requesting job #9
Requesting job #10
Job #4 completed.
Nth Prime for N = 20000 is 224737.
Job #6 submitted.
Job #6 started on procId 1.
Done submitting jobs. Press Enter to exit when ready.
Job #5 completed.
Nth Prime for N = 25000 is 287117.
Job #7 submitted.
Job #7 started on procId 0.
Job #6 completed.
Nth Prime for N = 30000 is 350377.
Job #8 submitted.
Job #8 started on procId 1.
Job #7 completed.
Nth Prime for N = 35000 is 414977.
Job #9 submitted.
Job #9 started on procId 0.
Job #8 completed.
Nth Prime for N = 40000 is 479909.
Job #10 submitted.
Job #10 started on procId 1.
Job #9 completed.
Nth Prime for N = 45000 is 545747.
Job #10 completed.
Nth Prime for N = 50000 is 611953.

This is the last of this series of posts on the F# MailboxProcessor.