异步流

注意

本文是特性规范。 此规范是功能的设计文档。 它包括建议的规范变更,以及功能设计和开发过程中所需的信息。 这些文章将持续发布,直至建议的规范变更最终确定并纳入当前的 ECMA 规范。

功能规范与已完成的实现之间可能存在一些差异。 这些差异已记录在相关的 语言设计会议(LDM)说明中。

可以在有关规范的文章中了解更多有关将功能规范子块纳入 C# 语言标准的过程。

支持者问题:https://github.com/dotnet/csharplang/issues/43

总结

C# 支持迭代器方法和异步方法,但不支持既是迭代器又是异步方法的方法。 我们应该通过允许 await 用于新形式的 async 迭代器(此迭代器返回 IAsyncEnumerable<T>IAsyncEnumerator<T>,而不是 IEnumerable<T>IEnumerator<T>)来纠正这一点,并使 IAsyncEnumerable<T> 能在新的 await foreach中使用。 IAsyncDisposable 接口也用于启用异步清理。

详细设计

接口

IAsyncDisposable

关于IAsyncDisposable的问题(例如https://github.com/dotnet/roslyn/issues/114)以及它是否是个好主意,人们进行了大量讨论。 但是,这是支持异步迭代器时必须添加的概念。 由于 finally 块可能包含 await,而 finally 块需要作为处置迭代器的一部分运行,因此我们需要异步处置。 一般来说,在清理资源可能需要一段时间的情况下,例如关闭文件(需要刷新)、取消注册回调并提供一种了解取消注册完成时间的方法等,它也非常有用。

以下接口被添加到核心 .NET 库(如 System.Private.CoreLib / System.Runtime)中:

namespace System
{
    public interface IAsyncDisposable
    {
        ValueTask DisposeAsync();
    }
}

Dispose 一样,多次调用 DisposeAsync 也是可以接受的,第一次调用后的后续调用应视为无操作,返回同步完成的成功任务(但是,DisposeAsync 不必是线程安全的,也不必支持并发调用)。 此外,类型可以同时实现 IDisposableIAsyncDisposable,如果实现了,同样可以调用 Dispose 然后再调用 DisposeAsync,反之亦然,但只有第一次调用才有意义,其后的调用均无效。 因此,如果某类型实现了这两者,建议使用者根据上下文选择调用更适合的一个方法,且仅调用一次。在同步上下文中使用 Dispose,在异步上下文中使用 DisposeAsync

IAsyncDisposable 如何与 using 交互将另作讨论。至于如何与 foreach 交互,将在本提案的后面部分中讨论。)

考虑的替代方法:

  • DisposeAsync 接受 CancellationToken:虽然从理论上讲,任何异步工作都可以取消,但处置是关于清理、关闭工作、释放资源等,而这些工作通常不应该取消;对于取消的工作,清理仍然很重要。 导致实际工作被取消的 CancellationToken 通常与传递给 DisposeAsync 的令牌相同,这使得 DisposeAsync 毫无价值,因为取消工作会导致 DisposeAsync 无效。 如果有人想避免在等待处置时被阻塞,则可以避免等待结果 ValueTask,或者只等待一段时间。
  • DisposeAsync 返回一个 Task:既然非泛型ValueTask 已经存在,并且可以从 IValueTaskSource 构造,那么从 ValueTask 返回 DisposeAsync 就可以重用现有对象作为表示 DisposeAsync 最终异步完成的承诺,从而在 Task 异步完成的情况下节省 DisposeAsync 分配。
  • ConfigureAwait 配置 :虽然可能存在如何将这样一个概念公开给 usingforeach 和其他使用这个概念的语言构造的相关问题,但从接口的角度来看,它实际上并没有执行任何 await,也没有什么需要配置的...ValueTask 的使用者可以随心所欲地使用它。
  • IAsyncDisposable 继承 IDisposable:由于只能使用其中一个,所以强制类型同时实现两者并无意义。
  • IDisposableAsync 而不是 IAsyncDisposable:我们一直遵循的命名方式是,事物/类型是“异步的事物”,而操作是“异步完成”,因此类型的前缀是“Async”,方法的后缀是“Async”。

