Partilhar via


Tutorial: Gerar e consumir fluxos assíncronos usando C# e .NET

Os fluxos assíncronos modelam uma fonte de transmissão de dados. Os fluxos de dados geralmente recuperam ou geram elementos de forma assíncrona. Eles fornecem um modelo de programação natural para fontes de dados de streaming assíncronas.

Neste tutorial, irá aprender a:

  • Crie uma fonte de dados que gere uma sequência de elementos de dados de forma assíncrona.
  • Consuma essa fonte de dados de forma assíncrona.
  • Suporte a cancelamento e contextos capturados para fluxos assíncronos.
  • Reconhecer quando a nova interface e a fonte de dados são preferidas às sequências de dados síncronas anteriores.

Pré-requisitos

Você precisará configurar sua máquina para executar o .NET, incluindo o compilador C#. O compilador C# está disponível com o Visual Studio 2022 ou o SDK do .NET.

Você precisará criar um token de acesso do GitHub para poder acessar o ponto de extremidade do GitHub GraphQL. Selecione as seguintes permissões para seu token de acesso do GitHub:

  • repo:status
  • public_repo

Salve o token de acesso em um local seguro para que você possa usá-lo para obter acesso ao ponto de extremidade da API do GitHub.

Aviso

Mantenha o seu token de acesso pessoal seguro. Qualquer software com seu token de acesso pessoal pode fazer chamadas de API do GitHub usando seus direitos de acesso.

Este tutorial pressupõe que você esteja familiarizado com C# e .NET, incluindo o Visual Studio ou a CLI do .NET.

Execute o aplicativo inicial

Você pode obter o código para o aplicativo inicial usado neste tutorial no repositório dotnet/docs na pasta programação assíncrona /trechos .

O aplicativo inicial é um aplicativo de console que usa a interface GitHub GraphQL para recuperar problemas recentes escritos no repositório dotnet/docs . Comece examinando o seguinte código para o método de aplicativo Main inicial:

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Você pode definir uma variável de GitHubKey ambiente para seu token de acesso pessoal ou pode substituir o último argumento na chamada para GetEnvVariable pelo seu token de acesso pessoal. Não coloque seu código de acesso no código-fonte se você estiver compartilhando o código-fonte com outras pessoas. Nunca carregue códigos de acesso para um repositório de código-fonte compartilhado.

Depois de criar o cliente GitHub, o código em Main cria um objeto de relatório de progresso e um token de cancelamento. Depois que esses objetos são criados, Main chama RunPagedQueryAsync para recuperar os 250 problemas criados mais recentes. Após a conclusão dessa tarefa, os resultados são exibidos.

Quando você executa o aplicativo inicial, você pode fazer algumas observações importantes sobre como esse aplicativo é executado. Você verá o progresso relatado para cada página retornada do GitHub. Você pode observar uma pausa percetível antes que o GitHub retorne cada nova página de problemas. Finalmente, os problemas são exibidos somente depois que todas as 10 páginas foram recuperadas do GitHub.

Examinar a implementação

A implementação revela por que você observou o comportamento discutido na seção anterior. Examine o código para RunPagedQueryAsync:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

A primeira coisa que esse método faz é criar o objeto POST, usando a GraphQLRequest classe:

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

que ajuda a formar o corpo do objeto POST e convertê-lo corretamente em JSON apresentado como uma única cadeia de caracteres com o ToJsonText método, que remove todos os caracteres de nova linha do corpo da solicitação, marcando-os com o \ caractere de escape (barra invertida).

Vamos nos concentrar no algoritmo de paginação e na estrutura assíncrona do código anterior. (Pode consultar o Documentação do GitHub GraphQL para obter detalhes sobre a API do GitHub GraphQL.) O RunPagedQueryAsync método enumera os problemas do mais recente para o mais antigo. Solicita 25 edições por página e examina a pageInfo estrutura da resposta para continuar com a página anterior. Isso segue o suporte de paginação padrão do GraphQL para respostas de várias páginas. A resposta inclui um pageInfo objeto que inclui um hasPreviousPages valor e um startCursor valor usado para solicitar a página anterior. Os problemas estão na nodes matriz. O RunPagedQueryAsync método acrescenta esses nós a uma matriz que contém todos os resultados de todas as páginas.

Depois de recuperar e restaurar uma página de resultados, RunPagedQueryAsync relata o progresso e verifica o cancelamento. Se o cancelamento tiver sido solicitado, RunPagedQueryAsync lança um OperationCanceledExceptionarquivo .

Há vários elementos neste código que podem ser melhorados. Mais importante, RunPagedQueryAsync deve alocar armazenamento para todos os problemas retornados. Este exemplo para em 250 problemas porque recuperar todos os problemas abertos exigiria muito mais memória para armazenar todos os problemas recuperados. Os protocolos de apoio aos relatórios de progresso e cancelamento tornam o algoritmo mais difícil de entender em sua primeira leitura. Mais tipos e APIs estão envolvidos. Você deve rastrear as comunicações através do e seus associados CancellationToken para entender onde o CancellationTokenSource cancelamento é solicitado e onde é concedido.

