System.Threading.Channels 库

System.Threading.Channels 命名空间提供了用于在生成者和使用者之间以异步方式传递数据的一组同步数据结构。 该库面向 .NET Standard,适用于所有 .NET 实现。

可在 System.Threading.Channels NuGet 包中获取此库。 但是,如果使用的是 .NET Core 3.0 或更高版本,此包将作为框架的一部分提供。

生成者/使用者概念编程模型

通道是生成者/使用者概念编程模型的实现。 在此编程模型中,生成者异步生成数据,使用者异步使用该数据。 换句话说,此模型通过先进先出(“FIFO”)队列将数据从一方传递到另一方。 尝试将通道视为任何其他常见的泛型集合类型,例如 List<T>。 主要区别在于,此集合管理同步,并通过工厂创建选项提供各种消耗模型。 这些选项可控制通道的行为,例如允许它们存储多少元素、如果达到该限制会发生什么情况,或者通道是否同时由多个生成者或多个使用者访问。

边界策略

根据 Channel<T> 的创建方式,其读取器和编写器的行为会有所不同。

若要创建指定最大容量的通道,请调用 Channel.CreateBounded。 若要创建由任意数量的读取器和编写器并发使用的通道,请调用 Channel.CreateUnbounded。 每个边界策略都会公开各种创建者定义的选项,分别为 BoundedChannelOptionsUnboundedChannelOptions

注意

无论边界策略如何,通道在关闭后使用时始终会引发 ChannelClosedException

无界通道

若要创建无界通道,请调用 Channel.CreateUnbounded 重载之一:

var channel = Channel.CreateUnbounded<T>();

默认情况下,创建无界通道时,该通道可以同时供任意数量的读取器和编写器使用。 或者,在创建无界通道时,可通过提供 UnboundedChannelOptions 实例来指定非默认行为。 该通道的容量不受限制,并且所有写入均以同步方式执行。 如需更多示例,请参阅无界创建模式

有界通道

若要创建有界通道,请调用 Channel.CreateBounded 重载之一:

var channel = Channel.CreateBounded<T>(7);

前面的代码创建了一个最大容量为 7 个项的通道。 创建有界通道时,该通道将绑定到最大容量。 达到边界时,默认行为是通道异步阻止生成者,直到空间变得可用。 可以通过在创建通道时指定选项来配置此行为。 可以使用任何大于零的容量值创建有界通道。 有关其他示例,请参阅有界创建模式

完整模式行为

使用有界通道时,可指定在达到配置的边界时通道会遵循的行为。 下表列出了每个 BoundedChannelFullMode 值的完整模式行为:

行为
BoundedChannelFullMode.Wait 这是默认值。 对 WriteAsync 的调用要等到空间可用才能完成写入操作。 调用 TryWrite 会立即返回 false
BoundedChannelFullMode.DropNewest 删除并忽略通道中的最新项,以便为要写入的项留出空间。
BoundedChannelFullMode.DropOldest 删除并忽略通道中的最旧项,以便为要写入的项留出空间。
BoundedChannelFullMode.DropWrite 删除要写入的项。

重要

每当 Channel<TWrite,TRead>.Writer 的生成速度快于 Channel<TWrite,TRead>.Reader 的使用速度时,通道的编写器都会遇到反压力。

生成者 API

生成者功能在 Channel<TWrite,TRead>.Writer 上公开。 下表详细介绍了生成者 API 和预期行为:

API 预期行为
ChannelWriter<T>.Complete 将通道标记为“正在完成”,表示不再将项写入到该通道。
ChannelWriter<T>.TryComplete 尝试将通道标记为“正在完成”,表示不再将项写入到该通道。
ChannelWriter<T>.TryWrite 尝试将指定的项写入到通道。 当与无界通道一起使用时,除非通道的编写器通过 ChannelWriter<T>.CompleteChannelWriter<T>.TryComplete 发出完成信号,否则这将始终返回 true
ChannelWriter<T>.WaitToWriteAsync 返回在有可用空间来写入项时完成的 ValueTask<TResult>
ChannelWriter<T>.WriteAsync 以异步方式将项写入到通道。

使用者 API

使用者功能在 Channel<TWrite,TRead>.Reader 上公开。 下表详细介绍了使用者 API 和预期行为:

API 预期行为
ChannelReader<T>.ReadAllAsync 创建允许从通道中读取所有数据的 IAsyncEnumerable<T>
ChannelReader<T>.ReadAsync 以异步方式从通道中读取项。
ChannelReader<T>.TryPeek 尝试从通道中查看项。
ChannelReader<T>.TryRead 尝试从通道中读取项。
ChannelReader<T>.WaitToReadAsync 返回在有数据可读取时完成的 ValueTask<TResult>

常见使用模式

通道有多种使用模式。 API 设计为简单、一致且尽可能灵活。 所有异步方法都返回一个表示轻量级异步操作的 ValueTask(或 ValueTask<bool>),如果操作同步完成,甚至可能异步完成,则可以避免分配。 此外,API 被设计为可组合,因为通道的创建者承诺了其预期用途。 当使用某些参数创建通道时,内部实现可以在知道这些承诺的情况下更高效地运行。

创建模式

假设你正在为全球定位系统 (GPS) 创建生成者/使用者解决方案。 你想要跟踪设备随时间推移的坐标。 示例坐标对象可能如下所示:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

无界创建模式

一种常见的使用模式是创建默认的无界通道:

var channel = Channel.CreateUnbounded<Coordinates>();

但是,假设你想要创建一个包含多个生成者和使用者的无界通道:

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

在此情况下,所有写入都是同步的,即使是 WriteAsync 也是如此。 这是因为无界通道始终具有可用空间,可供立即进行有效地写入。 但是,在将 AllowSynchronousContinuations 设置为 true 时,写入操作可能会通过执行其延续来终止执行与读取器相关的工作。 这不会影响操作的同步性。

有界创建模式

对于有界通道,使用者应了解通道的可配置性,以确保正确使用。 也就是说,使用者应了解在达到配置的边界时,通道会表现出什么行为。 让我们来探索一些常见的有界创建模式。

创建有界通道的最简单方法是指定容量:

var channel = Channel.CreateBounded<Coordinates>(1);

前面的代码创建了最大容量为 1 的有界通道。 有其他选项可用,一些选项与无界通道相同,而另一些选项则特定于无界通道:

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

在前面的代码中,通道创建为限制为 1,000 个项的有界通道,包含单个编写器和多个读取器。 其完整模式行为被定义为 DropWrite,这意味着如果通道已满,它会删除正在写入的项。

使用有界通道时,若要观察删除的项,请注册 itemDropped 回调:

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

每当通道已满且添加了新项时,都会调用 itemDropped 回调。 在此示例中,提供的回调将项写入控制台,但你可以自由执行所需的任何其他操作。

生成者模式

假设此场景中的生成者正在向通道写入新坐标。 生成者可以通过调用 TryWrite 达到此目的:

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

前面的生成者代码:

  • 接受 Channel<Coordinates>.Writer (ChannelWriter<Coordinates>) 作为参数,以及初始 Coordinates
  • 定义一个条件 while 循环,该循环尝试使用 TryWrite 移动坐标。

替代生成者可能使用 WriteAsync 方法:

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

同样,在 while 循环中使用了 Channel<Coordinates>.Writer。 但这次调用了 WriteAsync 方法。 只有在写入坐标后,该方法才会继续。 当 while 循环退出时,会调用 Complete,这表示不再向通道写入数据。

另一种生成者模式是使用 WaitToWriteAsync 方法,请考虑以下代码:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

作为条件 while 的一部分,WaitToWriteAsync 调用的结果用于确定是否继续循环。

使用者模式

有几种常见的通道使用者模式。 当通道永不结束,意味着它将无限地生成数据时,使用者可使用 while (true) 循环,并在数据可用时读取数据:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

注意

如果通道关闭,此代码将引发异常。

替代使用者可以通过使用嵌套的 while 循环来避免这种问题,如以下代码所示:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

在前面的代码中,使用者等待读取数据。 数据可用后,使用者便会尝试读取数据。 这些循环会继续评估,直到通道的生成者表示它不再具有要读取的数据。 也就是说,当已知生成者具有生成的有限数量的项,并且这表示完成时,使用者可使用 await foreach 语义循环访问这些项:

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

前面的代码使用 ReadAllAsync 方法从通道读取所有坐标。

另请参阅