IAsyncEnumerable / IAsyncEnumerator

核心 .NET 库中添加了两个接口:

namespace System.Collections.Generic
{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        ValueTask<bool> MoveNextAsync();
        T Current { get; }
    }
}

典型使用情况(不包括其他语言功能)如下所示:

IAsyncEnumerator<T> enumerator = enumerable.GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        Use(enumerator.Current);
    }
}
finally { await enumerator.DisposeAsync(); }

已考虑放弃的选项:

  • Task<bool> MoveNextAsync(); T current { get; }:使用 Task<bool> 将支持使用缓存任务对象来表示同步、成功的 MoveNextAsync 调用,但异步完成仍需要进行分配。 通过返回 ValueTask<bool>,我们使枚举对象可以自行实现 IValueTaskSource<bool>,并用作从 ValueTask<bool>返回的 MoveNextAsync 的支撑,这样可以显著降低开销。
  • ValueTask<(bool, T)> MoveNextAsync();:不仅更难使用,而且也意味着 T 不再具备协变性。
  • ValueTask<T?> TryMoveNextAsync();:非协变。
  • Task<T?> TryMoveNextAsync();:非协变,每次调用都进行分配等。
  • ITask<T?> TryMoveNextAsync();:非协变,每次调用都进行分配等。
  • ITask<(bool,T)> TryMoveNextAsync();:非协变,每次调用都进行分配等。
  • Task<bool> TryMoveNextAsync(out T result);out 结果需要在操作同步返回时设置,而不是在异步完成任务时设置,因为异步完成任务的时间可能在未来很长时间内,届时将无法传递结果。
  • IAsyncEnumerator<T> 不实现 IAsyncDisposable:我们可以选择将它们分开。 然而,这样做会使提议中的某些其他方面变得复杂,因为代码必须能够处理枚举器不提供处置的可能性,这使得编写基于模式的帮助程序变得困难。 此外,枚举器通常需要进行处置(例如,任何具有最终代码块的 C# 异步迭代器、从网络连接中枚举数据的大多数程序等),如果不需要,则可以简单地以 public ValueTask DisposeAsync() => default(ValueTask); 的形式实现该方法,将额外开销降至最低。
  • _ IAsyncEnumerator<T> GetAsyncEnumerator():没有取消令牌参数。

以下小节将讨论未被选中的替代方案。

可行的替代方法:

namespace System.Collections.Generic
{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator();
    }

    public interface IAsyncEnumerator<out T> : IAsyncDisposable
    {
        ValueTask<bool> WaitForNextAsync();
        T TryGetNext(out bool success);
    }
}

TryGetNext 用于内循环,只需调用一次接口,就能同步使用项目。 当无法同步检索到下一项时,它将返回 false,并且每当返回 false 时,调用方必须随后调用 WaitForNextAsync,以等待下一项可用或确认不会再有其他项。 典型使用情况(不包括其他语言功能)如下所示:

IAsyncEnumerator<T> enumerator = enumerable.GetAsyncEnumerator();
try
{
    while (await enumerator.WaitForNextAsync())
    {
        while (true)
        {
            int item = enumerator.TryGetNext(out bool success);
            if (!success) break;
            Use(item);
        }
    }
}
finally { await enumerator.DisposeAsync(); }

这项优势有两个方面,一个是次要的,另一个是主要的:

  • 次要:允许一个枚举器支持多个使用者。 在某些情况下,枚举器支持多个并发使用者可能会非常有用。 如果 MoveNextAsyncCurrent 是分开的,那么实现时就无法让它们的使用具有原子性,这一点就无法实现。 相比之下,这种方法提供了一个单一的 TryGetNext 方法,支持向前推动枚举器并获取下一个项目,因此枚举器可以根据需要启用原子性。 但是,如果从共享的枚举器中给每个使用者提供自己的枚举器,则也有可能实现这种方案。 此外,我们并不想强制要求每个枚举器都支持并发使用,因为这会让大多数不需要并发使用的情况显著增加开销,这意味着接口的使用者一般无法依赖并发使用。
  • 主要:性能MoveNextAsync/Current 方法每次操作需要调用两个接口,而 WaitForNextAsync/TryGetNext 的最佳情况是大多数迭代同步完成,从而可以使用 TryGetNext 实现紧密的内循环,这样我们每次操作只需调用一个接口。 在接口调用主导计算的情况下,这会产生明显的影响。

