Freigeben über


F# for Parallel and Asynchronous Programming - PDC 2009

image

Last November at PDC 2009 in Los Angeles I gave a talk on F# for Parallel and Asynchronous Programming. 

The talk begins by covering basic F# concepts, and then focuses on four challenging issues related to concurrency and the tools F# brings for addressing these - immutability, async workflows, and agents.  In trying to cover this ground in a 60 minute talk, there were naturally a lot of relevant topics I had to skip, and I also got a number of questions about things I'd glossed over after the talk.  So I thought I'd use this post to discuss some of the other features and topics I would have highlighted in this presentation if there'd been more time.
 

Note:   The topics covered below mostly assume you've either watched the session video or taken a look through the source code attached above.  If you haven't yet - check out the talk.

 

Some F# Parallel and Async Topics


PSeq

The talk briefly shows the use of a PSeq module to write code like the following:

 let rec sumOfSquares nums =     
    nums    
    |> PSeq.map sqr    
    |> PSeq.sum

As a few folks have noted, the F# core library does not actually provide this PSeq module.  In the demo code, I was just using my own little implementation of this, to show off the benefits of the more declarative programming style used here. 

The PSeq I used is just a very simple wrapper around the functionality provided in PLinq - for example, PSeq.map just calls ParallelEnumerable.Select.  We are expecting a future release of the F# PowerPack to include this wrapper library.  In the the meantime, you can check out Matt Podwysocki's blog post on using PLinq from F#, or Talbot Crowell's recent post with an updated Beta2 version.

 

Using Begin/End Methods with F# Async

In showing the ease of turning synchronous code which calls long-running I/O operations into asynchronous code, I showed adding the pieces highlighed in red below to the code below to make it async:

 let downloadImageAsync(blob : CloudBlob) =
  async { 
    let !  pixels = blob.AsyncDownloadByteArray()
    let fileName = "thumbs-" + blob.Uri.Segments.[blob.Uri.Segments.Length-1]
    use outStream =  File.OpenWrite(fileName)
    do !  outStream.AsyncWrite(pixels, 0, pixels.Length)
    return fileName   } 

I got a very good question after the talk though: "Where did the AsyncDownloadByteArray function come from?" 

In general, for any asynchronous API provided on the platform, it can be simply wrapped and exposed as a method returning Async<T> to be used within F# async workflows as above.  In the core F# libraries, we wrap many of the core async APIs in .NET Framework, providing extension methods on the relevant types.  This means that functions like Stream.AsyncWrite used above are available by default in F#.  However, CloudBlob.DownloadByteArray is defined in the libraries provided with the Windows Azure SDK, and so we need to create a wrapper.

In fact, the Azure SDK doesn't even provide a BeginDownloadByteArray/EndDownloadByteArray pair - but it does provide a more fundamental async operation BeginDownloadToStream/EndDownloadToStream:

 public IAsyncResult BeginDownloadToStream(Stream target, AsyncCallback callback, object state);
 public void EndDownloadToStream(IAsyncResult asyncResult);

Notably, it's likely that the reason for not including the higher level async operations is just due to the substantial complexity of authoring and maintaining these methods in C# today. Had the Azure team written their API in F#, it's likely they would have found it sufficiently easy to add these.

So, what does the wrapper look like?

 type CloudBlob with
     /// Asynchronously download blob contents as an array
    member blob.AsyncDownloadByteArray() = async {
        let ms = new MemoryStream(int blob.Properties.Length)
        do! Async.FromBeginEnd(
                    (fun (cb,s) -> blob.BeginDownloadToStream(ms,cb,s)),
                     blob.EndDownloadToStream)
        return ms.ToArray() }

Some notes on this implementation:

Async.FromBeginEnd

The key tool for wrapping many .NET async APIs is to use Async.FromBeginEnd.  This method takes two functions as arguments - the Begin/End pair of the method being wrapped - and returns a function of type Async<T> where T is the type returned from the End method. When directly wrapping a method, this is often trivial - member foo.AsyncBar() = Async.FromBeginEnd(foo.BeginBar, foo.EndBar).

Composing Async Operations

