Using F# agents and cancellation tokens to manage background jobs
In a previous post, I created an F# agent that runs background computations. However, it gave the user no control over the jobs. I want a more flexible system that lets the user cancel jobs that are currently running. To implement this, I used F# agents (the MailboxProcessor class) to start jobs and to handle their completion. I implemented cancellation by keeping a collection of running jobs and using cancellation tokens, a .NET Framework 4.0 feature.
I created a JobState discriminated union to represent whether a job is not started, running, canceled or completed.
// The state of the job.
type JobState =
    | NotStarted
    | Running
    | Canceled
    | Completed
I created a Job type that is a record. It includes a cancellation token, an asynchronous computation, and state information.
// A Job consists of a job ID, a computation, a cancellation token,
// and a state.
type Job = {
    id : int
    comp : Async<Result>
    token : CancellationToken
    state : JobState
}
The function that creates the jobs still creates mock jobs. Each job is represented by an asynchronous workflow that sleeps for a while. But instead of making one call to Sleep, it calls Sleep in a loop, for a short time on each iteration. At each let!, do!, or other ! keyword (in this case, each invocation of Async.Sleep), F# checks for cancellation automatically. You do not have to monitor the value of IsCancellationRequested. In this case, the loop is terminated when cancellation occurs, and if you started the job with Async.StartWithContinuations, the cancellation continuation is called.
// Generates mock jobs using Async.Sleep
let createJob(id:int, token:CancellationToken) =
    let job = async {
        // The total sleep time is a random number of milliseconds from 0 to 9999,
        // and the mock computed result is a floating point value from 2.0 to 3.0.
        let time = random.Next(10000)
        let result = 2.0 + random.NextDouble()
        // Sleep in 100 short slices so that cancellation is checked at each do!.
        for i in 1 .. 100 do
            do! Async.Sleep(time / 100)
        return result
        }
    { id = id; comp = job; token = token; state = JobState.NotStarted }
Cancellation tokens were introduced in the .NET Framework 4.0, and a few classes are involved when you use them. One class, CancellationTokenSource, creates a CancellationToken, which simply identifies what you want to cancel. Each background job is represented by an asynchronous work item, and each work item has an associated CancellationToken. The cancellation token is passed as an optional argument when you start the job in the call to StartWithContinuations.
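To make the relationship between the token source, the token, and the continuations concrete, here is a minimal sketch that is separate from the job system in this post. The helper name runSlicedSleep and its parameters are made up for illustration, and it blocks the calling thread only to keep the demonstration short.

```fsharp
open System.Threading
open System.Threading.Tasks

// Hypothetical helper, not part of the post's code: runs a sliced sleep and
// reports which continuation was called.
let runSlicedSleep (totalMs: int) (cancelAfterMs: int) : string =
    use source = new CancellationTokenSource()
    let outcome = TaskCompletionSource<string>()
    let work =
        async {
            // 100 short sleeps instead of one long one. Every do! is a point
            // where the workflow checks the token.
            for i in 1 .. 100 do
                do! Async.Sleep(totalMs / 100)
            return totalMs
        }
    Async.StartWithContinuations(
        work,
        (fun _ -> outcome.TrySetResult("completed") |> ignore),
        (fun (ex: exn) -> outcome.TrySetResult("failed: " + ex.Message) |> ignore),
        (fun _ -> outcome.TrySetResult("canceled") |> ignore),
        source.Token)
    // Request cancellation only if the work has not finished in time.
    if not (outcome.Task.Wait(cancelAfterMs)) then
        source.Cancel()
    outcome.Task.Result
```

With these assumptions, runSlicedSleep 3000 100 should report "canceled", because cancellation is requested long before the 3000 ms of work is done, and runSlicedSleep 100 10000 should report "completed".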
Cancellation tokens are really useful when you have multiple jobs that depend on each other, and you want to cancel all related jobs at once, so this example adds the ability to create child jobs. The child jobs all share the same cancellation token as a parent job, so they are cancelled when the parent job is cancelled.
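The shared-token idea can be sketched in isolation. The helper below is hypothetical (cancelFamily and the job names are not part of this post's code): it starts a parent and some children on the same token, cancels the source once, and collects what each job reported.

```fsharp
open System.Threading
open System.Threading.Tasks

// Hypothetical helper: starts one parent and childCount children that all use
// the same token, cancels the source once, and returns each job's outcome.
let cancelFamily (childCount: int) : string[] =
    use source = new CancellationTokenSource()
    let startJob (name: string) : Task<string> =
        let outcome = TaskCompletionSource<string>()
        let work =
            async {
                // Runs until canceled; each do! checks the shared token.
                while true do
                    do! Async.Sleep 10
            }
        Async.StartWithContinuations(
            work,
            (fun () -> outcome.TrySetResult(name + " completed") |> ignore),
            (fun (ex: exn) -> outcome.TrySetResult(name + " failed") |> ignore),
            (fun _ -> outcome.TrySetResult(name + " canceled") |> ignore),
            source.Token)
        outcome.Task
    let names =
        Array.append [| "parent" |] (Array.init childCount (fun i -> sprintf "child %d" i))
    let outcomes = names |> Array.map startJob
    // One call cancels every job that was started with source.Token.
    source.Cancel()
    outcomes |> Array.map (fun outcome -> outcome.Result)
```

Because every job was started with the same token, a single Cancel call reaches all of them. The trade-off is that a child cannot be canceled without also canceling its parent and siblings.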
Initially, I introduced a global mutable array (called jobs) to store information, including the job state, which indicates whether the job is running, canceled, etc. But, a global mutable value needs to have synchronized access since there are multiple threads writing to it. You can use F# agents to synchronize access to global mutable state such as a shared collection. I decided to try that in my job collection. Instead of modifying the array directly, I created the type JobCollection, and then I created an agent that is internal to this class, and I wrote a public indexed property whose setter posts to that agent when I modify the array.
There are three different arrays in JobCollection: one for the jobs, one for the token sources, and one for the parentIds, which is only relevant for child jobs. They should all have the same length. I certainly could have put some of this in the Job type itself, but in this implementation, it's the JobCollection's responsibility to manage parent/child relationships and make sure that the tokens in each Job come from the right token source.
// The ID of the job whose status changed and the new status.
type JobStatusChangedMessage = int * JobState
type JobCollection(numJobs) =
    let mutable tokenSources = Array.init numJobs (fun _ -> new CancellationTokenSource())
    // An array is mutable shared data, so all state updates are
    // managed by the jobsStatusAgent.
    let mutable jobs = Array.init numJobs (fun id ->
        createJob(id, tokenSources.[id].Token))
    let mutable parentIds = Array.create numJobs None
    let jobsStatusAgent = MailboxProcessor<JobStatusChangedMessage>.Start(fun inbox ->
        let rec loop n =
            async {
                let! jobId, jobState = inbox.Receive()
                jobs.[jobId] <- { jobs.[jobId] with state = jobState }
                // return! keeps the recursive loop a tail call.
                return! loop (n + 1)
            }
        loop (0))
    member this.Item
        with get(index) =
            jobs.[index]
        and set index (value : Job) =
            jobsStatusAgent.Post(index, value.state)
    member this.TokenSource with get(index) = tokenSources.[index]
    member this.ParentId with get(index) = parentIds.[index]
    member this.Length = jobs.Length
    // Child jobs have the same CancellationToken as the parent job, so
    // if you cancel the parent job, the child jobs are also cancelled.
    // This function locks the collection during these modifications.
    member this.AddChildJobs(numJobs, parentId) =
        lock this (fun () ->
            let firstNewId = jobs.Length
            let newJobs = Array.init numJobs (fun id -> createJob(firstNewId + id, jobs.[parentId].token))
            let newTokenSources = Array.create numJobs tokenSources.[parentId]
            let newParentIds = Array.create numJobs (Some(parentId))
            jobs <- Array.append jobs newJobs
            tokenSources <- Array.append tokenSources newTokenSources
            parentIds <- Array.append parentIds newParentIds)
You could also use a lock to enforce synchronized access, as I did in the JobCollection method AddChildJobs. This method locks the collection in order to make modifications to the internal arrays. In a more serious implementation, it might be good to compare the performance of managed synchronized access using a lock to that of using a MailboxProcessor.
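For comparison, here is a minimal sketch of an agent that owns its state outright, so that reads as well as writes go through the mailbox. It is not the JobCollection from this post: the message type StateMessage, the function createStateAgent, and the use of plain strings for states are all assumptions made to keep the example small.

```fsharp
// Hypothetical message type for an agent that owns an array of states.
type StateMessage =
    | SetState of index: int * value: string
    | GetSnapshot of AsyncReplyChannel<string[]>

let createStateAgent (size: int) =
    MailboxProcessor<StateMessage>.Start(fun inbox ->
        // Only the agent's loop touches this array, so no lock is needed.
        let states = Array.create size "NotStarted"
        let rec loop () =
            async {
                let! message = inbox.Receive()
                match message with
                | SetState (index, value) -> states.[index] <- value
                | GetSnapshot reply -> reply.Reply(Array.copy states)
                return! loop ()
            }
        loop ())

// Usage: the read is queued behind the write, so it sees the update.
let stateAgent = createStateAgent 3
stateAgent.Post(SetState(1, "Running"))
let snapshot = stateAgent.PostAndReply(fun channel -> GetSnapshot channel)
// snapshot is [| "NotStarted"; "Running"; "NotStarted" |]
```

In JobCollection, by contrast, the indexer's setter posts to the agent while the getter reads the array directly, so a read that immediately follows a write may still see the old state. Routing reads through PostAndReply avoids that at the cost of a round trip through the mailbox.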
I created two agents that manage the jobs. One of the agents is called runAgent, and it starts the jobs by calling Async.StartWithContinuations. In the call to StartWithContinuations, I specify three continuations. The first is the code I want to run when the job completes; in this case, the code posts a message to another agent called completeAgent. The second is the exception continuation, which is not used. The third is the code I want to run when the job is canceled; in this case, the code posts a message to the print agent. The cancellation token is the last argument.
// This agent processes when jobs are completed.
let completeAgent = MailboxProcessor<Message>.Start(fun inbox ->
    let rec loop n =
        async {
            let! (id, result) = inbox.Receive()
            printAgent.Post <| sprintf "The result of job #%d is %f" id result
            // Mark the job as completed.
            jobs.[id] <- { jobs.[id] with state = JobState.Completed}
            return! loop (n + 1)
        }
    loop (0))
// This agent starts each job in the order it is received.
let runAgent = MailboxProcessor<Job>.Start(fun inbox ->
    let rec loop n =
        async {
            let! job = inbox.Receive()
            let str = sprintf "Starting job #%d" job.id
            match jobs.ParentId(job.id) with
            | Some id -> printAgent.Post <| sprintf "%s with parentId #%d" str id
            | None -> printAgent.Post str
            // Mark the job as running.
            jobs.[job.id] <- { jobs.[job.id] with state = JobState.Running }
            // Start the job.
            Async.StartWithContinuations(
                job.comp,
                (fun result -> completeAgent.Post(job.id, result)),
                (fun _ -> ()),
                (fun cancelException -> printAgent.Post <| sprintf "Canceled job #%d" job.id),
                job.token)
            return! loop (n + 1)
        }
    loop (0))
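The code above passes an empty exception continuation, so a job that throws would produce no output and would stay in the Running state. As a small sketch of how the continuations map to outcomes (the names describeOutcome, succeeding, and failing are made up for this example), the following helper reports which continuation ran:

```fsharp
open System.Threading.Tasks

// Hypothetical helper: starts a computation and reports which continuation ran.
let describeOutcome (computation: Async<float>) : string =
    let outcome = TaskCompletionSource<string>()
    Async.StartWithContinuations(
        computation,
        // Success continuation: receives the computed result.
        (fun result -> outcome.TrySetResult(sprintf "result %.1f" result) |> ignore),
        // Exception continuation: receives the exception raised by the workflow.
        (fun (ex: exn) -> outcome.TrySetResult("error: " + ex.Message) |> ignore),
        // Cancellation continuation: not reached here because no token is canceled.
        (fun _ -> outcome.TrySetResult("canceled") |> ignore))
    outcome.Task.Result

let succeeding = async { return 2.5 }
let failing : Async<float> = async { return failwith "boom" }
```

Here describeOutcome succeeding gives "result 2.5" and describeOutcome failing gives "error: boom". In the job system, a similar exception continuation could post the error message to printAgent instead of discarding it.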
I also decided to create an agent for printing to the console, to make sure that each string is printed completely before the next one starts, instead of having the output interleaved.
let printAgent = MailboxProcessor<string>.Start(fun inbox ->
    let rec loop n =
        async {
            let! str = inbox.Receive()
            printfn "%s" str
            return! loop (n + 1)
        }
    loop (0))
This means I can use the following syntax to print. Notice the use of the reverse pipe (<|) which means I don't have to use parentheses:
printAgent.Post <| sprintf "The result of job #%d is %f" id result
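As a tiny illustration of the two spellings, here is a sketch in which describe is a made-up stand-in for printAgent.Post:

```fsharp
// Stand-in for printAgent.Post: any function that takes one string.
let describe (text: string) = "[" + text + "]"

// Both lines pass the same formatted string to describe.
let withParentheses = describe (sprintf "job #%d" 7)
let withBackwardPipe = describe <| sprintf "job #%d" 7
```

Both values are "[job #7]"; the operator only changes how the argument is grouped.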
Here is the complete code:
module JobAgentsWithCancellation =
    open System
    open System.Threading
    let random = System.Random()
    // The state of the job.
    type JobState =
        | NotStarted
        | Running
        | Canceled
        | Completed
    type Result = double
    // A Job consists of a job ID, a computation, a cancellation token,
    // and a state.
    type Job = {
        id : int
        comp : Async<Result>
        token : CancellationToken
        state : JobState
    }
    type Message = int * Result
    // Generates mock jobs using Async.Sleep
    let createJob(id:int, token:CancellationToken) =
        let job = async {
            // The total sleep time is a random number of milliseconds from 0 to 9999,
            // and the mock computed result is a floating point value from 2.0 to 3.0.
            let time = random.Next(10000)
            let result = 2.0 + random.NextDouble()
            // Sleep in 100 short slices so that cancellation is checked at each do!.
            for i in 1 .. 100 do
                do! Async.Sleep(time / 100)
            return result
            }
        { id = id; comp = job; token = token; state = JobState.NotStarted }
    // The ID of the job whose status changed and the new status.
    type JobStatusChangedMessage = int * JobState
    type JobCollection(numJobs) =
        let mutable tokenSources = Array.init numJobs (fun _ -> new CancellationTokenSource())
        // An array is mutable shared data, so all state updates are
        // managed by the jobsStatusAgent.
        let mutable jobs = Array.init numJobs (fun id ->
            createJob(id, tokenSources.[id].Token))
        let mutable parentIds = Array.create numJobs None
        let jobsStatusAgent = MailboxProcessor<JobStatusChangedMessage>.Start(fun inbox ->
            let rec loop n =
                async {
                    let! jobId, jobState = inbox.Receive()
                    jobs.[jobId] <- { jobs.[jobId] with state = jobState }
                    // return! keeps the recursive loop a tail call.
                    return! loop (n + 1)
                }
            loop (0))
        member this.Item
            with get(index) =
                jobs.[index]
            and set index (value : Job) =
                jobsStatusAgent.Post(index, value.state)
        member this.TokenSource with get(index) = tokenSources.[index]
        member this.ParentId with get(index) = parentIds.[index]
        member this.Length = jobs.Length
        // Child jobs have the same CancellationToken as the parent job, so
        // if you cancel the parent job, the child jobs are also cancelled.
        // This function locks the collection during these modifications.
        member this.AddChildJobs(numJobs, parentId) =
            lock this (fun () ->
                let firstNewId = jobs.Length
                let newJobs = Array.init numJobs (fun id -> createJob(firstNewId + id, jobs.[parentId].token))
                let newTokenSources = Array.create numJobs tokenSources.[parentId]
                let newParentIds = Array.create numJobs (Some(parentId))
                jobs <- Array.append jobs newJobs
                tokenSources <- Array.append tokenSources newTokenSources
                parentIds <- Array.append parentIds newParentIds)
    let numJobs = 10
    let jobs = new JobCollection(numJobs)
    jobs.AddChildJobs(4, 2)
    let printAgent = MailboxProcessor<string>.Start(fun inbox ->
        let rec loop n =
            async {
                let! str = inbox.Receive()
                printfn "%s" str
                return! loop (n + 1)
            }
        loop (0))
    // This agent processes when jobs are completed.
    let completeAgent = MailboxProcessor<Message>.Start(fun inbox ->
        let rec loop n =
            async {
                let! (id, result) = inbox.Receive()
                printAgent.Post <| sprintf "The result of job #%d is %f" id result
                // Mark the job as completed.
                jobs.[id] <- { jobs.[id] with state = JobState.Completed}
                return! loop (n + 1)
            }
        loop (0))
    // This agent starts each job in the order it is received.
    let runAgent = MailboxProcessor<Job>.Start(fun inbox ->
        let rec loop n =
            async {
                let! job = inbox.Receive()
                let str = sprintf "Starting job #%d" job.id
                match jobs.ParentId(job.id) with
                | Some id -> printAgent.Post <| sprintf "%s with parentId #%d" str id
                | None -> printAgent.Post str
                // Mark the job as running.
                jobs.[job.id] <- { jobs.[job.id] with state = JobState.Running }
                // Start the job.
                Async.StartWithContinuations(
                    job.comp,
                    (fun result -> completeAgent.Post(job.id, result)),
                    (fun _ -> ()),
                    (fun cancelException -> printAgent.Post <| sprintf "Canceled job #%d" job.id),
                    job.token)
                return! loop (n + 1)
            }
        loop (0))
    for id in 0 .. jobs.Length - 1 do
        runAgent.Post(jobs.[id])
    let cancelJob(cancelId) =
        if (cancelId >= 0 && cancelId < numJobs && jobs.[cancelId].state = JobState.Running) then
            jobs.[cancelId] <- { jobs.[cancelId] with state = JobState.Canceled }
            jobs.TokenSource(cancelId).Cancel()
            printAgent.Post <| sprintf "Cancelling job #%d" cancelId
        else
            printAgent.Post <| sprintf "Job #%d could not be canceled." cancelId
    printAgent.Post <| "Specify a job by number to cancel it, and then press Enter."
    let mutable finished = false
    while not finished do
        let input = System.Console.ReadLine()
        // Any input that is not an integer ends the session.
        match Int32.TryParse(input) with
        | true, cancelId -> cancelJob(cancelId)
        | _ ->
            printAgent.Post <| "Closing."
            finished <- true
Here is an example session that shows that job #2 has four children, and that if you cancel job #2, all the child jobs are cancelled also.
Specify a job by number to cancel it, and then press Enter.
Starting job #0
Starting job #1
Starting job #2
Starting job #3
Starting job #4
Starting job #5
Starting job #6
Starting job #7
Starting job #8
Starting job #9
Starting job #10 with parentId #2
Starting job #11 with parentId #2
Starting job #12 with parentId #2
Starting job #13 with parentId #2
2
Cancelling job #2
Canceled job #13
Canceled job #2
Canceled job #11
Canceled job #10
Canceled job #12
The result of job #3 is 2.801492
The result of job #0 is 2.164704
The result of job #7 is 2.930089
The result of job #5 is 2.191637
The result of job #6 is 2.558175
The result of job #1 is 2.398534
The result of job #8 is 2.462594
The result of job #9 is 2.575942
The result of job #4 is 2.051622
In the next post, we'll look at utilizing multiple CPU cores by limiting the number of running jobs to be equal to the number of processors available.
Gordon
Comments
Anonymous
January 27, 2011
Isn't it true that F# async workflows handle CancellationTokens relatively automatically? I was under the impression you only needed to manually check the IsCancellationRequested property in C#. tomasp.net/.../async-csharp-differences.aspx
Anonymous
January 27, 2011
@Joel - I was going to post the same question. From what I've understood, IsCancellationRequested will be automatically checked at every '!' in an async.
Anonymous
January 27, 2011
Yes it is true, thanks Joel and Robert. The check for IsCancellationRequested is not necessary in F#. However, it's still necessary to break the Async.Sleep up into short calls, since it wants to finish its full sleep time before checking again for cancellation.
Anonymous
January 27, 2011
I've updated the blog post so that the check for IsCancellationRequested is removed.