但是,存在一些不容忽视的缺点,包括手动处理时复杂性显著增加,以及使用这些功能时引入 bug 的可能性增加。 虽然性能优势反映在微基准测试中,但我们认为它们不会对绝大多数实际使用场景产生显著影响。 如果事实证明是这样,我们就可以采用亮点的方式来引入第二组接口。

已考虑放弃的选项:

  • ValueTask<bool> WaitForNextAsync(); bool TryGetNext(out T result);out 参数不能协变。 此外,还有一个小小的影响(这也是尝试模式的一个普遍问题),即这很可能会对引用类型的结果造成运行时写入障碍。

取消

支持取消有几种可能的方法:

  1. IAsyncEnumerable<T> / IAsyncEnumerator<T> 与取消无关:CancellationToken 在任何地方都不会出现。 通过将 CancellationToken 以适当方式逻辑上整合到可枚举和/或枚举器中来实现取消,例如,当调用迭代器时,将 CancellationToken 作为参数传递给迭代器方法,并像使用其他参数一样在迭代器主体中使用它。
  2. IAsyncEnumerator<T>.GetAsyncEnumerator(CancellationToken):将一个 CancellationToken 传递给 GetAsyncEnumerator,随后的 MoveNextAsync 操作会尽可能地遵循它。
  3. IAsyncEnumerator<T>.MoveNextAsync(CancellationToken):为每个单独的 MoveNextAsync 调用传递一个 CancellationToken
  4. 1 和 2:将两个 CancellationToken 都嵌入到可枚举/枚举器中,并将 CancellationToken 传递到 GetAsyncEnumerator 中。
  5. 1 和 3:将两个 CancellationToken 都嵌入到可枚举/枚举器中,并将 CancellationToken 传递到 MoveNextAsync 中。

从纯理论的角度来看,(5) 是最稳健的,因为 (a) MoveNextAsync 接受 CancellationToken 可以对取消的内容进行最精细的控制,(b) CancellationToken 只是可以作为参数传递给迭代器、嵌入任意类型等的任何其他类型。

然而,这种方法存在诸多问题:

  • 传递给 CancellationTokenGetAsyncEnumerator 如何进入迭代器主体? 我们可以公开一个新的 iterator 关键字,你可以通过它来访问传递给 GetEnumeratorCancellationToken,但是:a) 这需要很多额外的机器;b) 我们让它成为了优先事项;c) 99% 的情况似乎是相同的代码既调用了迭代器,又调用了 GetAsyncEnumerator,在这种情况下,它可以直接将 CancellationToken 作为参数传递给方法。
  • 传递给 CancellationTokenMoveNextAsync 如何进入方法的主体? 这就更糟糕了,因为如果它是从一个 iterator 本地对象中公开出来的,那么它的值可能会在等待过程中发生变化,这意味着任何注册了令牌的代码都需要在等待之前取消注册,然后在等待之后重新注册;在每次 MoveNextAsync 调用中都需要进行这样的注册和取消注册,不管是由编译器在迭代器中实现,还是由开发人员手动实现,代价都可能相当高昂。
  • 开发人员如何取消 foreach 循环? 如果是通过给枚举/枚举器提供 CancellationToken 来实现,则 a) 我们需要支持对枚举器进行 foreach 操作,从而将它们提升为优先事项,现在你需要开始考虑围绕枚举器建立一个生态系统(例如 LINQ 方法);或者 b) 我们需要将 CancellationToken 嵌入到枚举器中,方法是在 WithCancellationIAsyncEnumerable<T> 扩展方法中存储所提供的令牌,然后在调用返回结构上的 GetAsyncEnumerator 时将其传递到封装的枚举器的 GetAsyncEnumerator 中(忽略该令牌)。 或者,可以直接使用 foreach 主体中的 CancellationToken
  • 如果/当支持查询推导式时,如何将提供给 CancellationTokenGetEnumeratorMoveNextAsync 传递到每个子句中? 最简单的方法是让子句去捕获它,此时传递给 GetAsyncEnumerator/MoveNextAsync 的任何令牌都会被忽略。

