Tutorial: gerar e consumir fluxos assíncronos usando o C# e .NET

Os fluxos assíncronos modelam uma fonte de streaming de dados. Os fluxos de dados geralmente recuperam ou geram elementos de forma assíncrona. Eles fornecem um modelo de programação natural para fontes de dados de streaming assíncronas.

Neste tutorial, você aprenderá como:

  • Criar uma fonte de dados que gera uma sequência de elementos de dados de forma assíncrona.
  • Consumir essa fonte de dados de forma assíncrona.
  • Suporte ao cancelamento e contextos capturados para fluxos assíncronos.
  • Reconhecer quando a nova interface e a fonte de dados forem preferenciais para sequências anteriores de dados síncronos.

Pré-requisitos

Você precisa configurar o computador para executar o .NET, incluindo o compilador C#. O compilador C# está disponível com o Visual Studio 2022 ou o SDK do .NET.

Você precisará criar um token de acesso do GitHub para poder acessar o ponto de extremidade GitHub GraphQL. Selecione as seguintes permissões para o Token de acesso do GitHub:

  • repo:status
  • public_repo

Salve o token de acesso em um local seguro, de modo que possa usá-lo para acessar o ponto de extremidade da API do GitHub.

Aviso

Mantenha seu token de acesso pessoal protegido. Qualquer software com seu token de acesso pessoal pode fazer chamadas da API do GitHub usando seus direitos de acesso.

Este tutorial pressupõe que você esteja familiarizado com o C# e .NET, incluindo o Visual Studio ou a CLI do .NET.

Executar o aplicativo inicial

Você pode obter o código para o aplicativo inicial usado neste tutorial em nosso repositório dotnet/docs na pasta asynchronous-programming/snippets.

O aplicativo inicial é um aplicativo de console que usa a interface GitHub GraphQL para recuperar os problemas recentes gravados no repositório dotnet/docs. Comece observando o código a seguir para o método Main do aplicativo inicial:

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Você pode definir uma variável de ambiente GitHubKey para o token de acesso pessoal ou pode substituir o último argumento na chamada para GetEnvVariable com seu token de acesso pessoal. Não coloque seu código de acesso no código-fonte se você estiver compartilhando a origem com outras pessoas. Nunca carregue códigos de acesso em um repositório de origem compartilhado.

Após criar o cliente do GitHub, o código em Main criará um objeto de relatório de andamento e um token de cancelamento. Depois que esses objetos forem criados, Main chamará RunPagedQueryAsync para recuperar os 250 problemas mais recente criados. Depois que a tarefa for concluída, os resultados serão exibidos.

Ao executar o aplicativo inicial, você poderá realizar algumas observações importantes sobre como esse aplicativo é executado. Você verá o progresso informado para cada página retornada do GitHub. É possível observar uma pausa perceptível antes do GitHub retornar cada nova página de problemas. Por fim, os problemas só serão exibidos depois que todas as 10 páginas forem recuperadas do GitHub.

Examinar a implementação

A implementação revela por que você observou o comportamento discutido na seção anterior. Examine o código para RunPagedQueryAsync:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

A primeira coisa que este método faz é criar o objeto POST usando a classe GraphQLRequest:

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

que ajuda a formar o corpo do objeto POST e a convertê-lo corretamente em JSON apresentado como uma cadeia de caracteres única com o método ToJsonText, que remove todos os caracteres de nova linha do corpo da solicitação marcando-os com o caractere de escape (barra invertida) \.

Vamos nos concentrar no algoritmo de paginação e na estrutura assíncrona do código anterior. (Você pode consultar a documentação do GraphQL do GitHub para obter detalhes sobre a API do GraphQL do GitHub). O método RunPagedQueryAsync enumera os problemas dos mais recentes aos mais antigos. Ele solicita 25 problemas por página e examina a estrutura pageInfo da resposta para continuar com a página anterior. Isso segue o suporte de paginação padrão do GraphQL para respostas com várias páginas. A resposta inclui um objeto pageInfo que inclui um valor hasPreviousPages e um valor startCursor usados para solicitar a página anterior. Os problemas estão na matriz nodes. O método RunPagedQueryAsync anexa esses nós em uma matriz que contém todos os resultados de todas as páginas.

Após a recuperação e a restauração de uma página de resultados, RunPagedQueryAsync informa o andamento e verifica o cancelamento. Se o cancelamento tiver sido solicitado, RunPagedQueryAsync gerará um OperationCanceledException.

Há vários elementos nesse código que podem ser melhorados. Acima de tudo, RunPagedQueryAsync deve alocar armazenamento para todos os problemas retornados. Este exemplo é interrompido em 250 problemas porque recuperar todos os problemas exigiria muito mais memória para armazenar todos os problemas recuperados. Os protocolos para dar suporte a relatórios de progresso e cancelamento dificultam o entendimento do algoritmo em sua primeira leitura. Mais tipos e APIs estão envolvidos. Você também tem que rastrear as comunicações por meio de CancellationTokenSource e seu CancellationToken associado para entender onde o cancelamento foi solicitado e onde ele foi concedido.

