Async Streams
Note
This article is a feature specification. The specification serves as the design document for the feature. It includes proposed specification changes, along with information needed during the design and development of the feature. These articles are published until the proposed spec changes are finalized and incorporated in the current ECMA specification.
There may be some discrepancies between the feature specification and the completed implementation. Those differences are captured in the pertinent language design meeting (LDM) notes.
You can learn more about the process for adopting feature speclets into the C# language standard in the article on the specifications.
Summary
C# has support for iterator methods and async methods, but no support for a method that is both an iterator and an async method. We should rectify this by allowing for await to be used in a new form of async iterator, one that returns an IAsyncEnumerable<T> or IAsyncEnumerator<T> rather than an IEnumerable<T> or IEnumerator<T>, with IAsyncEnumerable<T> consumable in a new await foreach. An IAsyncDisposable interface is also used to enable asynchronous cleanup.
Related discussion
Detailed design
Interfaces
IAsyncDisposable
There has been much discussion of IAsyncDisposable (e.g. https://github.com/dotnet/roslyn/issues/114) and whether it's a good idea. However, it's a required concept to add in support of async iterators. Since finally blocks may contain awaits, and since finally blocks need to be run as part of disposing of iterators, we need async disposal. It's also just generally useful any time cleaning up of resources might take any period of time, e.g. closing files (requiring flushes), deregistering callbacks and providing a way to know when deregistration has completed, etc.
The following interface is added to the core .NET libraries (e.g. System.Private.CoreLib / System.Runtime):
namespace System
{
    public interface IAsyncDisposable
    {
        ValueTask DisposeAsync();
    }
}
As with Dispose, invoking DisposeAsync multiple times is acceptable, and subsequent invocations after the first should be treated as no-ops, returning a synchronously completed successful task (DisposeAsync need not be thread-safe, though, and need not support concurrent invocation). Further, types may implement both IDisposable and IAsyncDisposable, and if they do, it's similarly acceptable to invoke Dispose and then DisposeAsync or vice versa, but only the first should be meaningful and subsequent invocations of either should be a no-op. As such, if a type does implement both, consumers are encouraged to call, once and only once, the more relevant method based on the context: Dispose in synchronous contexts and DisposeAsync in asynchronous ones.
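The disposal contract described above can be sketched as follows. This is a minimal illustration rather than a prescribed implementation: `LogWriter`, `CleanupCount`, and `DisposalDemo` are hypothetical names invented for the example, and, as the text allows, the type is not thread-safe.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical type: only IDisposable and IAsyncDisposable come from the proposal.
public sealed class LogWriter : IDisposable, IAsyncDisposable
{
    private Stream? _stream;

    // Counts how many times real cleanup ran (for demonstration only).
    public int CleanupCount { get; private set; }

    public LogWriter(Stream stream) => _stream = stream;

    public void Dispose()
    {
        Stream? stream = _stream;
        if (stream is null) return;          // already disposed: no-op
        _stream = null;
        CleanupCount++;
        stream.Dispose();
    }

    public ValueTask DisposeAsync()
    {
        Stream? stream = _stream;
        if (stream is null) return default;  // synchronously completed ValueTask
        _stream = null;
        CleanupCount++;
        return stream.DisposeAsync();
    }
}

public static class DisposalDemo
{
    public static async Task<int> RunAsync()
    {
        var writer = new LogWriter(new MemoryStream());
        await writer.DisposeAsync();   // performs the cleanup
        writer.Dispose();              // no-op
        await writer.DisposeAsync();   // no-op
        return writer.CleanupCount;    // 1
    }
}
```

Whichever method is called first does the work; every later call to either method returns immediately.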
(How IAsyncDisposable interacts with using is a separate discussion. And coverage of how it interacts with foreach is handled later in this proposal.)
Alternatives considered:
- DisposeAsync accepting a CancellationToken: While in theory it makes sense that anything async can be canceled, disposal is about cleanup, closing things out, freeing resources, etc., which is generally not something that should be canceled; cleanup is still important for work that's canceled. The same CancellationToken that caused the actual work to be canceled would typically be the same token passed to DisposeAsync, making DisposeAsync worthless because cancellation of the work would cause DisposeAsync to be a no-op. If someone wants to avoid being blocked waiting for disposal, they can avoid waiting on the resulting ValueTask, or wait on it only for some period of time.
- DisposeAsync returning a Task: Now that a non-generic ValueTask exists and can be constructed from an IValueTaskSource, returning ValueTask from DisposeAsync allows an existing object to be reused as the promise representing the eventual async completion of DisposeAsync, saving a Task allocation in the case where DisposeAsync completes asynchronously.
- Configuring DisposeAsync with a bool continueOnCapturedContext (ConfigureAwait): While there may be issues related to how such a concept is exposed to using, foreach, and other language constructs that consume this, from an interface perspective it's not actually doing any awaiting and there's nothing to configure; consumers of the ValueTask can consume it however they wish.
- IAsyncDisposable inheriting IDisposable: Since only one or the other should be used, it doesn't make sense to force types to implement both.
- IDisposableAsync instead of IAsyncDisposable: We've been following the naming that things/types are an "async something" whereas operations are "done async", so types have "Async" as a prefix and methods have "Async" as a suffix.
IAsyncEnumerable / IAsyncEnumerator
Two interfaces are added to the core .NET libraries:
namespace System.Collections.Generic
{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        ValueTask<bool> MoveNextAsync();
        T Current { get; }
    }
}
Typical consumption (without additional language features) would look like:
IAsyncEnumerator<T> enumerator = enumerable.GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        Use(enumerator.Current);
    }
}
finally { await enumerator.DisposeAsync(); }
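To make the producer side of these interfaces concrete, here is a minimal hand-written implementation, consumed with the manual loop shown above. `CountingStream` and `CountingDemo` are hypothetical names, and `Task.Yield()` merely stands in for real asynchronous work; treat it as a sketch, not as the shape the compiler generates for async iterators.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical hand-written stream that yields 0, 1, ..., count - 1.
public sealed class CountingStream : IAsyncEnumerable<int>
{
    private readonly int _count;
    public CountingStream(int count) => _count = count;

    public IAsyncEnumerator<int> GetAsyncEnumerator(CancellationToken cancellationToken = default)
        => new Enumerator(_count, cancellationToken);

    private sealed class Enumerator : IAsyncEnumerator<int>
    {
        private readonly int _count;
        private readonly CancellationToken _token;
        private int _next;

        public Enumerator(int count, CancellationToken token)
        {
            _count = count;
            _token = token;
        }

        public int Current { get; private set; }

        public async ValueTask<bool> MoveNextAsync()
        {
            _token.ThrowIfCancellationRequested();
            if (_next >= _count) return false;
            await Task.Yield();          // stand-in for real asynchronous work
            Current = _next++;
            return true;
        }

        // Nothing to clean up, so return an already-completed ValueTask.
        public ValueTask DisposeAsync() => default;
    }
}

public static class CountingDemo
{
    // Manual consumption, without any language support.
    public static async Task<List<int>> CollectAsync(IAsyncEnumerable<int> source)
    {
        var items = new List<int>();
        IAsyncEnumerator<int> enumerator = source.GetAsyncEnumerator();
        try
        {
            while (await enumerator.MoveNextAsync())
            {
                items.Add(enumerator.Current);
            }
        }
        finally { await enumerator.DisposeAsync(); }
        return items;
    }
}
```

Calling `CountingDemo.CollectAsync(new CountingStream(3))` produces the list 0, 1, 2.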
Discarded options considered:
- Task<bool> MoveNextAsync(); T Current { get; }: Using Task<bool> would support using a cached task object to represent synchronous, successful MoveNextAsync calls, but an allocation would still be required for asynchronous completion. By returning ValueTask<bool>, we enable the enumerator object to itself implement IValueTaskSource<bool> and be used as the backing for the ValueTask<bool> returned from MoveNextAsync, which in turn allows for significantly reduced overheads.
- ValueTask<(bool, T)> MoveNextAsync(): It's not only harder to consume, but it means that T can no longer be covariant.
- ValueTask<T?> TryMoveNextAsync(): Not covariant.
- Task<T?> TryMoveNextAsync(): Not covariant, allocations on every call, etc.
- ITask<T?> TryMoveNextAsync(): Not covariant, allocations on every call, etc.
- ITask<(bool,T)> TryMoveNextAsync(): Not covariant, allocations on every call, etc.
- Task<bool> TryMoveNextAsync(out T result): The out result would need to be set when the operation returns synchronously, not when it asynchronously completes the task potentially sometime long in the future, at which point there'd be no way to communicate the result.
- IAsyncEnumerator<T> not implementing IAsyncDisposable: We could choose to separate these. However, doing so complicates certain other areas of the proposal, as code must then be able to deal with the possibility that an enumerator doesn't provide disposal, which makes it difficult to write pattern-based helpers. Further, it will be common for enumerators to have a need for disposal (e.g. any C# async iterator that has a finally block, most things enumerating data from a network connection, etc.), and if one doesn't, it is simple to implement the method purely as public ValueTask DisposeAsync() => default(ValueTask); with minimal additional overhead.
- IAsyncEnumerator<T> GetAsyncEnumerator(): No cancellation token parameter.
The following subsection discusses alternatives that weren't chosen.
Viable alternative:
namespace System.Collections.Generic
{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator();
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        ValueTask<bool> WaitForNextAsync();
        T TryGetNext(out bool success);
    }
}
TryGetNext is used in an inner loop to consume items with a single interface call as long as they're available synchronously. When the next item can't be retrieved synchronously, it returns false, and any time it returns false, a caller must subsequently invoke WaitForNextAsync to either wait for the next item to be available or to determine that there will never be another item. Typical consumption (without additional language features) would look like:
IAsyncEnumerator<T> enumerator = enumerable.GetAsyncEnumerator();
try
{
    while (await enumerator.WaitForNextAsync())
    {
        while (true)
        {
            // TryGetNext returns T, so the local is typed T rather than int.
            T item = enumerator.TryGetNext(out bool success);
            if (!success) break;
            Use(item);
        }
    }
}
finally { await enumerator.DisposeAsync(); }
The advantage of this is two-fold, one minor and one major:
- Minor: Allows for an enumerator to support multiple consumers. There may be scenarios where it's valuable for an enumerator to support multiple concurrent consumers. That can't be achieved when MoveNextAsync and Current are separate such that an implementation can't make their usage atomic. In contrast, this approach provides a single method TryGetNext that supports pushing the enumerator forward and getting the next item, so the enumerator can enable atomicity if desired. However, it's likely that such scenarios could also be enabled by giving each consumer its own enumerator from a shared enumerable. Further, we don't want to enforce that every enumerator support concurrent usage, as that would add non-trivial overheads to the majority case that doesn't require it, which means a consumer of the interface generally couldn't rely on this anyway.
- Major: Performance. The MoveNextAsync/Current approach requires two interface calls per operation, whereas the best case for WaitForNextAsync/TryGetNext is that most iterations complete synchronously, enabling a tight inner loop with TryGetNext, such that we only have one interface call per operation. This can have a measurable impact in situations where the interface calls dominate the computation.
However, there are non-trivial downsides, including significantly increased complexity when consuming these manually, and an increased chance of introducing bugs when using them. And while the performance benefits show up in microbenchmarks, we don't believe they'll be impactful in the vast majority of real usage. If it turns out they are, we can introduce a second set of interfaces in a light-up fashion.
Discarded options considered:
- ValueTask<bool> WaitForNextAsync(); bool TryGetNext(out T result): out parameters can't be covariant. There's also a small impact here (an issue with the try pattern in general) that this likely incurs a runtime write barrier for reference type results.
Cancellation
There are several possible approaches to supporting cancellation:
1. IAsyncEnumerable<T>/IAsyncEnumerator<T> are cancellation-agnostic: CancellationToken doesn't appear anywhere. Cancellation is achieved by logically baking the CancellationToken into the enumerable and/or enumerator in whatever manner is appropriate, e.g. when calling an iterator, passing the CancellationToken as an argument to the iterator method and using it in the body of the iterator, as is done with any other parameter.
2. IAsyncEnumerable<T>.GetAsyncEnumerator(CancellationToken): You pass a CancellationToken to GetAsyncEnumerator, and subsequent MoveNextAsync operations respect it however they can.
3. IAsyncEnumerator<T>.MoveNextAsync(CancellationToken): You pass a CancellationToken to each individual MoveNextAsync call.
4. 1 && 2: You both embed CancellationTokens into your enumerable/enumerator and pass CancellationTokens into GetAsyncEnumerator.
5. 1 && 3: You both embed CancellationTokens into your enumerable/enumerator and pass CancellationTokens into MoveNextAsync.
From a purely theoretical perspective, (5) is the most robust, in that (a) MoveNextAsync accepting a CancellationToken enables the most fine-grained control over what's canceled, and (b) CancellationToken is just any other type that can be passed as an argument into iterators, embedded in arbitrary types, etc.
However, there are multiple problems with that approach:
- How does a CancellationToken passed to GetAsyncEnumerator make it into the body of the iterator? We could expose a new iterator keyword that you could dot off of to get access to the CancellationToken passed to GetAsyncEnumerator, but a) that's a lot of additional machinery, b) we're making it a very first-class citizen, and c) the 99% case would seem to be the same code both calling an iterator and calling GetAsyncEnumerator on it, in which case it can just pass the CancellationToken as an argument into the method.
- How does a CancellationToken passed to MoveNextAsync get into the body of the method? This is even worse, as if it's exposed off of an iterator local object, its value could change across awaits, which means any code that registered with the token would need to unregister from it prior to awaits and then re-register after; it's also potentially quite expensive to need to do such registering and unregistering in every MoveNextAsync call, regardless of whether implemented by the compiler in an iterator or by a developer manually.
- How does a developer cancel a foreach loop? If it's done by giving a CancellationToken to an enumerable/enumerator, then either a) we need to support foreach'ing over enumerators, which raises them to being first-class citizens, and now you need to start thinking about an ecosystem built up around enumerators (e.g. LINQ methods) or b) we need to embed the CancellationToken in the enumerable anyway by having some WithCancellation extension method off of IAsyncEnumerable<T> that would store the provided token and then pass it into the wrapped enumerable's GetAsyncEnumerator when the GetAsyncEnumerator on the returned struct is invoked (ignoring that token). Or, you can just use the CancellationToken you have in the body of the foreach.
- If/when query comprehensions are supported, how would the CancellationToken supplied to GetAsyncEnumerator or MoveNextAsync be passed into each clause? The easiest way would simply be for the clause to capture it, at which point whatever token is passed to GetAsyncEnumerator/MoveNextAsync is ignored.
An earlier version of this document recommended (1), but we have since switched to (4).
The two main problems with (1):
- producers of cancellable enumerables have to implement some boilerplate, and can only leverage the compiler's support for async-iterators to implement an IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken) method.
- it is likely that many producers would be tempted to just add a CancellationToken parameter to their async-enumerable signature instead, which will prevent consumers from passing the cancellation token they want when they are given an IAsyncEnumerable type.
There are two main consumption scenarios:
- await foreach (var i in GetData(token)) ... where the consumer calls the async-iterator method,
- await foreach (var i in givenIAsyncEnumerable.WithCancellation(token)) ... where the consumer deals with a given IAsyncEnumerable instance.
We find that a reasonable compromise to support both scenarios in a way that is convenient for both producers and consumers of async-streams is to use a specially annotated parameter in the async-iterator method. The [EnumeratorCancellation] attribute is used for this purpose. Placing this attribute on a parameter tells the compiler that if a token is passed to the GetAsyncEnumerator method, that token should be used instead of the value originally passed for the parameter.
Consider IAsyncEnumerable<int> GetData([EnumeratorCancellation] CancellationToken token = default). The implementer of this method can simply use the parameter in the method body.
The consumer can use either consumption pattern above:
- if you use GetData(token), then the token is saved into the async-enumerable and will be used in iteration,
- if you use givenIAsyncEnumerable.WithCancellation(token), then the token passed to GetAsyncEnumerator will supersede any token saved in the async-enumerable.
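Both consumption scenarios can be tried with a short sketch. `CancellationDemo`, `ConsumeDirectlyAsync`, and `ConsumeGivenAsync` are hypothetical names chosen for illustration; the attribute is assumed to be `System.Runtime.CompilerServices.EnumeratorCancellationAttribute` and `WithCancellation` the extension method in `System.Threading.Tasks`, as they ship in .NET.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class CancellationDemo
{
    // An endless stream; the annotated parameter receives whichever token applies.
    public static async IAsyncEnumerable<int> GetData(
        [EnumeratorCancellation] CancellationToken token = default)
    {
        for (int i = 0; ; i++)
        {
            token.ThrowIfCancellationRequested();
            await Task.Yield();
            yield return i;
        }
    }

    // Scenario 1: the consumer calls the iterator method and passes the token directly.
    public static async Task<int> ConsumeDirectlyAsync(int cancelAfterItems)
    {
        using var cts = new CancellationTokenSource();
        int seen = 0;
        try
        {
            await foreach (int item in GetData(cts.Token))
            {
                if (++seen == cancelAfterItems) cts.Cancel();
            }
        }
        catch (OperationCanceledException) { /* expected once the token is canceled */ }
        return seen;
    }

    // Scenario 2: the consumer is handed an IAsyncEnumerable<int> and attaches a token
    // with WithCancellation, which flows through GetAsyncEnumerator into the iterator.
    public static async Task<int> ConsumeGivenAsync(IAsyncEnumerable<int> given, int cancelAfterItems)
    {
        using var cts = new CancellationTokenSource();
        int seen = 0;
        try
        {
            await foreach (int item in given.WithCancellation(cts.Token))
            {
                if (++seen == cancelAfterItems) cts.Cancel();
            }
        }
        catch (OperationCanceledException) { /* expected once the token is canceled */ }
        return seen;
    }
}
```

In both cases the loop stops after the requested number of items, because the iterator body observes the canceled token the next time it resumes. For the second scenario the stream can be created without any token, for example `CancellationDemo.ConsumeGivenAsync(CancellationDemo.GetData(), 3)`.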
foreach
foreach will be augmented to support IAsyncEnumerable<T> in addition to its existing support for IEnumerable<T>. And it will support the equivalent of IAsyncEnumerable<T> as a pattern if the relevant members are exposed publicly, falling back to using the interface directly if not, in order to enable struct-based extensions that avoid allocating as well as using alternative awaitables as the return type of MoveNextAsync and DisposeAsync.
Syntax
Using the syntax:
foreach (var i in enumerable)
C# will continue to treat enumerable as a synchronous enumerable, such that even if it exposes the relevant APIs for async enumerables (exposing the pattern or implementing the interface), it will only consider the synchronous APIs.
To force foreach to instead only consider the asynchronous APIs, await is inserted as follows:
await foreach (var i in enumerable)
No syntax would be provided that would support using either the async or the sync APIs; the developer must choose based on the syntax used.
Semantics
The compile-time processing of an await foreach statement first determines the collection type, enumerator type and iteration type of the expression (very similar to https://github.com/dotnet/csharpstandard/blob/draft-v8/standard/statements.md#1395-the-foreach-statement). This determination proceeds as follows:
- If the type X of expression is dynamic or an array type, then an error is produced and no further steps are taken.
- Otherwise, determine whether the type X has an appropriate GetAsyncEnumerator method:
  - Perform member lookup on the type X with identifier GetAsyncEnumerator and no type arguments. If the member lookup does not produce a match, or it produces an ambiguity, or produces a match that is not a method group, check for an enumerable interface as described below.
  - Perform overload resolution using the resulting method group and an empty argument list. If overload resolution results in no applicable methods, results in an ambiguity, or results in a single best method but that method is either static or not public, check for an enumerable interface as described below.
  - If the return type E of the GetAsyncEnumerator method is not a class, struct or interface type, an error is produced and no further steps are taken.
  - Member lookup is performed on E with the identifier Current and no type arguments. If the member lookup produces no match, the result is an error, or the result is anything except a public instance property that permits reading, an error is produced and no further steps are taken.
  - Member lookup is performed on E with the identifier MoveNextAsync and no type arguments. If the member lookup produces no match, the result is an error, or the result is anything except a method group, an error is produced and no further steps are taken.
  - Overload resolution is performed on the method group with an empty argument list. If overload resolution results in no applicable methods, results in an ambiguity, or results in a single best method but that method is either static or not public, or its return type is not awaitable into bool, an error is produced and no further steps are taken.
  - The collection type is X, the enumerator type is E, and the iteration type is the type of the Current property.
- Otherwise, check for an enumerable interface:
  - If among all the types Tᵢ for which there is an implicit conversion from X to IAsyncEnumerable<Tᵢ>, there is a unique type T such that T is not dynamic and for all the other Tᵢ there is an implicit conversion from IAsyncEnumerable<T> to IAsyncEnumerable<Tᵢ>, then the collection type is the interface IAsyncEnumerable<T>, the enumerator type is the interface IAsyncEnumerator<T>, and the iteration type is T.
  - Otherwise, if there is more than one such type T, then an error is produced and no further steps are taken.
- Otherwise, an error is produced and no further steps are taken.
The above steps, if successful, unambiguously produce a collection type C, enumerator type E and iteration type T.
await foreach (V v in x) «embedded_statement»
is then expanded to:
{
    E e = ((C)(x)).GetAsyncEnumerator();
    try {
        while (await e.MoveNextAsync()) {
            V v = (V)(T)e.Current;
            «embedded_statement»
        }
    }
    finally {
        ... // Dispose e
    }
}
The body of the finally block is constructed according to the following steps:
- If the type E has an appropriate DisposeAsync method:
  - Perform member lookup on the type E with identifier DisposeAsync and no type arguments. If the member lookup does not produce a match, or it produces an ambiguity, or produces a match that is not a method group, check for the disposal interface as described below.
  - Perform overload resolution using the resulting method group and an empty argument list. If overload resolution results in no applicable methods, results in an ambiguity, or results in a single best method but that method is either static or not public, check for the disposal interface as described below.
  - If the return type of the DisposeAsync method is not awaitable, an error is produced and no further steps are taken.
  - The finally clause is expanded to the semantic equivalent of:
    finally { await e.DisposeAsync(); }
- Otherwise, if there is an implicit conversion from E to the System.IAsyncDisposable interface, then
  - If E is a non-nullable value type then the finally clause is expanded to the semantic equivalent of:
    finally { await ((System.IAsyncDisposable)e).DisposeAsync(); }
  - Otherwise the finally clause is expanded to the semantic equivalent of:
    finally { System.IAsyncDisposable d = e as System.IAsyncDisposable; if (d != null) await d.DisposeAsync(); }
    except that if E is a value type, or a type parameter instantiated to a value type, then the conversion of e to System.IAsyncDisposable shall not cause boxing to occur.
- Otherwise, the finally clause is expanded to an empty block:
  finally { }
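The pattern-based binding described above means a type need not implement any interface to be consumed by await foreach. The following sketch uses a hypothetical struct, `Countdown`, whose public members match the pattern; the names `Countdown`, `Enumerator`, and `PatternDemo` are assumptions made for the example.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical struct-based stream: it implements no interfaces and is bound
// purely through its public GetAsyncEnumerator / MoveNextAsync / Current members.
public readonly struct Countdown
{
    private readonly int _start;
    public Countdown(int start) => _start = start;

    public Enumerator GetAsyncEnumerator() => new Enumerator(_start);

    public struct Enumerator
    {
        private int _remaining;

        internal Enumerator(int start)
        {
            _remaining = start + 1;
            Current = 0;
        }

        public int Current { get; private set; }

        // Completes synchronously, so no Task is allocated per item.
        public ValueTask<bool> MoveNextAsync()
        {
            if (_remaining == 0) return new ValueTask<bool>(false);
            Current = --_remaining;
            return new ValueTask<bool>(true);
        }

        // Found through the DisposeAsync pattern lookup; nothing to clean up here.
        public ValueTask DisposeAsync() => default;
    }
}

public static class PatternDemo
{
    public static async Task<List<int>> CollectAsync()
    {
        var items = new List<int>();
        await foreach (int value in new Countdown(3))
        {
            items.Add(value);
        }
        return items;   // 3, 2, 1, 0
    }
}
```

Because `MoveNextAsync` is a plain (non-async) method on the struct, its mutations apply to the enumerator the loop holds rather than to a copy.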
ConfigureAwait
This pattern-based compilation will allow ConfigureAwait to be used on all of the awaits, via a ConfigureAwait extension method:
await foreach (T item in enumerable.ConfigureAwait(false))
{
    ...
}
This will be based on types we'll add to .NET as well, likely to System.Threading.Tasks.Extensions.dll:
// Approximate implementation, omitting arg validation and the like
namespace System.Threading.Tasks
{
    public static class AsyncEnumerableExtensions
    {
        public static ConfiguredAsyncEnumerable<T> ConfigureAwait<T>(this IAsyncEnumerable<T> enumerable, bool continueOnCapturedContext) =>
            new ConfiguredAsyncEnumerable<T>(enumerable, continueOnCapturedContext);

        public struct ConfiguredAsyncEnumerable<T>
        {
            private readonly IAsyncEnumerable<T> _enumerable;
            private readonly bool _continueOnCapturedContext;

            internal ConfiguredAsyncEnumerable(IAsyncEnumerable<T> enumerable, bool continueOnCapturedContext)
            {
                _enumerable = enumerable;
                _continueOnCapturedContext = continueOnCapturedContext;
            }

            public ConfiguredAsyncEnumerator<T> GetAsyncEnumerator() =>
                new ConfiguredAsyncEnumerator<T>(_enumerable.GetAsyncEnumerator(), _continueOnCapturedContext);

            public struct ConfiguredAsyncEnumerator<T>
            {
                private readonly IAsyncEnumerator<T> _enumerator;
                private readonly bool _continueOnCapturedContext;

                internal ConfiguredAsyncEnumerator(IAsyncEnumerator<T> enumerator, bool continueOnCapturedContext)
                {
                    _enumerator = enumerator;
                    _continueOnCapturedContext = continueOnCapturedContext;
                }

                public ConfiguredValueTaskAwaitable<bool> MoveNextAsync() =>
                    _enumerator.MoveNextAsync().ConfigureAwait(_continueOnCapturedContext);

                public T Current => _enumerator.Current;

                public ConfiguredValueTaskAwaitable DisposeAsync() =>
                    _enumerator.DisposeAsync().ConfigureAwait(_continueOnCapturedContext);
            }
        }
    }
}
Note that this approach will not enable ConfigureAwait to be used with pattern-based enumerables, but then again it's already the case that the ConfigureAwait is only exposed as an extension on Task/Task<T>/ValueTask/ValueTask<T> and can't be applied to arbitrary awaitable things, as it only makes sense when applied to Tasks (it controls a behavior implemented in Task's continuation support), and thus doesn't make sense when using a pattern where the awaitable things may not be tasks. Anyone returning awaitable things can provide their own custom behavior in such advanced scenarios.
(If we can come up with some way to support a scope- or assembly-level ConfigureAwait solution, then this won't be necessary.)
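As a usage sketch, the following consumes an async iterator through ConfigureAwait(false). It relies on the extension method as it ships in .NET (in the System.Threading.Tasks namespace), whose type names differ from the approximate ones shown above; `ConfigureAwaitDemo`, `ProduceAsync`, and `SumAsync` are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ConfigureAwaitDemo
{
    // Hypothetical producer used only for this example.
    private static async IAsyncEnumerable<int> ProduceAsync()
    {
        for (int i = 1; i <= 3; i++)
        {
            await Task.Delay(1);
            yield return i;
        }
    }

    public static async Task<int> SumAsync()
    {
        int sum = 0;

        // The awaits on MoveNextAsync and DisposeAsync that the loop performs
        // do not resume on the captured context.
        await foreach (int item in ProduceAsync().ConfigureAwait(false))
        {
            sum += item;
        }

        return sum;   // 6
    }
}
```

The configuration applies only to the awaits the loop itself performs; awaits inside the iterator body are unaffected.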
Async Iterators
The language / compiler will support producing IAsyncEnumerable<T>s and IAsyncEnumerator<T>s in addition to consuming them. Today the language supports writing an iterator like:
static IEnumerable<int> MyIterator()
{
    try
    {
        for (int i = 0; i < 100; i++)
        {
            Thread.Sleep(1000);
            yield return i;
        }
    }
    finally
    {
        Thread.Sleep(200);
        Console.WriteLine("finally");
    }
}
but await can't be used in the body of these iterators. We will add that support.
Syntax
The existing language support for iterators infers the iterator nature of the method based on whether it contains any yields. The same will be true for async iterators. Such async iterators will be demarcated and differentiated from synchronous iterators via adding async to the signature, and must then also have either IAsyncEnumerable<T> or IAsyncEnumerator<T> as their return type. For example, the above example could be written as an async iterator as follows:
static async IAsyncEnumerable<int> MyIterator()
{
    try
    {
        for (int i = 0; i < 100; i++)
        {
            await Task.Delay(1000);
            yield return i;
        }
    }
    finally
    {
        await Task.Delay(200);
        Console.WriteLine("finally");
    }
}
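To show how such an iterator interacts with disposal, here is a small sketch in which the consumer leaves the loop early. It is a variation on the iterator above with shorter delays and a log instead of console output; `IteratorDemo`, `Numbers`, and `RunAsync` are hypothetical names.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class IteratorDemo
{
    // Same shape as the iterator above, but it records events in a log.
    public static async IAsyncEnumerable<int> Numbers(List<string> log)
    {
        try
        {
            for (int i = 0; i < 100; i++)
            {
                await Task.Delay(1);
                yield return i;
            }
        }
        finally
        {
            await Task.Delay(1);      // asynchronous cleanup
            log.Add("finally");
        }
    }

    public static async Task<List<string>> RunAsync()
    {
        var log = new List<string>();

        await foreach (int i in Numbers(log))
        {
            log.Add($"item {i}");
            if (i == 2) break;        // leaving the loop awaits DisposeAsync
        }

        log.Add("after loop");
        return log;
    }
}
```

The resulting log is "item 0", "item 1", "item 2", "finally", "after loop": the iterator's finally block, including its await, completes before execution continues past the loop.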
Alternatives considered:
- Not using async in the signature: Using async is likely technically required by the compiler, as it uses it to determine whether await is valid in that context. But even if it's not required, we've established that await may only be used in methods marked as async, and it seems important to keep the consistency.
- Enabling custom builders for IAsyncEnumerable<T>: That's something we could look at for the future, but the machinery is complicated and we don't support that for the synchronous counterparts.
- Having an iterator keyword in the signature: Async iterators would use async iterator in the signature, and yield could only be used in async methods that included iterator; iterator would then be made optional on synchronous iterators. Depending on your perspective, this has the benefit of making it very clear by the signature of the method whether yield is allowed and whether the method is actually meant to return instances of type IAsyncEnumerable<T> rather than the compiler manufacturing one based on whether the code uses yield or not. But it is different from synchronous iterators, which don't and can't be made to require one. Plus some developers don't like the extra syntax. If we were designing it from scratch, we'd probably make this required, but at this point there's much more value in keeping async iterators close to sync iterators.
LINQ
There are roughly 200 overloads of methods on the System.Linq.Enumerable class, all of which work in terms of IEnumerable<T>; some of these accept IEnumerable<T>, some of them produce IEnumerable<T>, and many do both. Adding LINQ support for IAsyncEnumerable<T> would likely entail duplicating all of these overloads for it, for another ~200. And since IAsyncEnumerator<T> is likely to be more common as a standalone entity in the asynchronous world than IEnumerator<T> is in the synchronous world, we could potentially need another ~200 overloads that work with IAsyncEnumerator<T>. Plus, a large number of the overloads deal with predicates (e.g. Where that takes a Func<T, bool>), and it may be desirable to have IAsyncEnumerable<T>-based overloads that deal with both synchronous and asynchronous predicates (e.g. Func<T, ValueTask<bool>> in addition to Func<T, bool>). While this isn't applicable to all of the now ~400 new overloads, a rough calculation is that it'd be applicable to half, which means another ~200 overloads, for a total of ~600 new methods.
That is a staggering number of APIs, with the potential for even more when extension libraries like Interactive Extensions (Ix) are considered. But Ix already has an implementation of many of these, and there doesn't seem to be a great reason to duplicate that work; we should instead help the community improve Ix and recommend it for when developers want to use LINQ with IAsyncEnumerable<T>.
There is also the issue of query comprehension syntax. The pattern-based nature of query comprehensions would allow them to "just work" with some operators, e.g. if Ix provides the following methods:
public static IAsyncEnumerable<TResult> Select<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, TResult> func);
public static IAsyncEnumerable<T> Where<T>(this IAsyncEnumerable<T> source, Func<T, bool> func);
then this C# code will "just work":
IAsyncEnumerable<int> enumerable = ...;
IAsyncEnumerable<int> result = from item in enumerable
                               where item % 2 == 0
                               select item * 2;
However, there is no query comprehension syntax that supports using await in the clauses, so if Ix added, for example:
public static IAsyncEnumerable<TResult> Select<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TResult>> func);
then this would "just work":
IAsyncEnumerable<int> result = from item in enumerable
                               where item % 2 == 0
                               select SomeAsyncMethod(item);

async ValueTask<int> SomeAsyncMethod(int item)
{
    await Task.Yield();
    return item * 2;
}
but there'd be no way to write it with the await inline in the select clause. As a separate effort, we could look into adding async { ... } expressions to the language, at which point we could allow them to be used in query comprehensions and the above could instead be written as:
IAsyncEnumerable<int> result = from item in enumerable
                               where item % 2 == 0
                               select async
                               {
                                   await Task.Yield();
                                   return item * 2;
                               };
Alternatively, we could look into enabling await to be used directly in expressions, such as by supporting async from. However, it's unlikely a design here would impact the rest of the feature set one way or the other, and this isn't a particularly high-value thing to invest in right now, so the proposal is to do nothing additional here right now.
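The pattern-based query support discussed above can be tried with two operators written as async iterators. This is only a sketch: `AsyncQueryOperators`, `QueryDemo`, `Range`, and `DoubleAsync` are hypothetical and are not the Ix API. Everything sits in one namespace so that these operators are found before any other Where/Select extensions that happen to be in scope.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace AsyncStreamSketch
{
    // Hypothetical operators written as async iterators; not the Ix API.
    public static class AsyncQueryOperators
    {
        public static async IAsyncEnumerable<T> Where<T>(
            this IAsyncEnumerable<T> source, Func<T, bool> predicate)
        {
            await foreach (T item in source)
            {
                if (predicate(item)) yield return item;
            }
        }

        // Asynchronous selector: each projected ValueTask is awaited inside the operator.
        public static async IAsyncEnumerable<TResult> Select<TSource, TResult>(
            this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TResult>> selector)
        {
            await foreach (TSource item in source)
            {
                yield return await selector(item);
            }
        }
    }

    public static class QueryDemo
    {
        private static async IAsyncEnumerable<int> Range(int count)
        {
            for (int i = 0; i < count; i++)
            {
                await Task.Yield();
                yield return i;
            }
        }

        private static async ValueTask<int> DoubleAsync(int item)
        {
            await Task.Yield();
            return item * 2;
        }

        public static async Task<List<int>> RunAsync()
        {
            // The query syntax binds to the Where and Select methods above by pattern.
            IAsyncEnumerable<int> result = from item in Range(6)
                                           where item % 2 == 0
                                           select DoubleAsync(item);

            var items = new List<int>();
            await foreach (int value in result) items.Add(value);
            return items;   // 0, 4, 8
        }
    }
}
```

The await still lives inside the operator rather than in the select clause, which is exactly the limitation the text describes.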
Integration with other asynchronous frameworks
Integration with IObservable<T> and other asynchronous frameworks (e.g. reactive streams) would be done at the library level rather than at the language level. For example, all of the data from an IAsyncEnumerable<T> can be published to an IObserver<T> simply by await foreach'ing over the enumerable and OnNext'ing the data to the observer, so an AsObservable<T> extension method is possible. Consuming an IObservable<T> in an await foreach requires buffering the data (in case another item is pushed while the previous item is still being processed), but such a push-pull adapter can easily be implemented to enable an IObservable<T> to be pulled from with an IAsyncEnumerator<T>. Rx/Ix already provide prototypes of such implementations, and libraries like https://github.com/dotnet/corefx/tree/master/src/System.Threading.Channels provide various kinds of buffering data structures. The language need not be involved at this stage.
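The pull-to-push direction can be sketched in a few lines. `ObservableBridge.PublishAsync`, `RecordingObserver<T>`, and `BridgeDemo` are hypothetical helpers written for this example and do not reflect how Rx/Ix implement the bridge; in particular, this sketch also routes exceptions thrown by the observer's OnNext to OnError, which a production adapter might handle differently.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ObservableBridge
{
    // Hypothetical helper: pushes every item of an async stream to an observer.
    public static async Task PublishAsync<T>(IAsyncEnumerable<T> source, IObserver<T> observer)
    {
        try
        {
            await foreach (T item in source)
            {
                observer.OnNext(item);
            }
        }
        catch (Exception error)
        {
            observer.OnError(error);
            return;
        }

        observer.OnCompleted();
    }
}

// Minimal observer that records the notifications it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<string> Events { get; } = new List<string>();

    public void OnNext(T value) => Events.Add($"next {value}");
    public void OnError(Exception error) => Events.Add($"error {error.Message}");
    public void OnCompleted() => Events.Add("completed");
}

public static class BridgeDemo
{
    private static async IAsyncEnumerable<int> Produce()
    {
        for (int i = 1; i <= 2; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    public static async Task<List<string>> RunAsync()
    {
        var observer = new RecordingObserver<int>();
        await ObservableBridge.PublishAsync(Produce(), observer);
        return observer.Events;   // "next 1", "next 2", "completed"
    }
}
```

The opposite direction (consuming an IObservable<T> with await foreach) needs a buffer between the pushing observable and the pulling loop, which is where a data structure such as a channel would come in.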