本文件的早期版本建议采用 (1),但后来我们改用了 (4)。

(1) 的两个主要问题:

  • 可取消枚举的生成者必须实现一些样本,而且只能利用编译器对异步迭代器的支持来实现 IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken) 方法。
  • 很多生成者可能会倾向于在其异步可数签名中添加一个 CancellationToken 参数,这将导致使用者在获得 IAsyncEnumerable 类型时无法传递他们想要的取消令牌。

主要有两种消耗情况:

  1. await foreach (var i in GetData(token)) ...,使用者在其中调用异步迭代器方法,
  2. await foreach (var i in givenIAsyncEnumerable.WithCancellation(token)) ... 使用者在其中处理给定的 IAsyncEnumerable 实例。

我们发现,为了在方便异步流生成者和使用者的情况下合理支持这两种方案,最佳的折衷方案是在异步迭代器方法中使用特别标注的参数。 [EnumeratorCancellation] 属性用于此目的。 将此属性置于参数上会告诉编译器,如果向 GetAsyncEnumerator 方法传递了令牌,则应使用该令牌,而不是参数最初传递的值。

IAsyncEnumerable<int> GetData([EnumeratorCancellation] CancellationToken token = default) 为例。 该方法的实现者只需在方法主体中使用该参数即可。 使用者可以使用上述任何一种消耗模式:

  1. 如果使用 GetData(token),则令牌将保存到异步枚举中,并在迭代中使用,
  2. 如果使用 givenIAsyncEnumerable.WithCancellation(token),那么传递给 GetAsyncEnumerator 的令牌将取代保存在异步枚举中的任何令牌。

foreach

除了现有的对 IEnumerable<T> 的支持外,foreach 还将增加对 IAsyncEnumerable<T> 的支持。 如果相关成员被公开,它将支持 IAsyncEnumerable<T> 的等效模式;如果未公开,它将退回到直接使用接口的模式,以便实现基于结构的扩展,从而避免分配以及使用替代的 Awaitable 作为 MoveNextAsyncDisposeAsync 的返回类型。

语法

使用以下语法:

foreach (var i in enumerable)

C# 会继续将 enumerable 视为同步可枚举,这样,即使它公开了异步可枚举的相关 API(公开模式或实现接口),也只会考虑同步 API。

为了强制 foreach 只考虑异步 API,await 将按如下所示插入:

await foreach (var i in enumerable)

不提供支持使用异步或同步 API 的语法;开发人员必须根据所使用的语法进行选择。

语义

