


November 2019

Volume 34 Number 11

[C#]

Iterating with Async Enumerables in C# 8

By Stephen Toub

Since the beginning of .NET, enumeration of collections has been the bread-and-butter of many programs. The non-generic System.Collections.IEnumerable interface enabled code to retrieve a System.Collections.IEnumerator, which in turn provided the basic functionality of MoveNext and Current for forward iterating through each element in the source collection. The C# language simplified this iteration further via the foreach keyword:

IEnumerable src = ...;
foreach (int item in src) Use(item);

When .NET Framework 2.0 came around with generics, System.Collections.Generic.IEnumerable<T> was introduced, enabling retrieving a System.Collections.Generic.IEnumerator<T> to support strongly typed iteration, a boon for both productivity and performance (due in large part to avoiding boxing):

IEnumerable<int> src = ...;
foreach (int item in src) Use(item);

As with the non-generic interface, this use of foreach is transformed by the compiler into calls on the underlying interfaces:

IEnumerable<int> src = ...;
IEnumerator<int> e = src.GetEnumerator();
try
{
  while (e.MoveNext()) Use(e.Current);
}
finally { if (e != null) e.Dispose(); }

In addition, C# 2.0 introduced iterators, which make it simple for developers to use normal control flow constructs to author custom enumerables, with the compiler rewriting developer methods that employ “yield return” statements into state machines suitable to implement IEnumerable<T> and IEnumerator<T>:

static IEnumerable<int> Range(int start, int count)
{
  for (int i = 0; i < count; i++)
    yield return start + i;
}
...
foreach (int item in Range(10, 3))
  Console.Write(item + " "); // prints 10 11 12

(This example shows using “yield return” to create an IEnumerable<T>, but the language also supports using IEnumerator<T> instead, in which case it’s equivalent to an IEnumerable<T> being produced and then GetEnumerator being called on the result.)
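To illustrate that parenthetical, here is a minimal sketch of an iterator whose return type is IEnumerator<T>. The names EnumeratorIteratorSketch, RangeEnumerator and Drain are hypothetical, made up for this illustration. Because foreach expects something that exposes GetEnumerator, the sketch drives the enumerator by hand:

```csharp
using System.Collections.Generic;

public static class EnumeratorIteratorSketch
{
    // Same body as Range, but the declared return type is IEnumerator<int>.
    public static IEnumerator<int> RangeEnumerator(int start, int count)
    {
        for (int i = 0; i < count; i++)
            yield return start + i;
    }

    // Drives the enumerator manually and disposes it when finished.
    public static List<int> Drain(IEnumerator<int> e)
    {
        var items = new List<int>();
        using (e)
        {
            while (e.MoveNext()) items.Add(e.Current);
        }
        return items;
    }
}
```

Calling EnumeratorIteratorSketch.Drain(EnumeratorIteratorSketch.RangeEnumerator(10, 3)) collects 10, 11 and 12.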

Fast forward to .NET Framework 4, which introduced the System.Threading.Tasks.Task and Task<T> types, and .NET Framework 4.5 and C# 5, which introduced the async and await keywords to drastically simplify asynchronous programming. Instead of having to write complicated callback-based “spaghetti” code, developers could again use normal control flow constructs to author their asynchronous operations, with the compiler rewriting the developers’ async methods that employ await expressions into state machines that generate Tasks and internally use callbacks:

static async Task PrintAsync(string format, int iterations, int delayMilliseconds)
{
  for (int i = 0; i < iterations; i++)
  {
    await Task.Delay(delayMilliseconds);
    Console.WriteLine(string.Format(format, i));
  }
}
...
await PrintAsync("Iteration {0}", 5, 1_000);

(Subsequent releases of C# and .NET have seen many improvements around this asynchronous support, including runtime and compiler enhancements, as well as library additions, such as the introduction of the ValueTask and ValueTask<T> types that enable using async and await with fewer allocations.)

With the compiler providing support for both iterators and async methods, a common question is how to combine the two. You can use yield to author synchronous enumerables, and you can use async and await to author asynchronous operations. What about using yield return with async and await to author asynchronous enumerables?

The answer to that question comes in C# 8 and .NET Core 3.0.

A Tour Through Async Enumerables

.NET Core 3.0 introduces the new System.Collections.Generic.IAsyncEnumerable<T> and System.Collections.Generic.IAsyncEnumerator<T> interfaces. These interfaces, shown in Figure 1, should look very familiar, as they closely mirror their synchronous generic counterparts, and the concepts map directly: IAsyncEnumerable<T> provides a method to get an enumerator (IAsyncEnumerator<T>), which is disposable and which provides two additional members, one for moving forward to the next element and one for getting the current element. The deviations from the synchronous counterparts should also stand out: “Async” is used as a prefix in the type names and as a suffix in the names of members that may complete asynchronously; GetAsyncEnumerator accepts an optional CancellationToken; MoveNextAsync returns a ValueTask<bool> instead of bool; and DisposeAsync returns a ValueTask instead of void. (You’ll also notice the lack of a Reset method, which the synchronous counterpart exposes, but which is effectively deprecated.)

Figure 1 New Async Interfaces

namespace System.Collections.Generic
{
  public interface IAsyncEnumerable<out T>
  {
    IAsyncEnumerator<T> GetAsyncEnumerator(
      CancellationToken cancellationToken = default);
  }
  public interface IAsyncEnumerator<out T> : IAsyncDisposable
  {
    ValueTask<bool> MoveNextAsync();
    T Current { get; }
  }
}
namespace System
{
  public interface IAsyncDisposable
  {
    ValueTask DisposeAsync();
  }
}

C# provides direct support for async enumerables, just as it does with synchronous enumerables, both for consuming and for producing them. To iterate through them, await foreach is used instead of just foreach:

await foreach (int item in RangeAsync(10, 3))
  Console.Write(item + " "); // Prints 10 11 12

And, as with the synchronous code, the compiler transforms this into code very much like you’d write manually if using the interfaces directly:

IAsyncEnumerator<int> e = RangeAsync(10, 3).GetAsyncEnumerator();
try
{
  while (await e.MoveNextAsync()) Console.Write(e.Current + " ");
}
finally { if (e != null) await e.DisposeAsync(); }

To produce an async enumerable, the language supports writing an iterator just as it does for the synchronous case, but with async IAsyncEnumerable<T> instead of IEnumerable<T> in the signature (as with the synchronous support, the language also allows IAsyncEnumerator<T> to be used as the return type instead of IAsyncEnumerable<T>):

static async IAsyncEnumerable<int> RangeAsync(int start, int count)
{
  for (int i = 0; i < count; i++)
  {
    await Task.Delay(i);
    yield return start + i;
  }
}
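As a sketch of the IAsyncEnumerator<T> return type mentioned in that parenthetical, the following declares an async iterator that produces an enumerator directly. The names AsyncEnumeratorIteratorSketch, RangeEnumeratorAsync and DrainAsync are hypothetical. Because there is no enumerable to call GetAsyncEnumerator on, the sketch consumes the enumerator with explicit MoveNextAsync and DisposeAsync calls:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AsyncEnumeratorIteratorSketch
{
    // Async iterator whose declared return type is IAsyncEnumerator<int>.
    public static async IAsyncEnumerator<int> RangeEnumeratorAsync(int start, int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield(); // force an asynchronous hop before each element
            yield return start + i;
        }
    }

    // Consumes an enumerator by hand: one MoveNextAsync at a time, then dispose.
    public static async Task<List<int>> DrainAsync(IAsyncEnumerator<int> e)
    {
        var items = new List<int>();
        try
        {
            while (await e.MoveNextAsync()) items.Add(e.Current);
        }
        finally
        {
            await e.DisposeAsync();
        }
        return items;
    }
}
```

Awaiting DrainAsync(RangeEnumeratorAsync(10, 3)) collects 10, 11 and 12.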

The optional CancellationToken argument to GetAsyncEnumerator is used as a way to request cancellation of the enumerator: At any point during the enumeration, if cancellation is requested, an in-progress or subsequent MoveNextAsync call may be interrupted and throw an OperationCanceledException (or some derived type, like a TaskCanceledException). This raises two questions:

  • If the token needs to be passed to GetAsyncEnumerator, but it’s the compiler that’s generating the GetAsyncEnumerator call for my await foreach, how do I pass in a token?
  • If the token is passed to GetAsyncEnumerator and it’s the compiler that’s generating the GetAsyncEnumerator implementation for my async iterator method, from where do I get the passed-in token?

The answer to the first question (which I’ll also come back to shortly) is that there’s a WithCancellation extension method for IAsyncEnumerable<T>. It accepts a CancellationToken as an argument, and returns a custom struct type that await foreach binds to via a pattern rather than via the IAsyncEnumerable<T> interface, letting you write code like the following:

await foreach (int item in RangeAsync(10, 3).WithCancellation(token))
  Console.Write(item + " ");

This same pattern-based binding is also used to enable a ConfigureAwait method, which can be chained in a fluent design with WithCancellation, as well:

await foreach (int item in RangeAsync(10, 3).WithCancellation(token).ConfigureAwait(false))
  Console.Write(item + " ");

The answer to the second question is a new [EnumeratorCancellation] attribute. You can add a CancellationToken parameter to your async iterator method and annotate it with this attribute. In doing so, the compiler will generate code that will cause the token passed to GetAsyncEnumerator to be visible to the body of the async iterator as that argument:

static async IAsyncEnumerable<int> RangeAsync(
  int start, int count,
  [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
  for (int i = 0; i < count; i++)
  {
    await Task.Delay(i, cancellationToken);
    yield return start + i;
  }
}

This means that if you write this code:

var cts = new CancellationTokenSource();
await foreach (int item in RangeAsync(10, 3).WithCancellation(cts.Token)) { ... }

the code inside of RangeAsync will see its cancellationToken parameter equal to cts.Token. Of course, because the token is a normal parameter to the iterator method, it’s also possible to pass the token directly as an argument:

var cts = new CancellationTokenSource();
await foreach (int item in RangeAsync(10, 3, cts.Token)) { ... }

in which case the body of the async iterator will similarly see cts.Token as its cancellationToken. Why two different ways to do it? Passing the token directly to the method is easier, but it doesn’t work when you’re handed an arbitrary IAsyncEnumerable<T> from some other source but still want to be able to request cancellation of everything that composes it. In corner-cases, it can also be advantageous to pass the token to GetAsyncEnumerator, as doing so avoids “burning in” the token in the case where the single enumerable will be enumerated multiple times: By passing it to GetAsyncEnumerator, a different token can be passed each time. Of course, it’s also possible that two different tokens end up getting passed into the same iterator, one as an argument to the iterator and one via GetAsyncEnumerator. In that case, the compiler-generated code handles this by creating a new linked token that will have cancellation requested when either of the two tokens has cancellation requested, and that new “combined” token will be the one the iterator body sees.
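The “burning in” point can be sketched as follows. This is only an illustration under stated assumptions: CancellationSketch, TicksAsync and CountUntilCanceledAsync are hypothetical names, and the iterator is assumed to check its token on every pass. A single enumerable is created once, with no token supplied as an argument, and is then enumerated twice with a different token each time via WithCancellation:

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationSketch
{
    // Endless sequence that observes whichever token reaches GetAsyncEnumerator.
    public static async IAsyncEnumerable<int> TicksAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        for (int i = 0; ; i++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await Task.Yield();
            yield return i;
        }
    }

    // Enumerates src with a fresh token, cancels once the element equal to
    // cancelAt has been seen, and returns how many elements were observed.
    public static async Task<int> CountUntilCanceledAsync(
        IAsyncEnumerable<int> src, int cancelAt)
    {
        using var cts = new CancellationTokenSource();
        int seen = 0;
        try
        {
            await foreach (int item in src.WithCancellation(cts.Token))
            {
                seen++;
                if (item == cancelAt) cts.Cancel();
            }
        }
        catch (OperationCanceledException)
        {
            // Expected: the next MoveNextAsync observes the canceled token.
        }
        return seen;
    }
}
```

With IAsyncEnumerable<int> src = CancellationSketch.TicksAsync(), awaiting CountUntilCanceledAsync(src, 2) returns 3, and a second call on the same src with cancelAt set to 4 returns 5. Each enumeration starts over and sees only its own token.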

Under the Hood of Async Iterators

Async iterators are transformed by the C# compiler into a state machine, enabling the developer to write simple code while the compiler handles all the complicated intricacies to provide an efficient implementation.

Let’s consider the RangeAsync method shown earlier. To begin, the compiler emits the method the developer wrote, but with the body replaced by code that sets up and returns the enumerable object:

[AsyncIteratorStateMachine(typeof(<RangeAsync>d__1))]
static IAsyncEnumerable<int> RangeAsync(int start, int count,
  [EnumeratorCancellation] CancellationToken cancellationToken = default) =>
  new <RangeAsync>d__1(-2) {
    <>3__start = start,
    <>3__count = count,
    <>3__cancellationToken = cancellationToken
  };

You can see that the method has the same signature as was written by the developer (except for the async keyword, which, as with the async keyword on async methods prior to C# 8, only affects how the compiler compiles the method and doesn’t actually impact the method’s signature in metadata), and that its sole purpose is to initialize a new instance of the <RangeAsync>d__1 type, which is the compiler-generated IAsyncEnumerable<int> implementation outlined in Figure 2. For such a “simple” method as the developer wrote in RangeAsync, there’s a lot going on here, so I’ll break it into pieces.

Figure 2 Compiler-Generated IAsyncEnumerable<T>

[CompilerGenerated]
private sealed class <RangeAsync>d__1 :
  IAsyncEnumerable<int>, IAsyncEnumerator<int>,
  IAsyncStateMachine,
  IValueTaskSource<bool>, IValueTaskSource
{
  private CancellationTokenSource <>x__combinedTokens;
  public CancellationToken <>3__cancellationToken;
  public int <>3__start;
  public int <>3__count;
  public AsyncIteratorMethodBuilder <>t__builder;
  public ManualResetValueTaskSourceCore<bool> <>v__promiseOfValueOrEnd;
  public int <>1__state;
  private int <>l__initialThreadId;
  private bool <>w__disposeMode;
  private CancellationToken cancellationToken;
  private int start;
  private int count;
  private int <i>5__2;
  private TaskAwaiter <>u__1;
  private int <>2__current;
  public <RangeAsync>d__1(int <>1__state) { ... }
  IAsyncEnumerator<int> IAsyncEnumerable<int>.GetAsyncEnumerator(
    CancellationToken cancellationToken) { ... }
  ValueTask<bool> IAsyncEnumerator<int>.MoveNextAsync() { ... }
  int IAsyncEnumerator<int>.Current { get { ... } }
  ValueTask IAsyncDisposable.DisposeAsync() { ... }
  void IAsyncStateMachine.MoveNext() { ... }
  ValueTaskSourceStatus IValueTaskSource<bool>.GetStatus(short token) { ... }
  ValueTaskSourceStatus IValueTaskSource.GetStatus(short token) { ... }
  void IValueTaskSource<bool>.OnCompleted(Action<object> continuation, object state,
    short token, ValueTaskSourceOnCompletedFlags flags) { ... }
  void IValueTaskSource.OnCompleted(Action<object> continuation, object state,
    short token, ValueTaskSourceOnCompletedFlags flags) { ... }
  bool IValueTaskSource<bool>.GetResult(short token) { ... }
  void IValueTaskSource.GetResult(short token) { ... }
}

First, note that this type not only implements IAsyncEnumerable<int>, but also a bunch of other interfaces. This isn’t necessary from a functionality perspective, but it’s critical from a performance standpoint. The compiler implementation has been designed to keep allocations incredibly low; in fact, no matter how many times an async iterator yields, the most common case is that it incurs at most two allocations of overhead. To start, this is achieved by employing the same trick that synchronous iterators employ: The same object that implements the enumerable is reused as the enumerator as long as no one else is currently using it. That’s why the object also implements IAsyncEnumerator<int>, because the same object typically doubles as both, returning itself from its GetAsyncEnumerator method.
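The reuse trick can be observed, though not relied upon, with a small experiment. The sketch below uses hypothetical names (EnumerableReuseSketch, InspectAsync) and reports behavior that is an implementation detail of current C# compilers rather than a documented guarantee. It asks one enumerable for two enumerators on the same thread and checks which of them is the enumerable object itself:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class EnumerableReuseSketch
{
    public static async IAsyncEnumerable<int> RangeAsync(int start, int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return start + i;
        }
    }

    public static async Task<(bool FirstIsSameObject, bool SecondIsSameObject)> InspectAsync()
    {
        IAsyncEnumerable<int> src = RangeAsync(10, 3);

        // No await between these calls, so they all run on the same thread.
        IAsyncEnumerator<int> first = src.GetAsyncEnumerator();
        IAsyncEnumerator<int> second = src.GetAsyncEnumerator();

        bool firstSame = object.ReferenceEquals(src, first);
        bool secondSame = object.ReferenceEquals(src, second);

        await first.DisposeAsync();
        await second.DisposeAsync();
        return (firstSame, secondSame);
    }
}
```

With the compiler behavior described in this article, the first enumerator is the enumerable object itself, and the second is a separate object because the first is already in use.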

Then there’s the IAsyncStateMachine interface. The supporting APIs in the core libraries and runtime operate over abstract state machines as represented by this interface, so in order to be able to perform awaits, the class must implement this interface (it could employ a helper type to implement the interface, but that would be more allocation).

Arguably the most interesting interfaces, however, are IValueTaskSource<bool> and IValueTaskSource, as this gets at the heart of how async enumerables can have so little overhead. When we first designed the async enumerable interfaces, the MoveNextAsync method returned a Task<bool>. The most common case is for the MoveNextAsync method to actually complete its operation synchronously, in which case the runtime would be able to use a cached task object: Every time it completes synchronously to return a true value, the same already-completed-with-a-true-result Task<bool> could be returned, making the synchronously completing case allocation-free. A call that completed asynchronously, however, would still need to allocate a new Task<bool>. .NET Core 2.1 then introduced the ability for a ValueTask<T> to be backed not just by a T or by a Task<T>, but also by a new IValueTaskSource<T> interface. This is powerful in that a developer is able to craft an implementation of IValueTaskSource<T> that can be reset and then reused with subsequent ValueTask<T>s. (For more information, see bit.ly/2kEyo81.) By having MoveNextAsync return a ValueTask<bool> instead of a Task<bool>, the compiler can create such an IValueTaskSource<bool> implementation, with every MoveNextAsync that completes synchronously returning a ValueTask<bool> that just wraps a bool, but every MoveNextAsync that completes asynchronously returning a ValueTask<bool> that wraps one of these reusable IValueTaskSource<bool> implementations. And the compiler-generated async enumerable object not only doubles as the enumerator and not only doubles as the async state machine object, it also doubles as exactly that IValueTaskSource<bool> implementation.

The same applies to DisposeAsync. Most implementations of DisposeAsync will actually complete synchronously, in which case it can just return a default ValueTask. But if it needs to complete asynchronously, the compiler-generated implementation will just return a ValueTask wrapped around this same async enumerable object, which implements IValueTaskSource, as well. So, no matter how many times MoveNextAsync needs to complete asynchronously, and whether or not DisposeAsync completes asynchronously, they don’t incur any additional overhead of allocations, because they just return this instance wrapped in a ValueTask<bool> or ValueTask, respectively.

After the interfaces, you see a bunch of fields. Some of these, like <>3__start and <>3__count, are there to store the arguments passed to the entry point method (you can see them being set to the arguments in the method shown earlier). These values need to be preserved in case the enumerable is enumerated again, which is why you also see start and count fields; those get initialized to their <>3__ counterparts in the GetAsyncEnumerator method and are then the actual fields manipulated when the corresponding “parameters” in the developer’s code in the async iterator are read and written. Other fields, like <i>5__2, represent the “locals” used in the async iterator (in this case, it’s the i iteration variable in the for loop); any “local” that needs to survive across an await boundary is “lifted” to the state machine in this fashion. But arguably the two most interesting fields are the <>t__builder and <>v__promiseOfValueOrEnd fields. The former represents the lifetime of the asynchronous execution, and provides the facilities for the async iterator to hook in with the runtime’s support for things like ExecutionContext flow (ensuring that, for example, AsyncLocal<T> values are properly flowed across awaits). The latter is a ManualResetValueTaskSourceCore<T>, a type introduced in .NET Core 3.0 to contain most of the logic necessary to properly implement IValueTaskSource<T> and IValueTaskSource; the compiler-generated class implements the interfaces, but then delegates the implementations of these interface methods to this mutable struct stored in its field:

bool IValueTaskSource<bool>.GetResult(short token) =>
  <>v__promiseOfValueOrEnd.GetResult(token);
void IValueTaskSource<bool>.OnCompleted(
  Action<object> continuation, object state,
  short token, ValueTaskSourceOnCompletedFlags flags) =>
  <>v__promiseOfValueOrEnd.OnCompleted(continuation, state, token, flags);
ValueTaskSourceStatus IValueTaskSource<bool>.GetStatus(short token) =>
  <>v__promiseOfValueOrEnd.GetStatus(token);
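The same delegation pattern can be used in hand-written code. What follows is a minimal sketch, not what the compiler emits: ReusableSignal, WaitAsync and Complete are hypothetical names, and the class assumes a single waiter at a time. It shows one object being reset and handed out again as the backing source for successive ValueTask<bool> instances, with no new allocation per operation:

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

public sealed class ReusableSignal : IValueTaskSource<bool>
{
    // Mutable struct holding the real logic; it must not be a readonly field.
    private ManualResetValueTaskSourceCore<bool> _core;

    // Begins a new operation. Only valid once the previous one has been awaited.
    public ValueTask<bool> WaitAsync()
    {
        _core.Reset(); // bumps Version, invalidating ValueTasks from earlier rounds
        return new ValueTask<bool>(this, _core.Version);
    }

    // Completes the operation currently in flight.
    public void Complete(bool value) => _core.SetResult(value);

    bool IValueTaskSource<bool>.GetResult(short token) =>
        _core.GetResult(token);

    ValueTaskSourceStatus IValueTaskSource<bool>.GetStatus(short token) =>
        _core.GetStatus(token);

    void IValueTaskSource<bool>.OnCompleted(
        Action<object> continuation, object state,
        short token, ValueTaskSourceOnCompletedFlags flags) =>
        _core.OnCompleted(continuation, state, token, flags);
}
```

A caller can obtain a ValueTask<bool> from WaitAsync, have some other code call Complete, await the result, and then repeat the cycle on the same ReusableSignal instance. The version token is what lets the core detect a stale ValueTask being used after a Reset.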

The remainder of the implementation is really about the state machine itself and moving it forward. The code the developer writes in the async iterator is moved into a MoveNext helper method (Figure 3 shows an approximate decompilation of the IL generated by the C# compiler), just as is done for synchronous iterators and async methods. There are three main ways to return out of this void-returning helper: the code yields a current value, the code awaits something that hasn’t yet completed, or the code reaches the end of the enumeration (either successfully or via an unhandled exception). When the code yields a value, it stores that value into the <>2__current field and updates the <>1__state to indicate where the state machine should jump back to the next time MoveNext is invoked. When the code awaits an incomplete awaiter, it similarly sets the <>1__state (to the location of the code that checks the result of the then-completed awaited operation) and uses the <>t__builder to hook up a continuation that will cause the implementation to call MoveNext again (at which point it will jump to the location dictated by <>1__state). While the implementation may look complicated, that’s effectively all it is: the developer’s code interspersed with the logic for handling yields and awaits in this manner, plus a “jump table” at the beginning of the method that looks at the <>1__state and decides where to go based on it. Much of the remaining complication comes from error handling, as well as from the ability to execute finally blocks as part of DisposeAsync if the enumerator is disposed before it reaches the end, such as if code breaks out of an await foreach loop early.

Figure 3 State Machine Implementation

private void MoveNext()
{
  try
  {
    TaskAwaiter awaiter;
    switch (<>1__state)
    {
      default:
        if (<>w__disposeMode) goto DONE_ITERATING;
        <>1__state = -1;
        <i>5__2 = 0; // int i = 0;
        goto LOOP_CONDITION;
      case 0:
        awaiter = <>u__1;
        <>u__1 = default(TaskAwaiter);
        <>1__state = -1;
        goto DONE_AWAIT;
      case -4:
        <>1__state = -1;
        if (<>w__disposeMode) goto DONE_ITERATING;
        <i>5__2++; // i++
        goto LOOP_CONDITION;
    }
    LOOP_CONDITION:
    // i < count
    if (<i>5__2 >= count) goto DONE_ITERATING;
    awaiter = Task.Delay(<i>5__2, cancellationToken).GetAwaiter();
    if (!awaiter.IsCompleted) // await Task.Delay(i, cancellationToken);
    {
      <>1__state = 0;
      <>u__1 = awaiter;
      <RangeAsync>d__1 sm = this;
      <>t__builder.AwaitUnsafeOnCompleted(ref awaiter, ref sm);
      return;
    }
    DONE_AWAIT:
    awaiter.GetResult();
    <>2__current = start + <i>5__2;
    <>1__state = -4;
    goto RETURN_TRUE_FROM_MOVENEXTASYNC; // yield return start + i;
    DONE_ITERATING:
    <>1__state = -2;
    <>x__combinedTokens?.Dispose();
    <>v__promiseOfValueOrEnd.SetResult(result: false);
    return;
    RETURN_TRUE_FROM_MOVENEXTASYNC:
    <>v__promiseOfValueOrEnd.SetResult(result: true);
  }
  catch (Exception e)
  {
    <>1__state = -2;
    <>x__combinedTokens?.Dispose();
    <>v__promiseOfValueOrEnd.SetException(e);
  }
}
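The point made before Figure 3 about finally blocks running as part of DisposeAsync can be seen from the consuming side. In this sketch (EarlyExitSketch, CountAsync and RunAsync are hypothetical names), an endless async iterator wraps its loop in try/finally, and the consumer breaks out of await foreach after the second element:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class EarlyExitSketch
{
    // Endless iterator; its finally block records when cleanup runs.
    public static async IAsyncEnumerable<int> CountAsync(List<string> log)
    {
        try
        {
            for (int i = 0; ; i++)
            {
                await Task.Yield();
                yield return i;
            }
        }
        finally
        {
            log.Add("finally");
        }
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();
        await foreach (int item in CountAsync(log))
        {
            log.Add("item " + item);
            if (item == 1) break; // leaving the loop triggers DisposeAsync
        }
        log.Add("after loop");
        return log;
    }
}
```

The returned log reads "item 0", "item 1", "finally", "after loop": the iterator's finally block runs during the DisposeAsync call that await foreach issues on the way out of the loop, before the code following the loop executes.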

Finally, there’s the MoveNextAsync method (Figure 4), which is comparatively simple. If the enumerator isn’t in a good state for MoveNextAsync to be called (for example, it’s already been disposed), then it just returns the equivalent of “new ValueTask<bool>(false).” Otherwise, it resets the ManualResetValueTaskSourceCore<bool> for the next iteration and calls (via a runtime helper) the MoveNext method just shown. Then, if the invocation completed synchronously, a Boolean indicating whether it successfully moved next or hit the end of the iteration is returned wrapped in a ValueTask<bool>, and if it’s completing asynchronously, this object is wrapped in the returned ValueTask<bool>.

Figure 4 MoveNextAsync Method

ValueTask<bool> IAsyncEnumerator<int>.MoveNextAsync()
{
  if (<>1__state == -2)
    return default;
  <>v__promiseOfValueOrEnd.Reset();
  <RangeAsync>d__1 stateMachine = this;
  <>t__builder.MoveNext(ref stateMachine);
  short version = <>v__promiseOfValueOrEnd.Version;
  return <>v__promiseOfValueOrEnd.GetStatus(version) == ValueTaskSourceStatus.Succeeded ?
    new ValueTask<bool>(<>v__promiseOfValueOrEnd.GetResult(version)) :
    new ValueTask<bool>(this, version);
}
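The two return paths in Figure 4 can be told apart from the outside by checking IsCompleted on the ValueTask<bool> as soon as MoveNextAsync returns. The sketch below uses hypothetical names (CompletionModeSketch, ItemsAsync, ObserveAsync) and a TaskCompletionSource<bool> as a gate, so the second step cannot finish until the consumer allows it:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class CompletionModeSketch
{
    public static async IAsyncEnumerable<int> ItemsAsync(Task gate)
    {
        await Task.CompletedTask; // already complete, so no suspension occurs
        yield return 1;
        await gate;               // incomplete until the consumer opens the gate
        yield return 2;
    }

    public static async Task<(bool FirstWasSync, bool SecondWasSync)> ObserveAsync()
    {
        var gate = new TaskCompletionSource<bool>();
        IAsyncEnumerator<int> e = ItemsAsync(gate.Task).GetAsyncEnumerator();
        try
        {
            ValueTask<bool> first = e.MoveNextAsync();
            bool firstWasSync = first.IsCompleted;   // reached yield without suspending
            await first;

            ValueTask<bool> second = e.MoveNextAsync();
            bool secondWasSync = second.IsCompleted; // still waiting on the gate
            gate.SetResult(true);
            await second;

            return (firstWasSync, secondWasSync);
        }
        finally
        {
            await e.DisposeAsync();
        }
    }
}
```

ObserveAsync reports true for the first call and false for the second. Each ValueTask<bool> is awaited exactly once, and the next MoveNextAsync is not issued until the previous one has completed.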

All of this is, of course, implementation detail and could easily change in the future. In fact, there are several additional optimizations the compiler can employ and hopefully will in future releases. The beauty of this is you get to keep writing the simple code you want in your async iterators and the compiler handles the details; and as the compiler improves, so, too, do your libraries and applications. The same goes for the runtime and supporting libraries. For example, .NET Core 2.1 and 3.0 both saw significant improvements in the infrastructure supporting async methods, such that existing async methods just got better, and those improvements accrue to async iterators, as well.

Much Ado About Threading

It can be tempting to think of things that are asynchronous as also being “thread-safe” and then jumping to conclusions based on that, so it’s important to understand what is and what is not safe when working with async enumerables.

It should be evident that it’s fine for one MoveNextAsync call to occur on a different thread from a previous or subsequent MoveNextAsync call; after all, the implementation may await a task and continue execution somewhere else. However, that doesn’t mean MoveNextAsync is “thread-safe”; far from it. On a given async enumerator, MoveNextAsync must never be invoked concurrently, meaning MoveNextAsync shouldn’t be called again on a given enumerator until the previous call to it has completed. Similarly, DisposeAsync on an enumerator shouldn’t be invoked while either MoveNextAsync or DisposeAsync on that same enumerator is still in flight.

These rules are easy to follow, and you’d be hard-pressed not to follow them when using await foreach, which naturally follows the rules as part of the code it translates into. However, it’s possible to write slightly different code and find yourself with a problem. Consider this buggy variant:

IAsyncEnumerable<int> src = ...;
IAsyncEnumerator<int> e = src.GetAsyncEnumerator();
try
{
  while (await e.MoveNextAsync().TimeoutAfter(30)) // BUG!!
    Use(e.Current);
}
finally { if (e != null) await e.DisposeAsync(); }

This snippet is using a hypothetical TimeoutAfter method; it doesn’t actually exist in .NET Core 3.0, but imagine that it did or that someone wrote it as an extension method, with the semantics that if the task on which it’s called hasn’t completed within the specified timeout, it’ll throw an exception. Now consider this in the context of the previous rules: If this timeout were hit, that means the MoveNextAsync was still in flight, but the TimeoutAfter would cause the consuming code to resume with an exception, the finally block to be entered, and DisposeAsync to be called on the enumerator that may still have MoveNextAsync in progress. This could end up failing in a variety of ways, or it could end up accidentally succeeding; in any event, stay away from code like that.
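One way to get a per-element timeout without breaking those rules is to lean on the cancellation support covered earlier. The sketch below is an assumption-laden illustration, not a drop-in replacement for TimeoutAfter: TimeoutSketch, StallingSourceAsync and ConsumeWithTimeoutAsync are hypothetical names, and the approach only works if the source actually honors the token. The timeout is expressed through a CancellationTokenSource, so the in-flight MoveNextAsync itself ends with an OperationCanceledException before await foreach disposes the enumerator:

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutSketch
{
    // Hypothetical source: two prompt elements, then a stall that ends only on cancellation.
    public static async IAsyncEnumerable<int> StallingSourceAsync(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        yield return 0;
        yield return 1;
        await Task.Delay(Timeout.Infinite, cancellationToken);
        yield return 2; // never reached
    }

    // Collects elements until the source ends or one element takes too long.
    public static async Task<List<int>> ConsumeWithTimeoutAsync(
        IAsyncEnumerable<int> src, TimeSpan perItemTimeout)
    {
        var results = new List<int>();
        using var cts = new CancellationTokenSource();
        cts.CancelAfter(perItemTimeout);
        try
        {
            await foreach (int item in src.WithCancellation(cts.Token))
            {
                results.Add(item);
                cts.CancelAfter(perItemTimeout); // restart the timer for the next element
            }
        }
        catch (OperationCanceledException)
        {
            // The pending MoveNextAsync completed with cancellation, so the
            // enumerator was disposed with nothing still in flight.
        }
        return results;
    }
}
```

Awaiting ConsumeWithTimeoutAsync(StallingSourceAsync(), TimeSpan.FromMilliseconds(200)) returns the two prompt elements once the stall exceeds the timeout.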

What About LINQ?

Language Integrated Query, or LINQ, provides both a set of helper methods for operating on synchronous enumerables and a set of keywords in the language for writing queries that then compile down to these helper methods. .NET Core 3.0 and C# 8 don’t include either of those for asynchronous enumerables. However, the github.com/dotnet/reactive project includes the System.Linq.Async library, which provides a full set of such extension methods for operating on IAsyncEnumerable<T>. You can include this library from NuGet in your project, and have access to a wide array of helpful extension methods for operating over IAsyncEnumerable<T> objects.
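To give a feel for how such helpers can be built, here is a hand-rolled sketch of a filtering operator written as an async iterator. It is not the System.Linq.Async implementation: AsyncQuerySketch, WhereSketch, RangeAsync and EvensAsync are hypothetical names chosen to avoid clashing with that library, and argument validation is omitted for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncQuerySketch
{
    // Yields only the elements of source that satisfy predicate.
    public static async IAsyncEnumerable<T> WhereSketch<T>(
        this IAsyncEnumerable<T> source,
        Func<T, bool> predicate,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Forward the token to the upstream enumerator.
        await foreach (T item in source.WithCancellation(cancellationToken).ConfigureAwait(false))
        {
            if (predicate(item)) yield return item;
        }
    }

    public static async IAsyncEnumerable<int> RangeAsync(int start, int count)
    {
        for (int i = 0; i < count; i++)
        {
            await Task.Yield();
            yield return start + i;
        }
    }

    public static async Task<List<int>> EvensAsync()
    {
        var result = new List<int>();
        await foreach (int n in RangeAsync(0, 6).WhereSketch(x => x % 2 == 0))
            result.Add(n);
        return result;
    }
}
```

EvensAsync collects 0, 2 and 4. Because the operator is itself an async iterator, it gets the token plumbing from [EnumeratorCancellation] and the allocation behavior described earlier.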

What’s Next?

C# 8 and .NET Core 3.0 are exciting releases. They include not only the aforementioned language and library support for async enumerables, but also a variety of types that produce or consume them (for example, the System.Threading.Channels.ChannelReader<T> type provides a ReadAllAsync method that returns an IAsyncEnumerable<T>). However, this really is just the beginning for async enumerables. I expect subsequent releases will see further support in the libraries, improvements in the compiler, and additional language functionality. On top of that, I expect we’ll see many NuGet libraries for interacting with IAsyncEnumerable<T>, just as we do for IEnumerable<T>. I’m looking forward to seeing all of the ways this feature set will be put to great use. Enjoy!
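As a small illustration of the ReadAllAsync method mentioned above, the following sketch pairs a producer that writes to an unbounded channel with a consumer that drains it using await foreach. ChannelSketch and ProduceAndConsumeAsync are hypothetical names; the channel APIs used are the ones in System.Threading.Channels.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelSketch
{
    public static async Task<List<int>> ProduceAndConsumeAsync()
    {
        Channel<int> channel = Channel.CreateUnbounded<int>();

        // Producer: writes three values, then marks the channel complete.
        Task producer = Task.Run(async () =>
        {
            for (int i = 0; i < 3; i++)
            {
                await channel.Writer.WriteAsync(i);
            }
            channel.Writer.Complete();
        });

        // Consumer: ReadAllAsync exposes the reader as an IAsyncEnumerable<int>;
        // the loop ends once the writer has completed and the channel is empty.
        var received = new List<int>();
        await foreach (int item in channel.Reader.ReadAllAsync())
        {
            received.Add(item);
        }

        await producer;
        return received;
    }
}
```

ProduceAndConsumeAsync returns 0, 1 and 2 in the order they were written.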


Stephen Toub works on .NET at Microsoft. You can find him on GitHub at github.com/stephentoub.

Thanks to the following Microsoft technical experts for reviewing this article: Julien Couvreur, Jared Parsons
Jared Parsons is a Principal Developer Lead on the C# Language Team.