Biblioteca System.Threading.Channels
O namespace System.Threading.Channels fornece um conjunto de estruturas de dados de sincronização para transmitir dados entre produtores e consumidores de forma assíncrona. A biblioteca tem como destino o .NET Standard e funciona em todas as implementações do .NET.
Essa biblioteca está disponível no pacote NuGet System.Threading.Channels. No entanto, se você estiver usando o .NET Core 3.0 ou posterior, o pacote será incluído como parte da estrutura.
Modelo de programação conceitual de produtor/consumidor
Os canais são uma implementação do modelo de programação conceitual de produtor/consumidor. Nesse modelo de programação, os produtores produzem dados de forma assíncrona e os consumidores consomem esses dados de forma assíncrona. Em outras palavras, esse modelo passa dados de uma parte para outra por meio de uma fila FIFO (primeiro a entrar, primeiro a sair). Tente pensar nos canais como faria com qualquer outro tipo de coleção genérica comum, como um List<T>
. A principal diferença é que essa coleção gerencia a sincronização e fornece vários modelos de consumo por meio de opções de criação de fábrica. Essas opções controlam o comportamento dos canais, como quantos elementos eles têm permissão para armazenar e o que acontece se esse limite for atingido ou se o canal é acessado por vários produtores ou vários consumidores simultaneamente.
Estratégias delimitadoras
Dependendo de como um Channel<T>
é criado, seu leitor e gravador se comportam de maneira diferente.
Para criar um canal que especifica uma capacidade máxima, chame Channel.CreateBounded. Para criar um canal usado por qualquer número de leitores e gravadores simultaneamente, chame Channel.CreateUnbounded. Cada estratégia delimitadora expõe várias opções definidas pelo criador, seja BoundedChannelOptions, seja UnboundedChannelOptions, respectivamente.
Observação
Independentemente da estratégia delimitadora, um canal sempre gerará uma ChannelClosedException quando for usado depois de ser fechado.
Canais não associados
Para criar um canal não associado, chame uma das sobrecargas Channel.CreateUnbounded:
var channel = Channel.CreateUnbounded<T>();
Quando você cria um canal não associado, por padrão, o canal pode ser usado por qualquer número de leitores e gravadores simultaneamente. Como alternativa, você pode especificar um comportamento não padrão ao criar um canal não associado. Para isso, forneça uma instância UnboundedChannelOptions
. A capacidade do canal é não associada e todas as gravações são executadas de forma síncrona. Para obter mais exemplos, confira Padrões de criação não associados.
Canais limitados
Para criar um canal limitado, chame uma das sobrecargas Channel.CreateBounded:
var channel = Channel.CreateBounded<T>(7);
O código anterior cria um canal que tem uma capacidade máxima de 7
itens. Quando você cria um canal limitado, o canal é associado a uma capacidade máxima. Quando o limite é atingido, o comportamento padrão é o canal bloquear de maneira assíncrona o produtor até que o espaço fique disponível. Você pode configurar esse comportamento especificando uma opção ao criar o canal. Os canais limitados podem ser criados com qualquer valor de capacidade maior que zero. Para outros exemplos, confira Padrões de criação limitados.
Comportamento do modo completo
Ao usar um canal limitado, você pode especificar o comportamento que o canal segue quando o limite configurado é atingido. A tabela a seguir lista os comportamentos de modo completo para cada valor BoundedChannelFullMode:
Valor | Comportamento |
---|---|
BoundedChannelFullMode.Wait | Esse é o valor padrão. Solicita que WriteAsync aguarde até que espaço esteja disponível para concluir a operação de gravação. Chamadas para TryWrite retornam false imediatamente. |
BoundedChannelFullMode.DropNewest | Remove e ignora o item mais recente no canal para liberar espaço para o item que está sendo gravado. |
BoundedChannelFullMode.DropOldest | Remove e ignora o item mais antigo no canal para liberar espaço para o item que está sendo gravado. |
BoundedChannelFullMode.DropWrite | Remove o item que está sendo gravado. |
Importante
Sempre que um Channel<TWrite,TRead>.Writer produz mais rápido do que um Channel<TWrite,TRead>.Reader consegue consumir, o gravador do canal enfrenta pressão de volta.
APIs de produtor
A funcionalidade de produtor é exposta no Channel<TWrite,TRead>.Writer. As APIs de produtor e o comportamento esperado são detalhados na tabela a seguir:
API | Comportamento esperado |
---|---|
ChannelWriter<T>.Complete | Marca o canal como concluído, ou seja, nenhum outro item é gravado nele. |
ChannelWriter<T>.TryComplete | Tenta marcar o canal como concluído, ou seja, mais nenhum outro dado é gravado nele. |
ChannelWriter<T>.TryWrite | Tenta gravar o item especificado no canal. Quando usado com um canal não associado, isso sempre retorna true , a menos que o gravador do canal sinalize a conclusão com ChannelWriter<T>.Complete ou ChannelWriter<T>.TryComplete. |
ChannelWriter<T>.WaitToWriteAsync | Retorna uma ValueTask<TResult> que é concluída quando houver espaço disponível para gravar um item. |
ChannelWriter<T>.WriteAsync | Grava assincronamente um item para um canal. |
APIs de consumidor
A funcionalidade de consumidor é exposta no Channel<TWrite,TRead>.Reader. As APIs de consumidor e o comportamento esperado são detalhados na tabela a seguir:
API | Comportamento esperado |
---|---|
ChannelReader<T>.ReadAllAsync | Cria um IAsyncEnumerable<T> que habilita a leitura de todos os dados do canal. |
ChannelReader<T>.ReadAsync | Lê assincronamente um item de um canal. |
ChannelReader<T>.TryPeek | Tenta espiar um item do canal. |
ChannelReader<T>.TryRead | Tenta ler um item do canal. |
ChannelReader<T>.WaitToReadAsync | Retorna uma ValueTask<TResult> que é concluída quando os dados estiverem disponíveis para leitura. |
Padrões comuns de uso
Há vários padrões de uso para os canais. A API foi projetada para ser simples, consistente e o mais flexível possível. Todos os métodos assíncronos retornam um ValueTask
(ou ValueTask<bool>
) que representa uma operação assíncrona leve que pode evitar alocação se a operação for concluída de forma síncrona e potencialmente até assíncrona. Além disso, a API foi projetada para ser combinável, na qual o criador de um canal faz promessas sobre seu uso pretendido. Quando um canal é criado com determinados parâmetros, a implementação interna pode operar com mais eficiência sabendo dessas promessas.
Padrões de criação
Imagine que você está criando uma solução de produtor/consumidor para um GPS (sistema de posição global). Você deseja acompanhar as coordenadas de um dispositivo ao longo do tempo. Um exemplo de objeto de coordenadas pode ser assim:
/// <summary>
/// A representation of a device's coordinates,
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
Guid DeviceId,
double Latitude,
double Longitude);
Padrões de criação não associados
Um padrão de uso comum é criar um canal padrão não associado:
var channel = Channel.CreateUnbounded<Coordinates>();
Mas, em vez disso, vamos imaginar que você deseja criar um canal não associado com vários produtores e consumidores:
var channel = Channel.CreateUnbounded<Coordinates>(
new UnboundedChannelOptions
{
SingleWriter = false,
SingleReader = false,
AllowSynchronousContinuations = true
});
Nesse caso, todas as gravações são síncronas, até mesmo o WriteAsync
. Isso ocorre porque um canal não associado sempre tem espaço disponível para uma gravação com eficiência imediata. No entanto, com AllowSynchronousContinuations
definido como true
, as gravações podem acabar fazendo um trabalho associado a um leitor ao executar suas continuações. Isso não afeta a sincronização da operação.
Padrões de criação limitada
Com canais limitados, a configurabilidade do canal deve ser conhecida pelo consumidor para ajudar a garantir o consumo adequado. Ou seja, o consumidor deve saber qual comportamento o canal exibe quando o limite configurado é atingido. Vamos explorar alguns dos padrões comuns de criação limitada.
A maneira mais simples de criar um canal limitado é especificar uma capacidade:
var channel = Channel.CreateBounded<Coordinates>(1);
O código anterior cria um canal limitado com uma capacidade máxima de 1
. Outras opções estão disponíveis, e algumas opções são as mesmas que as de um canal não associado, enquanto outras são específicas a canais não associados:
var channel = Channel.CreateBounded<Coordinates>(
new BoundedChannelOptions(1_000)
{
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.DropWrite
});
No código anterior, o canal é criado como um canal limitado a 1.000 itens, com um único gravador, mas muitos leitores. Seu comportamento de modo completo é definido como DropWrite
, o que significa que ele remove o item que está sendo gravado se o canal está cheio.
Para observar os itens removidos ao usar canais limitados, registre um retorno de chamada itemDropped
:
var channel = Channel.CreateBounded(
new BoundedChannelOptions(10)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest
},
static void (Coordinates dropped) =>
Console.WriteLine($"Coordinates dropped: {dropped}"));
Sempre que o canal está cheio e um novo item é adicionado, o retorno de chamada itemDropped
é invocado. Neste exemplo, o retorno de chamada fornecido grava o item no console, mas você está livre para executar qualquer outra ação desejada.
Padrões de produtor
Imagine que o produtor nesse cenário está escrevendo novas coordenadas para o canal. O produtor pode fazer isso chamando TryWrite:
static void ProduceWithWhileAndTryWrite(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
}
}
O código de produtor anterior:
- Aceita o
Channel<Coordinates>.Writer
(ChannelWriter<Coordinates>
) como um argumento, juntamente com oCoordinates
inicial. - Define um loop
while
condicional que tenta mover as coordenadas usandoTryWrite
.
Um produtor alternativo pode usar o método WriteAsync
:
static async ValueTask ProduceWithWhileWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
await writer.WriteAsync(
item: coordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
});
}
writer.Complete();
}
Novamente, o Channel<Coordinates>.Writer
é usado dentro de um loop while
. Mas, desta vez, o método WriteAsync é chamado. O método continuará somente depois que as coordenadas tiverem sido gravadas. Quando o loop while
é encerrado, uma chamada para Complete é feita, o que sinaliza que não são gravados mais dados no canal.
Outro padrão de produtor é usar o método WaitToWriteAsync, considere o seguinte código:
static async ValueTask ProduceWithWaitToWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
await writer.WaitToWriteAsync())
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
await Task.Delay(TimeSpan.FromMilliseconds(10));
}
writer.Complete();
}
Como parte do while
condicional, o resultado da chamada WaitToWriteAsync
é usado para determinar se o loop deve continuar.
Padrões de consumidor
Há vários padrões comuns de consumidor de canal. Quando um canal nunca termina, ou seja, produz dados infinitamente, o consumidor poderá usar um loop while (true)
e ler dados à medida que estiverem disponíveis:
static async ValueTask ConsumeWithWhileAsync(
ChannelReader<Coordinates> reader)
{
while (true)
{
// May throw ChannelClosedException if
// the parent channel's writer signals complete.
Coordinates coordinates = await reader.ReadAsync();
Console.WriteLine(coordinates);
}
}
Observação
Esse código gerará uma exceção se o canal for fechado.
Um consumidor alternativo pode evitar essa preocupação usando um loop aninhado, conforme mostrado no código a seguir:
static async ValueTask ConsumeWithNestedWhileAsync(
ChannelReader<Coordinates> reader)
{
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out Coordinates coordinates))
{
Console.WriteLine(coordinates);
}
}
}
No código anterior, o consumidor aguarda para ler dados. Quando os dados estiverem disponíveis, o consumidor tentará lê-los. Esses loops continuam a ser avaliados até que o produtor do canal sinalize que não há mais dados para ler. Dito isso, quando um produtor é conhecido por ter um número finito de itens que produz e sinaliza a conclusão, o consumidor pode usar a semântica await foreach
para iterar sobre os itens:
static async ValueTask ConsumeWithAwaitForeachAsync(
ChannelReader<Coordinates> reader)
{
await foreach (Coordinates coordinates in reader.ReadAllAsync())
{
Console.WriteLine(coordinates);
}
}
O código anterior usa o método ReadAllAsync para ler todas as coordenadas do canal.