await foreach 语句的编译时处理首先会确定表达式的集合类型枚举器类型迭代类型(与 https://github.com/dotnet/csharpstandard/blob/draft-v8/standard/statements.md#1395-the-foreach-statement 非常相似)。 该确定过程如下:

  • 如果 expression 的类型 Xdynamic 或数组类型,则会产生错误,不会执行进一步的步骤。
  • 否则,请确定类型 X 是否具有适当的 GetAsyncEnumerator 方法:
    • 对标识符为 GetAsyncEnumerator 且没有类型参数的类型 X 执行成员查找。 如果成员查找未找到匹配项,或找到不明确的匹配项,或找到的匹配项不是方法组,请按如下所述检查可枚举接口。
    • 使用生成的方法组和空参数列表执行重载解析。 如果重载解析导致没有适用的方法、结果不明确或仅有单一最佳方法,但该方法为静态或非公开方法,请检查是否具有可枚举接口,如下所示。
    • 如果 GetAsyncEnumerator 方法的返回类型 E 不是类、结构或接口类型,则会生成错误,并且不会采取进一步的步骤。
    • 对标识符为 Current 且没有类型参数的 E 执行成员查找。 如果成员查找未找到匹配项,则结果为错误;或者结果是除允许读取的公共实例属性之外的任何内容,则会生成错误,并且不执行进一步的步骤。
    • 对标识符为 MoveNextAsync 且没有类型参数的 E 执行成员查找。 如果成员查找不生成匹配项,则结果为错误;或者结果为除方法组之外的任何内容,则会生成错误,并且不执行进一步的步骤。
    • 重载解析过程在具有空参数列表的方法组上执行。 如果重载解析的结果是没有适用的方法、产生歧义或产生了一个最佳方法,但该方法是静态的或不是公开的,或其返回类型不可等待到 bool 中,则会产生一个错误,且不会执行进一步的步骤。
    • 集合类型是 X,枚举器类型是 E,迭代类型是 Current 属性的类型。
  • 否则,检查可枚举的接口:
    • 如果在所有类型 Tᵢ 中有从 XIAsyncEnumerable<ᵢ> 的隐式转换,则存在唯一类型 T,使得 T 不是动态的,而对于所有其他 Tᵢ,有从 IAsyncEnumerable<T>IAsyncEnumerable<Tᵢ> 的隐式转换,那么集合类型是接口 IAsyncEnumerable<T>,枚举器类型是接口 IAsyncEnumerator<T>,迭代类型是 T
    • 否则,如果有多个此类类型 T,则会生成错误,并且不采取进一步的步骤。
  • 否则,将会产生错误,并不会采取进一步的步骤。

上述步骤如果成功,将明确产生集合类型 C、枚举器类型 E 和迭代类型 T

await foreach (V v in x) «embedded_statement»

然后扩展为:

{
    E e = ((C)(x)).GetAsyncEnumerator();
    try {
        while (await e.MoveNextAsync()) {
            V v = (V)(T)e.Current;
            «embedded_statement»
        }
    }
    finally {
        ... // Dispose e
    }
}

finally 块的主体是按照以下步骤构建的:

  • 如果 E 类型有一个适当的 DisposeAsync 方法:
    • 对标识符为 DisposeAsync 且没有类型参数的类型 E 执行成员查找。 如果成员查找未找到匹配项,或找到不明确的匹配项,或找到的匹配项不是方法组,请按如下所述检查处置接口。
    • 使用生成的方法组和空参数列表执行重载解析。 如果重载解析没有找到任何适用的方法、导致不明确的情况,或找到了一个单一的最佳方法,但该方法是静态的或非公开的,请检查下面所述的处置接口。
    • 如果 DisposeAsync 方法的返回类型不可等待,则会生成错误,并且不会执行进一步的步骤。
    • finally 子句扩展为语义等同的内容:
      finally {
          await e.DisposeAsync();
      }
    
  • 否则,如果存在从 ESystem.IAsyncDisposable 接口的隐式转换,则
    • 如果 E 是不可为 null 的值类型,那么 finally 子句将扩展为语义等同的内容:
      finally {
          await ((System.IAsyncDisposable)e).DisposeAsync();
      }
    
    • 否则,finally 子句扩展为语义等同的内容:
      finally {
          System.IAsyncDisposable d = e as System.IAsyncDisposable;
          if (d != null) await d.DisposeAsync();
      }
      
      但如果 E 是一个值类型,或者是一个实例化为值类型的类型参数,则 eSystem.IAsyncDisposable 的转换不应导致装箱。
  • 否则,finally 子句将被扩展为一个空块:
    finally {
    }
    

ConfigureAwait

这种基于模式的编译将允许通过 ConfigureAwait 扩展方法在所有等待中使用 ConfigureAwait

await foreach (T item in enumerable.ConfigureAwait(false))
{
   ...
}

这将基于我们将添加到 .NET 中的类型,可能是 System.Threading.Tasks.Extensions.dll 中的类型:

// Approximate implementation, omitting arg validation and the like
namespace System.Threading.Tasks
{
    public static class AsyncEnumerableExtensions
    {
        public static ConfiguredAsyncEnumerable<T> ConfigureAwait<T>(this IAsyncEnumerable<T> enumerable, bool continueOnCapturedContext) =>
            new ConfiguredAsyncEnumerable<T>(enumerable, continueOnCapturedContext);

        public struct ConfiguredAsyncEnumerable<T>
        {
            private readonly IAsyncEnumerable<T> _enumerable;
            private readonly bool _continueOnCapturedContext;

            internal ConfiguredAsyncEnumerable(IAsyncEnumerable<T> enumerable, bool continueOnCapturedContext)
            {
                _enumerable = enumerable;
                _continueOnCapturedContext = continueOnCapturedContext;
            }

            public ConfiguredAsyncEnumerator<T> GetAsyncEnumerator() =>
                new ConfiguredAsyncEnumerator<T>(_enumerable.GetAsyncEnumerator(), _continueOnCapturedContext);

            public struct ConfiguredAsyncEnumerator<T>
            {
                private readonly IAsyncEnumerator<T> _enumerator;
                private readonly bool _continueOnCapturedContext;

                internal ConfiguredAsyncEnumerator(IAsyncEnumerator<T> enumerator, bool continueOnCapturedContext)
                {
                    _enumerator = enumerator;
                    _continueOnCapturedContext = continueOnCapturedContext;
                }

                public ConfiguredValueTaskAwaitable<bool> MoveNextAsync() =>
                    _enumerator.MoveNextAsync().ConfigureAwait(_continueOnCapturedContext);

                public T Current => _enumerator.Current;

                public ConfiguredValueTaskAwaitable DisposeAsync() =>
                    _enumerator.DisposeAsync().ConfigureAwait(_continueOnCapturedContext);
            }
        }
    }
}

请注意,这种方法将无法使 ConfigureAwait 与基于模式的枚举器一起使用,但同样,ConfigureAwait 仅作为 Task/Task<T>/ValueTask/ValueTask<T> 的扩展而公开,无法应用于任意可等待的事物,因为它只有在应用于任务时才有意义(它控制在任务的延续支持中实现的行为),因此在使用可等待的事物可能不是任务的模式时没有意义。 在这种高级方案中,任何返回可等待对象的人都可以提供自己的自定义行为。

(如果我们能想出某种方法来支持范围或程序集级 ConfigureAwait 解决方案,则没有必要这样做。)

异步迭代器

语言/编译器除了使用 IAsyncEnumerable<T>IAsyncEnumerator<T> 之外,还将支持生成它们。 现在,该语言支持编写迭代器,如:

static IEnumerable<int> MyIterator()
{
    try
    {
        for (int i = 0; i < 100; i++)
        {
            Thread.Sleep(1000);
            yield return i;
        }
    }
    finally
    {
        Thread.Sleep(200);
        Console.WriteLine("finally");
    }
}

await 不能在这些迭代器的主体中使用。 我们将增加这种支持。

语法

现有语言对迭代器的支持是根据方法中是否包含 yield 来推断方法的迭代器性质。 异步迭代器也是如此。 此类异步迭代器将通过在签名中添加 async 来与同步迭代器进行划分和区分,并且还必须使用 IAsyncEnumerable<T>IAsyncEnumerator<T> 作为其返回类型。 例如,上述示例可以写成一个异步迭代器,如下所示:

static async IAsyncEnumerable<int> MyIterator()
{
    try
    {
        for (int i = 0; i < 100; i++)
        {
            await Task.Delay(1000);
            yield return i;
        }
    }
    finally
    {
        await Task.Delay(200);
        Console.WriteLine("finally");
    }
}

考虑的替代方法:

  • 在签名中不使用 async:使用 async 很可能是编译器的技术要求,因为编译器会使用它来确定 await 在该上下文中是否有效。 但是,即使不是必需的,我们也已经确定 await 只能在标记为 async 的方法中使用,因此保持一致性似乎很重要。
  • IAsyncEnumerable<T> 启用自定义生成器:这是我们将来可以考虑的,但这一机制很复杂,我们不支持同步对应程序。
  • iterator中包含 关键字:在签名中,异步迭代器将使用 async iteratoryield 只能用于包含 asynciterator 方法中,然后 iterator 在同步迭代器上将变为可选。 根据你的观点,这有一个好处,即通过方法的签名可以非常清楚地看出是否允许 yield,以及该方法是否实际上旨在返回 IAsyncEnumerable<T> 类型的实例,而不是由编译器根据代码是否使用 yield 来制造实例。 但它与同步迭代器不同,同步迭代器不需要也不可能需要这样的机制。 另外,一些开发人员不喜欢额外的语法。 如果我们从头开始设计,我们可能会将其设为必需,但在此时,更有价值的是使异步迭代器与同步迭代器保持相似。

LINQ

System.Linq.Enumerable 类上有超过 200 个方法重载,所有这些重载都基于 IEnumerable<T>运行;其中一些接受 IEnumerable<T>,一些产生 IEnumerable<T>,很多则同时具备这两种功能。 如果为 IAsyncEnumerable<T> 添加 LINQ 支持,很可能需要为其重复所有这些重载,又要增加约 200 个重载。 由于在异步世界中,IAsyncEnumerator<T> 作为独立实体可能比在同步世界中的 IEnumerator<T> 更常见,因此我们可能需要另外约 200 个重载来处理 IAsyncEnumerator<T>。 此外,大量的重载都是处理谓词的(例如,使用 WhereFunc<T, bool>),而基于 IAsyncEnumerable<T> 的重载既可以处理同步谓词,也可以处理异步谓词(例如,除了 Func<T, ValueTask<bool>> 之外,还有 Func<T, bool>),这可能是比较理想的。 虽然这并不适用于现在约 400 种新的重载方法,但粗略计算,它将适用于一半的重载方法,即另外约 200 种重载方法,总共约 600 种新方法。

这是一个数量惊人的 API,如果考虑到交互式扩展 (Ix) 等扩展库,可能会有更多的 API。 但 Ix 已经实现了其中的许多功能,似乎没有必要再重复这项工作;我们应该帮助社区改进 Ix,并在开发人员希望将 LINQ 与 IAsyncEnumerable<T> 结合使用时推荐 Ix。

此外,还有查询理解语法的问题。 例如,如果 Ix 提供了以下方法,那么查询理解的基于模式的特性就能让它们与某些操作符一起“正常工作”:

public static IAsyncEnumerable<TResult> Select<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, TResult> func);
public static IAsyncEnumerable<T> Where(this IAsyncEnumerable<T> source, Func<T, bool> func);