There are generally two ways to create Async<T> values - call a function which returns one, like Async.FromBeginEnd, Stream.AsyncWrite, or Async.Parallel - or write an async {.}  block to include any arbitrary F# code as part of the composition.  In the case above, we need to both call the underlying Begin/End pair, but also construct a stream to download into, and ultimately return the array of bytes left in the resulting stream.  This composition of asynchronous operations with some synchronous code is easy to do inside the async {.} block.

Other Async Interop Utilities

We saw details above of wrapping Begin/End pairs as Async<T> objects - there are a few other interesting interop scenarios which are nicely supported by the Async API in F#:

  • Async.AsBeginEnd - coverts an async operation into a set of Begin/End/Cancel functions which can be used to expose an implementation of the .NET Asynchronous Programming Model to other .NET consumers.
  • Async.AwaitEvent/AwaitWaitHandle/AwaitIAsyncResult/AwaitTask - there are many other async models used in .NET beyond the APM, and F# provides primitives for easily wrapping any of these.

 

Exceptions with Async

In the talk, I commented on how hard it is to do correct exception handling with typical asynchronous programming tools available today in C#/VB.  But I didn't actually show how much easier this is with F# async workflows.  Here's what it looks like to make our downloadImageAsync function robust to the many exceptions that could possibly be raised during it's execution:

 let downloadImageAsync(blob : CloudBlob) =  async {
    try
        let! pixels = blob.AsyncDownloadByteArray()
        let fileName = "thumbs-" + blob.Uri.Segments.[blob.Uri.Segments.Length-1]
        use outStream =  File.OpenWrite(fileName)
        do! outStream.AsyncWrite(pixels, 0, pixels.Length)
        return fileName
    with
    | e ->
        Log.Write(e.ToString())
        return ""  }

The key here is that exception handling is no different for async code than for synchronous code.  Under the hood, F# will ensure that exceptions occurring anywhere in the workflow are bubbled up through the non-blocking calls and ultimately handled by the exception handler code.  This will happen as expected even when the code executes on multiple different threads during it's operation.

 

Cancellation with Async

When we have long-running operations in our programs, as well as making them non-blocking, we very often want to make them cancellable.  F# async workflows support cancellation automatically, inserting cancellation checks at each let! or do!, and also at each iteration of a loop.  Cancellation is tracked and triggered using the new unified cancellation model in the .NET Framework, "System.Threading.CancellationToken".  For example, if we want to make downloadImageAsync cancellable, we simply need to pass in a cancellation token when we start the workflow, and then hook up a button to cancel a running operation.  The deltas to the PhotoViewer application are indicated in red below:

F# code:

 member this.DownloadAll(cancellationToken) =
    let work =
        container.ListBlobs()
        |> Seq.map (fun blob ->
            downloadImageAsync(container.GetBlobReference(blob.Uri.ToString())))
        |> Async.Parallel
    Async.StartAsTask(work, cancellationToken = cancellationToken) 

C# client code:

 CancellationTokenSource cancellableWork;
 

private void Start_Click(object sender, RoutedEventArgs e){
cancellableWork = new CancellationTokenSource();
downloader.DownloadAll(cancellableWork.Token);
}

 
private void Cancel_Click(object sender, RoutedEventArgs e){
    if (cancellableWork != null)
        cancellableWork.Cancel();
}
  

Other Great F# Async Samples

Don has recently posted two great articles on some other end-to-end samples built on F# Async.  These posts cover many of the same core F# Async topics, plus some additional topics not touched on here.  The samples are also a lot of fun, using data pulled from Twitter and Bing Translation services.

 

Summary

F#'s immutability, async workflows and agents are useful features for enabling easier parallel and asynchronous programming in application code.  The PDC talk provides a good starting point for understanding how these features can be applied.  The topics above cover a few of the related topics that I've been asked about since then, but there is also much, much more that can be enabled on top of these core features.

Comments

  • Anonymous
    February 01, 2010
    One thing that should probably be mentioned along with Exceptions with Async: if you want to throw an exception from within an async { } block, you need to write "return raise", not just "raise". That one tripped me up.

  • Anonymous
    February 04, 2010
    Matt - That's right.  As a rule, every branch of an async block must "return" a value.  Raise is not treated specially here, so there still must be a "return", even if it will not be reachable at runtime.