Fluxos assíncronos fornecem uma maneira melhor

Os fluxos assíncronos e o suporte a idiomas associados resolvem todas essas preocupações. O código que gera a sequência agora pode ser usado yield return para retornar elementos em um método que foi declarado com o async modificador. Você pode consumir um fluxo assíncrono usando um await foreach loop da mesma forma que consome qualquer sequência usando um foreach loop.

Esses novos recursos de linguagem dependem de três novas interfaces adicionadas ao .NET Standard 2.1 e implementadas no .NET Core 3.0:

Essas três interfaces devem ser familiares para a maioria dos desenvolvedores de C#. Comportam-se de forma semelhante aos seus homólogos síncronos:

Um tipo que pode ser desconhecido é System.Threading.Tasks.ValueTask. O ValueTask struct fornece uma API semelhante à System.Threading.Tasks.Task classe. ValueTask é usado nessas interfaces por motivos de desempenho.

Converter em fluxos assíncronos

Em seguida, converta o método para gerar um fluxo assíncrono RunPagedQueryAsync . Primeiro, altere a assinatura de para retornar um IAsyncEnumerable<JToken>, e remova o token de cancelamento e os objetos progress RunPagedQueryAsync da lista de parâmetros, conforme mostrado no código a seguir:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

O código inicial processa cada página à medida que a página é recuperada, conforme mostrado no código a seguir:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Substitua essas três linhas pelo seguinte código:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

Você também pode remover a declaração de finalResults anteriormente neste método e a return instrução que segue o loop que você modificou.

Você concluiu as alterações para gerar um fluxo assíncrono. O método finished deve ser semelhante ao seguinte código:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Em seguida, altere o código que consome a coleção para consumir o fluxo assíncrono. Encontre o seguinte código que Main processa a coleção de problemas:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Substitua esse código pelo seguinte await foreach loop:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

A nova interface IAsyncEnumerator<T> deriva de IAsyncDisposable. Isso significa que o loop anterior descartará o fluxo de forma assíncrona quando o loop terminar. Você pode imaginar que o loop se parece com o seguinte código:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Por padrão, os elementos de fluxo são processados no contexto capturado. Se você quiser desativar a captura do contexto, use o TaskAsyncEnumerableExtensions.ConfigureAwait método de extensão. Para obter mais informações sobre contextos de sincronização e captura do contexto atual, consulte o artigo sobre o consumo do padrão assíncrono baseado em tarefas.

Os fluxos assíncronos suportam cancelamento usando o mesmo protocolo de outros async métodos. Você modificaria a assinatura para o método iterador assíncrono da seguinte maneira para oferecer suporte ao cancelamento:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

O System.Runtime.CompilerServices.EnumeratorCancellationAttribute atributo faz com que o compilador gere código para o IAsyncEnumerator<T> que torna o token passado visível GetAsyncEnumerator para o corpo do iterador assíncrono como esse argumento. Dentro runQueryAsyncdo , você pode examinar o estado do token e cancelar o trabalho adicional, se solicitado.

Você usa outro método de extensão, WithCancellation, para passar o token de cancelamento para o fluxo assíncrono. Você modificaria o loop enumerando os problemas da seguinte maneira:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Você pode obter o código para o tutorial concluído no repositório dotnet/docs na pasta programação assíncrona /trechos .

Executar o aplicativo concluído

Execute novamente a aplicação. Compare seu comportamento com o comportamento do aplicativo inicial. A primeira página de resultados é enumerada assim que está disponível. Há uma pausa observável à medida que cada nova página é solicitada e recuperada, em seguida, os resultados da próxima página são rapidamente enumerados. O try / catch bloco não é necessário para lidar com o cancelamento: o chamador pode parar de enumerar a coleção. O progresso é claramente relatado porque o fluxo assíncrono gera resultados à medida que cada página é baixada. O status de cada problema retornado é incluído perfeitamente no await foreach loop. Você não precisa de um objeto de retorno de chamada para acompanhar o progresso.

Você pode ver melhorias no uso da memória examinando o código. Não é mais necessário alocar uma coleção para armazenar todos os resultados antes que eles sejam enumerados. O chamador pode determinar como consumir os resultados e se uma coleção de armazenamento é necessária.

Execute os aplicativos iniciais e concluídos e você pode observar as diferenças entre as implementações por si mesmo. Você pode excluir o token de acesso do GitHub criado quando iniciou este tutorial depois de terminar. Se um invasor obtiver acesso a esse token, ele poderá acessar as APIs do GitHub usando suas credenciais.

Neste tutorial, você usou fluxos assíncronos para ler itens individuais de uma API de rede que retorna páginas de dados. Os fluxos assíncronos também podem ser lidos a partir de "fluxos intermináveis", como um ticker de ações ou um dispositivo sensor. A chamada para MoveNextAsync retornar o próximo item assim que estiver disponível.