然后,这段 C# 代码将会“直接运行”:

IAsyncEnumerable<int> enumerable = ...;
IAsyncEnumerable<int> result = from item in enumerable
                               where item % 2 == 0
                               select item * 2;

但是,目前还没有支持在子句中使用 await 的查询理解语法,因此,例如添加了 Ix:

public static IAsyncEnumerable<TResult> Select<TSource, TResult>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TResult>> func);

那么这就“行得通”了:

IAsyncEnumerable<string> result = from url in urls
                                  where item % 2 == 0
                                  select SomeAsyncMethod(item);

async ValueTask<int> SomeAsyncMethod(int item)
{
    await Task.Yield();
    return item * 2;
}

但如果在 select 子句中使用 await 内联,则无法编写。 作为一项单独的工作,我们可以考虑在语言中添加 async { ... } 表达式,这样我们就可以允许在查询理解中使用这些表达式,而上述表达式也可以写成这样:

IAsyncEnumerable<int> result = from item in enumerable
                               where item % 2 == 0
                               select async
                               {
                                   await Task.Yield();
                                   return item * 2;
                               };

或使 await 可以直接在表达式中使用,例如支持 async from。 不过,这里的设计无论如何都不太可能对功能集的其他部分产生显著影响,并且这也不是一个目前值得特别投入的高价值项目,因此建议现在不进行额外操作。

与其他异步框架集成

IObservable<T> 和其他异步框架(如反应流)的集成将在库级别而非语言级别完成。 例如,只需通过 IAsyncEnumerator<T> 枚举器并 IObserver<T> 数据到观察者,就可以将 await foreach 中的所有数据发布到 OnNext 中,因此 AsObservable<T> 扩展方法是可能的。 在 IObservable<T> 中使用 await foreach 需要缓冲数据(以防在处理前一个项目时推送了另一个项目),但这样的推拉适配器可以很容易地实现,以便通过 IObservable<T>IAsyncEnumerator<T> 中提取数据。 等。Rx/Ix 已经提供了此类实现的原型,https://github.com/dotnet/corefx/tree/master/src/System.Threading.Channels 等库也提供了各种缓冲数据结构。 此阶段不需要涉及语言。