Async and Parallel Design Patterns in F#: Agents
In part 3 of this series, we explore lightweight, reactive agents in F# and look at some typical design patterns associated with these agents, including isolated internal state.
- Part 1 describes how F# is a parallel and reactive language with support for light-weight reactions, and gives patterns for parallel CPU asyncs and parallel I/O asyncs.
- Part 2 describes a pattern for reporting partial results by raising events.
Pattern #4 – Your First Agent
Let’s take a look at your first asynchronous agent.
type Agent<'T> = MailboxProcessor<'T>
let agent =
    Agent.Start(fun inbox ->
        async { while true do
                    let! msg = inbox.Receive()
                    printfn "got message '%s'" msg } )
The agent performs repeated asynchronous waits for messages, and prints each message it receives. In this case, each message is a string, and the type of “agent” is:
agent : Agent<string>
We can send messages to the agent as follows:
agent.Post "hello!"
which duly prints:
got message 'hello!'
and multiple messages as follows:
for i in 1 .. 10000 do
    agent.Post (sprintf "message %d" i)
which duly prints 10000 responses.
You can think of an agent as hiding a message queue (or channel), plus a reaction to be run when a message arrives. An agent often has an asynchronous loop that asynchronously waits for messages and then processes them. In the example above, the code executed by the agent is the “while” loop.
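To make the hidden queue visible, here is a minimal sketch that is not part of the original samples. It creates an agent without starting it, posts three messages, and reads the CurrentQueueLength property of MailboxProcessor before the reaction is allowed to run. The names queuedAgent and waiting are made up for illustration.

```fsharp
type Agent<'T> = MailboxProcessor<'T>

// Create the agent without starting it, so nothing consumes the queue yet
let queuedAgent =
    new Agent<string>(fun inbox ->
        async { while true do
                    let! msg = inbox.Receive()
                    printfn "got message '%s'" msg })

// Posting only enqueues; the reaction has not run yet
for i in 1 .. 3 do
    queuedAgent.Post (sprintf "message %d" i)

// Number of messages currently waiting in the agent's queue
let waiting = queuedAgent.CurrentQueueLength
printfn "messages waiting: %d" waiting

// Starting the agent runs the reaction for each waiting message
queuedAgent.Start()
```

Separating construction from Start in this way is the same technique used later in this article when an Error handler has to be attached before the agent begins processing.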
Agents will already be familiar to many readers. Erlang is, of course, built around agents (there called processes). More recently, an experimental .NET incubation language called Axum has highlighted the importance of agent-based programming. Axum has been influential on the design of agents in F#, and vice-versa. Other languages with lightweight threading also emphasize agent-based decomposition and design.
The examples above begin with a type abbreviation to map the name “Agent” to the F# library implementation of in-memory agents called “MailboxProcessor”. You can also use the longer name, if you wish, but I prefer to use the shorter name.
Your First 100,000 Agents
Agents are lightweight because they are based on F# asynchronous programming. You can define hundreds of thousands of agents, or even more, within one .NET process. For example, let’s define 100,000 simple agents:
let agents =
    [ for i in 1 .. 100000 ->
        Agent.Start(fun inbox ->
            async { while true do
                        let! msg = inbox.Receive()
                        if i % 10000 = 0 then
                            printfn "agent %d got message '%s'" i msg } ) ]
You can send a message to each agent as follows:
for agent in agents do
    agent.Post "ping!"
Every 10,000th agent reports when its message has been processed. The collection of agents processes these messages quickly, in a few seconds. Agents and in-memory message processing are very fast.
Obviously, agents do not map directly to .NET threads – you can’t create 100,000 threads in a single application (even 1,000 is often too many on a 32-bit O/S). Instead, when an agent is waiting to receive a message it is represented as a simple callback, encompassing a few object allocations and the closure of references held by the agent. After a message is received, the work of the agent is scheduled and executed through a pool of threads (by default, the .NET thread pool).
While it is rare to require 100,000 agents in a program, there are compelling examples where 2,000+ are routinely required. We’ll look at some of these below.
Requests in Scalable Web Servers
The notion of an asynchronous agent is in fact a design pattern that recurs in many different settings in F# coding. In F#, we often use the word “agent” for any asynchronous computation that is in-flight, though particularly ones with loops, or which consume messages, or which generate results.
For example, in future posts we will look at building scalable TCP and HTTP server applications in F#, deploying them to the cloud using EC2 and Windows Azure. One example we’ll use is a quote server, which accepts incoming TCP or HTTP connections and returns a stream of quotes to each client. Each client receives one quote every second. The service is ultimately published as a single URL or REST API.
In that implementation, each accepted client request sparks an asynchronous agent, whose definition is roughly as follows (in this case, we repeatedly write the same price for the AAPL stock, for illustrative purposes):
open System.Net.Sockets
/// serve up a stream of quotes
let serveQuoteStream (client: TcpClient) = async {
    let stream = client.GetStream()
    while true do
        do! stream.AsyncWrite( "AAPL 200.38"B )
        do! Async.Sleep 1000 // sleep one second (the argument is in milliseconds)
}
Each agent runs until the client disconnects. Because agents are lightweight, the quote service can support many thousands of simultaneous clients within a single machine (and can scale further by using cloud hosting). The actual number of agents in flight depends on the number of clients being served.
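The accept loop that sparks one agent per client is not shown in this post. The following is only a sketch of how it might look, under several assumptions: the quoteServer function, the use of TcpListener with AcceptTcpClientAsync, the choice of port 0 (which asks the operating system for a free port), and the try/with that treats any failed write as a disconnect are all illustrative choices, not the implementation from the later posts.

```fsharp
open System.Net
open System.Net.Sockets

/// serve up a stream of quotes until the client goes away
let serveQuoteStream (client: TcpClient) = async {
    use client = client
    try
        let stream = client.GetStream()
        while true do
            do! stream.AsyncWrite("AAPL 200.38"B)
            do! Async.Sleep 1000
    with _ ->
        () // assumption: a failed write means the client disconnected
}

/// Accept connections forever, starting one agent per client.
/// Expects a listener that has already been started.
let quoteServer (listener: TcpListener) = async {
    while true do
        let! client = listener.AcceptTcpClientAsync() |> Async.AwaitTask
        // Each client gets its own in-flight asynchronous computation
        Async.Start (serveQuoteStream client)
}

// Port 0 lets the operating system pick a free port for this sketch
let listener = TcpListener(IPAddress.Loopback, 0)
listener.Start()
let port = (listener.LocalEndpoint :?> IPEndPoint).Port
Async.Start (quoteServer listener)
printfn "quote server listening on port %d" port
```

Each call to Async.Start puts one more agent in flight, so the number of running agents tracks the number of connected clients, as described above.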
The above example gives a hint at how sweet asynchronous network protocol programming can be with F# - network protocols become asynchronous agents reading and writing data to streams. We’ll be looking at many more examples of scalable TCP/HTTP programming in F# in later posts.
Agents and Isolated State (the imperative way)
One of the keys to successful agent programming in F# is isolation. Isolation means that resources exist which “belong” to a specific agent, and are not accessed except by that agent. This means isolated state is protected from concurrent access and data races.
The constructs of F# make it easy to use lexical scoping to achieve isolation within asynchronous agents. For example, the following code uses a dictionary to accumulate a count of the different messages sent to an agent. The internal dictionary is lexically private to the asynchronous agent, and no capability to read or write to the dictionary is made available “outside” of the agent. This means the mutable state in the dictionary is isolated. Non-concurrent, safe access is guaranteed.
type Agent<'T> = MailboxProcessor<'T>
open System.Collections.Generic
let agent =
    Agent.Start(fun inbox ->
        async { let strings = Dictionary<string,int>()
                while true do
                    let! msg = inbox.Receive()
                    if strings.ContainsKey msg then
                        strings.[msg] <- strings.[msg] + 1
                    else
                        // first time this message has been seen
                        strings.[msg] <- 1
                    printfn "message '%s' now seen '%d' times" msg strings.[msg] } )
State isolation is a fundamental technique in F# and is not restricted to agents. For example, the following code represents an isolated use of both a stream reader and a ResizeArray (the preferred F# name for System.Collections.Generic.List). Note that an equivalent of this function exists in the .NET libraries as System.IO.File.ReadAllLines.
let readAllLines (file:string) =
    use reader = new System.IO.StreamReader(file)
    let lines = ResizeArray<_>()
    while not reader.EndOfStream do
        lines.Add (reader.ReadLine())
    lines.ToArray()
Here, the reader and ResizeArray are never used outside the function. In the case of agents and other long-lived computations, isolation is not just temporal – the state is permanently separated and isolated even though other processing is continuing in the program.
While isolation is ultimately a dynamic property, it is often enforced lexically. For example, consider this lazy, on-demand version of reading all lines, which uses an isolated reader:
let readAllLines (file:string) =
    seq { use reader = new System.IO.StreamReader(file)
          while not reader.EndOfStream do
              yield reader.ReadLine() }
Isolated state often includes mutable values. For these, reference cells are used in F#. For example, the code below keeps a count of all messages by using a reference cell.
let agent =
    Agent.Start(fun inbox ->
        async { let count = ref 0
                while true do
                    let! msg = inbox.Receive()
                    incr count
                    printfn "now seen a total of '%d' messages" !count } )
Again, the mutable state is isolated, and safe, single-threaded access is guaranteed.
Agents with Loops and Isolated State (the functional way)
F# programmers know that there are two ways to write loops in F# - using the “imperative” while/for plus mutable accumulators (ref or mutable), or using the “functional” style of one or more recursive functions, passing accumulated state as parameters. For example, a loop to count the lines in a file can be written imperatively:
let countLines (file:string) =
    use reader = new System.IO.StreamReader(file)
    let count = ref 0
    while not reader.EndOfStream do
        reader.ReadLine() |> ignore
        incr count
    !count
or recursively:
let countLines (file:string) =
    use reader = new System.IO.StreamReader(file)
    let rec loop n =
        if reader.EndOfStream then n
        else
            reader.ReadLine() |> ignore
            loop (n+1)
    loop 0
For localized uses, either approach is fine: the functional approach is a little more advanced, but greatly reduces the amount of explicit mutation in your code and is often more general. F# programmers are expected to be capable of reading either, and able to translate a “while” loop into an equivalent “let rec” (it makes a good interview question!)
Interestingly, this same rule applies to writing asynchronous loops: you can use either the imperative “while”, or the functional “let rec” to define your loops. For example, here is a version of an agent which counts its messages using a recursive asynchronous function:
let agent =
    Agent.Start(fun inbox ->
        let rec loop n = async {
            let! msg = inbox.Receive()
            printfn "now seen a total of %d messages" (n+1)
            return! loop (n+1)
        }
        loop 0 )
Here’s how we post messages to this agent:
for i in 1 .. 10000 do
    agent.Post (sprintf "message %d" i)
When we do this, we get the following output:
now seen a total of 1 messages
now seen a total of 2 messages
....
now seen a total of 10000 messages
To recap, the two common patterns used to define mailbox agents are the imperative:
let agent =
    Agent.Start(fun inbox ->
        async {
            // isolated imperative state goes here
            ...
            while <condition> do
                // read messages and respond
                ...
        })
And the recursive/functional:
let agent =
    Agent.Start(fun inbox ->
        let rec loop arg1 arg2 = async {
            // receive and process messages here
            ...
            return! loop newArg1 newArg2
        }
        loop initialArg1 initialArg2 )
Again, either approach is reasonable in F# - using recursive asynchronous functions is slightly more advanced, but more functional and more general.
Messages and Union Types
It is common to use a discriminated union for a message type. For example, in an agent-based version of the DirectX sample that I like showing, we use the following message type for the simulation engine:
type Message =
    | PleaseTakeOneStep
    | PleaseAddOneBall of Ball
The simulation engine is then this agent:
let simulationEngine =
    Agent.Start(fun inbox ->
        async { while true do
                    // Wait for a message
                    let! msg = inbox.Receive()
                    // Process a message
                    match msg with
                    | PleaseTakeOneStep -> state.Transform moveBalls
                    | PleaseAddOneBall ball -> state.AddObject ball })
Using strongly typed messages like this is a good idea in many circumstances. However, when interoperating with other messaging machinery, you shouldn’t be afraid to use heterogeneous types such as “obj” and “string” as your message type, and have the agent do the decoding with runtime typing and parsing.
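As a rough illustration of the loosely typed alternative, the sketch below uses “obj” as the message type and decodes each message with runtime type tests. The helper describe and the agent name looseAgent are hypothetical names chosen for this example, and a real decoder would depend on the messaging machinery you are interoperating with.

```fsharp
type Agent<'T> = MailboxProcessor<'T>

/// Decode an untyped message using runtime type tests (illustrative helper)
let describe (msg: obj) =
    match msg with
    | :? string as s -> sprintf "string '%s'" s
    | :? int as i -> sprintf "int %d" i
    | other -> sprintf "unrecognised message %A" other

let looseAgent =
    Agent<obj>.Start(fun inbox ->
        async { while true do
                    let! msg = inbox.Receive()
                    printfn "got %s" (describe msg) })

// Callers box their values before posting
looseAgent.Post (box "hello")
looseAgent.Post (box 42)
```

Keeping the decoding in an ordinary function such as describe makes it easy to exercise without starting an agent at all.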
Parameterizing and Abstracting Agents
Agents are just an F# coding design pattern. This means you can use all the usual techniques of F# code to parameterize, abstract and reuse fragments of agent definitions. For example, you might parameterize the serveQuoteStream function used above by the time between transmitted quotes:
open System.Net.Sockets
/// serve up a stream of quotes
let serveQuoteStream (client: TcpClient, periodMilliseconds: int) = async {
    let stream = client.GetStream()
    while true do
        do! stream.AsyncWrite( "AAPL 439.2"B )
        do! Async.Sleep periodMilliseconds
}
This means different requests in your quote server can have different periods.
Similarly, you might abstract whole classes of agents using function parameters:
let iteratingAgent job =
    Agent.Start(fun inbox ->
        async { while true do
                    let! msg = inbox.Receive()
                    do! job msg })
let foldingAgent job initialState =
    Agent.Start(fun inbox ->
        let rec loop state = async {
            let! msg = inbox.Receive()
            let! state = job state msg
            return! loop state
        }
        loop initialState )
You can use the first as follows:
let agent1 = iteratingAgent (fun msg -> async { do printfn "got message '%s'" msg })
and the second:
let agent2 =
    foldingAgent (fun state (msg: string) ->
        // report the running count (the folded state) every 1000 messages
        async { if state % 1000 = 0 then printfn "count = '%d'" state
                return state + 1 }) 0
Reporting Results from Agents
In future posts we will look at techniques for accessing partial results from executing agents, for example, using the PostAndAsyncReply method available on each MailboxProcessor agent. These techniques are important when creating networks of communicating agents.
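As a small preview, and only as a sketch, here is one way a caller might ask a running agent for its current state using PostAndAsyncReply. The message type CounterMessage, its cases, and the agent name counter are made up for this example; the techniques in the future posts may look different.

```fsharp
type Agent<'T> = MailboxProcessor<'T>

/// Messages understood by the counting agent (hypothetical names)
type CounterMessage =
    | Increment
    | GetCount of AsyncReplyChannel<int>

let counter =
    Agent.Start(fun inbox ->
        let rec loop n = async {
            let! msg = inbox.Receive()
            match msg with
            | Increment ->
                return! loop (n + 1)
            | GetCount reply ->
                // Send the current count back to whoever asked
                reply.Reply n
                return! loop n
        }
        loop 0)

for _ in 1 .. 5 do
    counter.Post Increment

// Messages are processed in order, so the five increments are handled first
let total =
    counter.PostAndAsyncReply(fun reply -> GetCount reply)
    |> Async.RunSynchronously
printfn "count = %d" total
```

The count stays isolated inside the agent; the only way to observe it is to send a message carrying a reply channel.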
However, often that is overkill, and instead we only need to report results to some supervising arena such as a GUI. As such, one simple way of reporting partial results from any agent is to use the design pattern discussed in part 2 of this series. An example is shown below that builds an agent which samples the message stream every 1000th message and routes the sampled events to the GUI or other hosting thread.
open System.Threading
// Receive messages and raise an event on each 1000th message
type SamplingAgent() =
    // The event that is raised
    let sample = new Event<string>()
    // Capture the synchronization context to allow us to raise events
    // back on the GUI thread
    let syncContext = SynchronizationContext.CaptureCurrent()
    // The internal mailbox processor agent
    let agent =
        new MailboxProcessor<_>(fun inbox ->
            async { let count = ref 0
                    while true do
                        let! msg = inbox.Receive()
                        incr count
                        if !count % 1000 = 0 then
                            syncContext.RaiseEvent sample msg })
    /// Post a message to the agent
    member x.Post msg = agent.Post msg
    /// Start the agent
    member x.Start () = agent.Start()
    /// Raised every 1000'th message
    member x.Sample = sample.Publish
[ Note: this uses the two extension methods on SynchronizationContext (CaptureCurrent and RaiseEvent) described in part 2 of this series. ]
You can use this agent as follows:
let agent = SamplingAgent()
agent.Sample.Add (fun s -> printfn "sample: %s" s)
agent.Start()
for i = 0 to 10000 do
    agent.Post (sprintf "message %d" i)
As expected, this reports a sampling of messages posted to the agent:
sample: message 999
sample: message 1999
sample: message 2999
sample: message 3999
sample: message 4999
sample: message 5999
sample: message 6999
sample: message 7999
sample: message 8999
sample: message 9999
Agents and Errors
We all know mistakes and exceptions happen. Good error detection, reporting and logging is essential in agent-based programming. Let’s look at how to detect and forward errors when using F# in-memory agents (mailbox processors).
Firstly, part of the magic of F# asynchronous workflows is that exceptions are caught and propagated automatically within async { ... } blocks, even across asynchronous waits and I/O operations. You can also use try/with, try/finally and use constructs within async { ... } blocks to catch exceptions and release resources. This means we only need to deal with uncaught errors in agents.
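A minimal sketch of this behaviour is shown below, using plain asyncs rather than agents. The function names risky and guarded are invented for the example. The exception is raised after an asynchronous wait, propagates out of the inner async, and is caught by an ordinary try/with in the outer one, while try/finally runs in both the success and failure cases.

```fsharp
/// Fails after an asynchronous wait when given zero (illustrative function)
let risky (n: int) = async {
    do! Async.Sleep 10
    if n = 0 then failwith "zero is not allowed"
    return 100 / n
}

/// Calls risky, catching any exception that propagates out of it
let guarded (n: int) = async {
    try
        try
            let! r = risky n
            return sprintf "result %d" r
        with ex ->
            return sprintf "caught '%s'" ex.Message
    finally
        // Runs whether or not an exception occurred
        printfn "finished %d" n
}

let ok = Async.RunSynchronously (guarded 5)
let failed = Async.RunSynchronously (guarded 0)
printfn "%s" ok
printfn "%s" failed
```

The same constructs can be used inside the body of an agent's message loop when a failure on one message should not stop the agent.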
When an uncaught error occurs in a mailbox processor agent, the Error event on the agent is raised. A common pattern is to forward all errors to a supervising process, for example:
type Agent<'T> = MailboxProcessor<'T>
let supervisor =
    Agent<System.Exception>.Start(fun inbox ->
        async { while true do
                    let! err = inbox.Receive()
                    printfn "an error occurred in an agent: %A" err })
let agent =
    new Agent<int>(fun inbox ->
        async { while true do
                    let! msg = inbox.Receive()
                    if msg % 1000 = 0 then
                        failwith "I don't like that cookie!" })
agent.Error.Add(fun error -> supervisor.Post error)
agent.Start()
It can be convenient to pipeline these configuration operations as follows:
let agent =
    new Agent<int>(fun inbox ->
        async { while true do
                    let! msg = inbox.Receive()
                    if msg % 1000 = 0 then
                        failwith "I don't like that cookie!" })
    |> Agent.reportErrorsTo supervisor
    |> Agent.start
Using a helper module:
module Agent =
    let reportErrorsTo (supervisor: Agent<exn>) (agent: Agent<_>) =
        agent.Error.Add(fun error -> supervisor.Post error); agent
    let start (agent: Agent<_>) = agent.Start(); agent
Here is an example where we create 10000 agents, some of which report errors:
let supervisor =
    Agent<int * System.Exception>.Start(fun inbox ->
        async { while true do
                    let! (agentId, err) = inbox.Receive()
                    printfn "an error '%s' occurred in agent %d" err.Message agentId })
let agents =
    [ for agentId in 0 .. 9999 ->
        let agent =
            new Agent<string>(fun inbox ->
                async { while true do
                            let! msg = inbox.Receive()
                            if msg.Contains("agent 99") then
                                failwith "I don't like that cookie!" })
        agent.Error.Add(fun error -> supervisor.Post (agentId,error))
        agent.Start()
        (agentId, agent) ]
When sending these messages:
for (agentId, agent) in agents do
    agent.Post (sprintf "message to agent %d" agentId )
We see output like the following (the order may vary, because the agents run concurrently):
an error 'I don't like that cookie!' occurred in agent 99
an error 'I don't like that cookie!' occurred in agent 990
an error 'I don't like that cookie!' occurred in agent 991
an error 'I don't like that cookie!' occurred in agent 992
...
an error 'I don't like that cookie!' occurred in agent 9999
This section has just dealt with errors in F# in-memory MailboxProcessor agents. Other kinds of agents (for example, agents representing server-side requests) should also be designed and architected to handle or forward errors gracefully and restart appropriately.
Summary
Isolated agents are a programming pattern that recurs again and again across multiple programming domains, from device driver programming to user interfaces to distributed programming to scalable communication servers. Every time you write an object, thread or asynchronous worker to manage a long-running communication (e.g. sending data to your computer’s speakers, or reading data from the network, or reacting to a stream of incoming events), you are writing a kind of agent. Every time you write an ASP.NET web page handler, you are writing a form of agent (one whose state is reconstituted on each invocation). In all cases, it is normal that some or all of the state associated with the communication is isolated.
Isolated agents are a means to an end – e.g. implementing scalable programming algorithms, including scalable request servers and scalable distributed programming algorithms. Like all asynchronous and parallel design patterns, they should not be overused. However, they are an elegant, powerful and efficient technique when used wisely.
F# is unique amongst the managed languages that come with Visual Studio 2010 in its integrated support for both lightweight asynchronous computations and in-memory agents. In F#, async agents can be written in a compositional way, without requiring code to be written using callbacks and inverted control. There are some tradeoffs here – for example, in future articles we will be looking at how to publish your agents using the standard APM pattern of the .NET libraries. However, the benefits are high: control, scaling and structured CPU and I/O parallelism when you need it, while keeping the full performance of .NET native code for your synchronous, CPU-bound code.
Indeed, there are few other .NET or JVM-based languages that support lightweight reactive agents at all – in early .NET it was said to be “impossible” because of the costs of threads. In this context, F#’s integration of “async { ... }” in 2007 can be seen as somewhat of a breakthrough in applied language design – it allows lightweight, compositional async programming and reactive agents in the context of an industrially accepted and interoperable programming platform. Along with the Axum language prototype (which has been influential on F#), F# has proven that an asynchronous language feature is a feasible way to break through the logjam of “do we make threads lightweight or not” that currently bedevils industrial runtime system design.
F# async programming can be seen as an implementation of resumptions, and there are many precursors here, for example OCaml delimited continuations, Haskell embeddings of monadic concurrency and papers emphasising the importance of resumptions with regard to concurrency.
You can use F# asynchronous agents on .NET 2.0, 3.5, 4.0, on Linux/Mono/Mac and on Silverlight. Indeed, you can even use F# async programming when F# is translated to JavaScript using the WebSharper platform. Enjoy!
Comments
Anonymous
February 20, 2010
I love your blog entries. I have some experience with message passing distributed programming using MPI but I am new to agents and F#. In MPI, each instance of the program becomes a sort of agent and you pass messages between the processes. How can you pass messages from one agent to another? Is it to misunderstand the language feature to ask such a question?
Anonymous
March 07, 2010
Hi, thanks for this post. I have a question if you do not mind. Is that possible to use agent in this way:
let agent = new Agent<int>(fun inbox -> async { Seq.initInfinite(fun i -> inbox.Receive()) |> Seq.iter((* Do something here*))) } )
Hi Mike, There are ways to do this, using an "AsyncSeq" type, different to a "Seq" type, but they aren't covered here, and need a bit of work on top of basic F# async programming.
Anonymous
March 17, 2010
Can F# Isolated Agents Pattern be a paradigm for Intel Single Chip Computer (SCC)?
Anonymous
March 19, 2010
Hi Don. Is Isolated agents the paradigm for NoC on SoC eXtreme manycore architectures?
Anonymous
March 22, 2010
Thanks for the in-depth look at async and Agent. I'm really looking forward to your upcoming posts on creating HTTP and TCP server applications. Thanks again!!
Anonymous
June 11, 2010
Thanks for an interesting article. Coming from NodeJS, I really appreciate the let! sugar. I have a background in C/Python/Java/Haskell. This is my first Microsoft language - I'm using Mono. Where can I find performance comparisons for F# as a server vs NodeJS/Haskell/C?
Anonymous
August 16, 2010
Don, You noted you'd be posting a blog on scalable servers. Is that still in the works? I'm very interested to see what you are doing. Thanks, Ryan
Anonymous
January 19, 2011
Related to isolation in the functional example, if using something like a StreamReader, would you need to start the loop with a new StreamReader and pass it along with every call to the loop? Would you initialize the state outside the loop or within?
Anonymous
March 07, 2012
I just tried this pattern out. It works like a dream. So much easier than writing the parallel processing infrastructure from the ground up.