Os fluxos assíncronos fornecem uma melhor maneira

Os fluxos assíncronos e o suporte de linguagem associado lidam com todas essas preocupações. O código que gera a sequência agora pode usar yield return para retornar os elementos em um método que foi declarado com o modificador async. É possível consumir um fluxo assíncrono usando um loop await foreach da mesma forma que é possível consumir qualquer sequência usando um loop foreach.

Esses novos recursos de linguagem dependem das três novas interfaces adicionadas ao .NET Standard 2.1 e implementadas no .NET Core 3.0:

Essas três interfaces devem ser familiares à maioria dos desenvolvedores C#. Elas se comportam de maneira semelhante às suas contrapartes síncronas:

Um tipo que pode não ser familiar é System.Threading.Tasks.ValueTask. A estrutura ValueTask fornece uma API semelhante para a classe System.Threading.Tasks.Task. ValueTask é usado nas interfaces por motivos de desempenho.

Converter para fluxos assíncronos

Em seguida, converta o método RunPagedQueryAsync para gerar um fluxo assíncrono. Primeiro, altere a assinatura de RunPagedQueryAsync para retornar um IAsyncEnumerable<JToken> e remova os objetos de progresso e o token de cancelamento da lista de parâmetros, conforme mostrado no código a seguir:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

O código inicial processa cada página à medida que a página é recuperada, como mostrado no código a seguir:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Substitua essas três linhas pelo seguinte código:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

Você também pode remover a declaração de finalResults anteriormente nesse método e a instrução return que segue o loop que você modificou.

Você terminou as alterações para gerar um fluxo assíncrono. O método concluído deve ser semelhante ao seguinte código:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Em seguida, você pode alterar o código que consome a coleção para consumir o fluxo assíncrono. Localize o seguinte código em Main que processa a coleção de problemas:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Substitua o código pelo seguinte loop await foreach:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

A nova interface IAsyncEnumerator<T> deriva de IAsyncDisposable. Isso significa que o loop anterior descartará o fluxo de forma assíncrona quando o loop terminar. Você pode imaginar que o loop se parece com o seguinte código:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Por padrão, os elementos de fluxo são processados no contexto capturado. Se você quiser desabilitar a captura do contexto, use o método de extensão TaskAsyncEnumerableExtensions.ConfigureAwait. Para obter mais informações sobre contextos de sincronização e captura do contexto atual, consulte o artigo Como consumir o padrão assíncrono baseado em tarefa.

Os fluxos assíncronos dão suporte ao cancelamento usando o mesmo protocolo que outros métodos async. Você modificaria a assinatura do método iterador assíncrono da seguinte forma para dar suporte ao cancelamento:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

O atributo System.Runtime.CompilerServices.EnumeratorCancellationAttribute faz com que o compilador gere código para o IAsyncEnumerator<T> que torna o token passado para visível para GetAsyncEnumerator o corpo do iterador assíncrono como esse argumento. Em runQueryAsync, você pode examinar o estado do token e cancelar mais trabalhos, se solicitado.

Você usa outro método de extensão, WithCancellation, para passar o token de cancelamento para o fluxo assíncrono. Você modificaria o loop ao enumerar os problemas da seguinte forma:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Você pode obter o código para o tutorial concluído do repositório dotnet/docs na pasta asynchronous-programming/snippets.

Executar o aplicativo finalizado

Execute o aplicativo novamente. Compare esse comportamento com o comportamento do aplicativo inicial. A primeira página de resultados é enumerada assim que fica disponível. Há uma pausa observável à medida que cada nova página é solicitada e recuperada, e os resultados da próxima página são rapidamente enumerados. O bloco try / catch não é necessário para lidar com o cancelamento: o chamador pode interromper a enumeração da coleção. O progresso é claramente informado, pois o fluxo assíncrono gera resultados à medida que cada página é baixada. O status de cada problema retornado está incluído diretamente no loop await foreach. Você não precisa de um objeto de retorno de chamada para acompanhar o progresso.

Você pode ver as melhorias no uso da memória examinando o código. Não é mais necessário alocar uma coleção para armazenar todos os resultados antes de serem enumerados. O chamador pode determinar como consumir os resultados e se uma coleção de armazenamento é necessária.

Execute o aplicativos inicial e o acabado, e observe você mesmo as diferenças entre as implementações. Depois de terminar, você pode excluir o token de acesso de GitHub criado ao iniciar este tutorial. Se um invasor obtiver acesso a esse token, ele poderá acessar as APIs do GitHub usando suas credenciais.

Neste tutorial, você usou fluxos assíncronos para ler itens individuais de uma API de rede que retorna páginas de dados. Fluxos assíncronos também podem ler de "fluxos sem fim", como um ticker de ações ou dispositivo sensor. A chamada para MoveNextAsync retorna o próximo item assim que ele estiver disponível.