Using C# 2.0 iterators to simplify writing asynchronous code (part 2)

The previous article described the idea of using C# 2.0 iterators to write asynchronous code; now it's time to implement the utility class that "runs" the iterator. The utility turns out to be very light, and I'm glad some readers reported that they've already implemented their own versions.

Let's list the requirements for the utility:

  1. The user code should run sequentially. The code can "jump" between threads, but the main benefit of the approach is to let the user write sequential code.
  2. The iterator should only resume when all the asynchronous operations have finished and the previous MoveNext() call has yielded its result.
  3. The utility should provide centralized exception handling.
  4. If the iterator function has a finally clause, C# converts it to the Dispose() method of the IDisposable interface. We need to call it at the end of the iteration.
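
Requirement 4 relies on how C# compiles iterators: a finally block that is still pending when the caller stops iterating only runs when Dispose() is called on the enumerator. The following is a minimal sketch of that behavior; the names FinallyDemo, Steps and Log are made up for illustration and are not part of the utility.

```csharp
using System;
using System.Collections.Generic;

public static class FinallyDemo
{
    // Records what the iterator did, in order.
    public static readonly List<string> Log = new List<string>();

    // Hypothetical iterator; the finally block stands in for user cleanup code.
    public static IEnumerable<int> Steps()
    {
        try
        {
            Log.Add("step 1");
            yield return 1;
            Log.Add("step 2");
            yield return 1;
        }
        finally
        {
            Log.Add("finally");
        }
    }

    public static void Main()
    {
        IEnumerator<int> e = Steps().GetEnumerator();
        e.MoveNext();   // runs the iterator up to the first yield return
        e.Dispose();    // runs the pending finally block
        Console.WriteLine(string.Join(", ", Log.ToArray())); // prints "step 1, finally"
    }
}
```

Without the Dispose() call, the log would contain only "step 1", which is why the utility has to dispose the enumerator itself when the iteration stops early.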

I've played with different ways to use the utility, and settled on the following pattern:

AsyncEnumerator ae = new AsyncEnumerator();
ae.Start(UserFunction(/*user params*/..., ae));

The AsyncEnumerator class exposes the following public API:

public sealed class AsyncEnumerator
{
    public AsyncEnumerator() {}

    // The user-supplied exception handler;
    // if it returns true the enumeration stops.
    public Predicate<Exception> Catch
    {
        get { }
        set { }
    }

    // User should never call this delegate directly, but
    // should pass this callback to any async API it calls.
    public AsyncCallback Callback
    {
        get { }
    }

    public void Start(IEnumerable<Int32> enumerable) {}
}

Now, how do we run the user code sequentially and only resume it after all async operations have finished? The first iteration is invoked directly: we simply call enumerator.MoveNext(), but to make Start return immediately, we do it on a thread pool thread. To decide when to invoke the following iterations, let's keep a counter of outstanding async operations (the number of operations just started is returned by the user's yield return statement and is accessible through enumerator.Current). When an async operation finishes and our callback is called, we decrement this counter by 1 (of course, we should use interlocked operations, since callbacks may be called in parallel). Once the counter reaches 0, all the outstanding async operations have completed and we should resume the iterator (i.e. call enumerator.MoveNext() again).

The only tricky part is that the callback(s) may be called before the iterator returns from the MoveNext() call; this is why I've added the requirement that the iterator can only be resumed after MoveNext() has returned. The callback might even be called on the same thread that called the asynchronous method, if there is enough information to complete the call synchronously. We handle this by allowing the number of outstanding async operations to be negative: we start with 0; each async callback decrements the count by 1; and when iterator.MoveNext() returns, we increment it by iterator.Current. If after any of these operations the number of outstanding async operations becomes 0 again, we know that all async operations have finished and the iterator code has exited with 'yield return', so it is safe to advance the iterator again.

Thus there is a race between the two threads that may resume the iterator: the original thread that made the previous iterator.MoveNext() call and the thread that reported the end of the last async operation. That's OK: our contract allows the iterator code to migrate between threads, and we only guarantee that a single thread runs it at a time.

using System;
using System.Collections.Generic;
using System.Threading;

public sealed class AsyncEnumerator
{
    private IEnumerator<Int32> m_enumerator;
    private Int32 m_PendingAsyncOps = 0;

    public void Start(IEnumerable<Int32> enumerable)
    {
        m_enumerator = enumerable.GetEnumerator();

        // let's start the enumeration asynchronously too
        ThreadPool.QueueUserWorkItem(delegate { Advance(); });
    }

    // This is the first implementation of Advance, to be revised below.
    private void Advance()
    {
        while (true)
        {
            if (!m_enumerator.MoveNext())
                break;

            // The enumerator returns the number of async ops it initiated.
            // Add this to m_PendingAsyncOps.
            int pendingAsyncOps = Interlocked.Add(ref m_PendingAsyncOps, m_enumerator.Current);

            if (pendingAsyncOps != 0)
                break; // quit this thread, let another thread resume iteration

            /* No pending async ops, continue enumeration */
        }
    }

    // The delegate for this function is passed to all async calls that
    // the user makes, and thus it is automatically called by the async API
    // when an async operation finishes.
    // It is never invoked by our code or directly by the user.
    private void Advance(IAsyncResult ar)
    {
        // An async op completed, subtract 1 from m_PendingAsyncOps.
        Int32 pendingAsyncOps = Interlocked.Decrement(ref m_PendingAsyncOps);

        // Test the value returned by Interlocked.Decrement rather than re-reading
        // the field, which another thread may have changed in the meantime.
        if (pendingAsyncOps == 0)
        {
            // The last pending async op completed, continue enumeration
            Advance();
        }
    }
}
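
The counting rule described above can be traced without any threads or real async APIs. The sketch below isolates it in a hypothetical helper (PendingOps, IteratorYielded and OperationCompleted are illustrative names, not part of AsyncEnumerator) and walks through both orderings: callbacks arriving after the iterator yields, and callbacks arriving before MoveNext() has returned.

```csharp
using System;
using System.Threading;

// Hypothetical helper that isolates the counting rule; not part of AsyncEnumerator.
public sealed class PendingOps
{
    private int m_count = 0;

    // Called when MoveNext() returns; 'started' plays the role of Current.
    // Returns true if the caller should resume the iterator.
    public bool IteratorYielded(int started)
    {
        return Interlocked.Add(ref m_count, started) == 0;
    }

    // Called from a completion callback.
    // Returns true if the caller should resume the iterator.
    public bool OperationCompleted()
    {
        return Interlocked.Decrement(ref m_count) == 0;
    }
}

public static class PendingOpsDemo
{
    public static void Main()
    {
        // Case 1: the iterator yields 2, then both callbacks arrive.
        PendingOps a = new PendingOps();
        Console.WriteLine(a.IteratorYielded(2));    // False (count is 2)
        Console.WriteLine(a.OperationCompleted());  // False (count is 1)
        Console.WriteLine(a.OperationCompleted());  // True  (count is 0)

        // Case 2: both callbacks arrive before MoveNext() returns.
        PendingOps b = new PendingOps();
        Console.WriteLine(b.OperationCompleted());  // False (count is -1)
        Console.WriteLine(b.OperationCompleted());  // False (count is -2)
        Console.WriteLine(b.IteratorYielded(2));    // True  (count is 0)
    }
}
```

In each case exactly one call returns true, so exactly one thread resumes the iterator, whichever operation happens to bring the count back to zero.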

Finally, the last requirements: exception handling and calling the Dispose() method at the end of the iteration. Both changes only affect the void Advance() function that works with the iterator: we need to catch the exception and call the user-provided handler, and at the end of the iteration call Dispose():

    private void Advance()
    {
        bool end = true; // Assume that we'll stop the iteration

        while (true)
        {
            try
            {
                end = !m_enumerator.MoveNext();
            }
            catch (Exception e)
            {
                // An exception occurred, catch it and execute the callback.
                // The callback returns true to end the iteration or false to continue it
                end = (m_catch != null) ? m_catch(e) : true;
            }
            if (end)
                break;

            // The enumerator returns the number of async ops it initiated
            // Add this to m_PendingAsyncOps
            int pendingAsyncOps = Interlocked.Add(ref m_PendingAsyncOps, m_enumerator.Current);

            if (pendingAsyncOps != 0)
                break; // quit this thread, let other thread resume iteration

            /* No pending Async Ops, continue enumeration */
        }

        if (end)
        {
            // If nothing else to do, execute iterator's finally code
            IDisposable d = m_enumerator as IDisposable;
            if (d != null) d.Dispose();
        }
    }

Now the only missing piece is the public API. We have the public Callback property that returns an AsyncCallback delegate: we just return the second function, void Advance(IAsyncResult ar), but to avoid creating a new delegate for each async call, let's cache it. The exception handler is just a delegate that the user can provide:

public sealed class AsyncEnumerator
{
    private Predicate<Exception> m_catch;

    // Delegate for passing to async methods
    // (to avoid creating new delegate for each call).
    private AsyncCallback callback;

    public AsyncEnumerator()
    {
        this.callback = Advance;
    }

    /// <summary>
    /// The user-supplied exception handler, should return true to stop enumerator
    /// </summary>
    public Predicate<Exception> Catch
    {
        get { return m_catch; }
        set { m_catch = value; }
    }

    /// <summary>
    /// The callback that user code should pass to all async APIs
    /// </summary>
    public AsyncCallback Callback
    {
        get { return this.callback; }
    }
}
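
To see the pieces working together, here is a sketch that condenses the listings above into one class and drives it with a small user function. Everything in the Demo class is an assumption made for illustration: ReadAll and Run are hypothetical names, a MemoryStream stands in for a real asynchronous source, and a ManualResetEvent is used only so that the example can wait for the result.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;

// Condensed copy of the AsyncEnumerator developed in this article.
public sealed class AsyncEnumerator
{
    private IEnumerator<int> m_enumerator;
    private int m_pendingAsyncOps = 0;
    private Predicate<Exception> m_catch;
    private readonly AsyncCallback m_callback;

    public AsyncEnumerator() { m_callback = Advance; }

    public Predicate<Exception> Catch
    {
        get { return m_catch; }
        set { m_catch = value; }
    }

    public AsyncCallback Callback { get { return m_callback; } }

    public void Start(IEnumerable<int> enumerable)
    {
        m_enumerator = enumerable.GetEnumerator();
        ThreadPool.QueueUserWorkItem(delegate { Advance(); });
    }

    private void Advance()
    {
        bool end = true;
        while (true)
        {
            try { end = !m_enumerator.MoveNext(); }
            catch (Exception e) { end = (m_catch != null) ? m_catch(e) : true; }
            if (end) break;
            // Resume immediately only if every started operation has completed.
            if (Interlocked.Add(ref m_pendingAsyncOps, m_enumerator.Current) != 0) break;
        }
        if (end) m_enumerator.Dispose();
    }

    private void Advance(IAsyncResult ar)
    {
        if (Interlocked.Decrement(ref m_pendingAsyncOps) == 0) Advance();
    }
}

public static class Demo
{
    // Hypothetical user function: reads a stream in 4-byte chunks.
    public static IEnumerable<int> ReadAll(Stream s, StringBuilder output,
                                           ManualResetEvent done, AsyncEnumerator ae)
    {
        try
        {
            byte[] buffer = new byte[4];
            while (true)
            {
                IAsyncResult ar = s.BeginRead(buffer, 0, buffer.Length, ae.Callback, null);
                yield return 1;             // one async operation was started
                int n = s.EndRead(ar);      // runs only after the callback has fired
                if (n == 0) break;          // end of stream
                output.Append(Encoding.ASCII.GetString(buffer, 0, n));
            }
        }
        finally
        {
            done.Set();                     // signal the waiting caller
        }
    }

    // Runs ReadAll over an in-memory stream and blocks until it has finished.
    public static string Run(string text)
    {
        StringBuilder output = new StringBuilder();
        using (ManualResetEvent done = new ManualResetEvent(false))
        using (MemoryStream s = new MemoryStream(Encoding.ASCII.GetBytes(text)))
        {
            AsyncEnumerator ae = new AsyncEnumerator();
            ae.Start(ReadAll(s, output, done, ae));
            done.WaitOne();
        }
        return output.ToString();
    }

    public static void Main()
    {
        Console.WriteLine(Run("hello, iterators")); // prints "hello, iterators"
    }
}
```

The user function reads as a plain loop, yet each read goes through BeginRead/EndRead. A stream that completes reads synchronously may invoke the callback before MoveNext() returns, which is exactly the ordering the negative counter is meant to absorb.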

That is all for today. For the next blog entry, I plan to do performance measurements and compare code that uses this utility with code that uses threads and synchronous calls. I've not done this yet, so you are welcome to post your results!

Comments

  • Anonymous
    April 01, 2006
    The comment has been removed

  • Anonymous
    April 01, 2006
    Very interesting, I should have googled before posting this :)

    Indeed, your blog and Miguel de Icaza's describe almost the same usage as I described.

    I believe the synchronization logic is necessary though: the C#-generated iterator is not thread-safe, and both implementations (at least as described in the blogs) suffer from the problem solved in my article. They do not correctly handle the case when an async operation finishes before, or at the same time as, the thread that started it produces the value with 'yield return'. And this is not just a theoretical problem: I've found this could happen with the HttpWebRequest class.

    The complexity of implementation is arguable, but with some added complexity in the library, it lets one write simpler code: by centralizing exception handling, ensuring finally block in the iterator is called, and avoiding unnecessary anonymous delegates in user code – the user writes simple sequential code (this is also the case for Miguel's version).

    But anyway it is the same idea, thanks for letting me know; it was very educational to look at other implementations! I'm adding your and Miguel's blogs to my RSS aggregator :)

  • Anonymous
    April 01, 2006
    The comment has been removed

  • Anonymous
    April 01, 2006
    Thanks, I now understand how your code works - yes, it looks thread-safe.

    ***

    About C# compiler doing the code transformation - this was exactly the same wish I've got too :)

    The second thought was that LISP with its (arguably evil, but powerful) macro facility would allow implementing this in a library, rather than modifying core language. Unfortunately, I don't see a way to have LISP-like macro for a language with complex syntax like C#.

  • Anonymous
    April 04, 2007
    you both rock, my hat is off to both of you sirs! thank you for your conversation, i find it very interesting and compelling!

  • Anonymous
    May 03, 2007
    Thanks for a great article. Most of the examples I've seen for using sockets don't deal with real-life issues such as the one you mention with HttpWebRequest. (Most examples don't deal with packet fragmentation either, simply assuming that any arbitrary number of bytes they read is the entire message and can be converted to a string with a single call to an encoding function.) I appreciate the complication that you added to your code to make it more realistic.