Compartilhar via


Tutorial: Gerar e consumir fluxos assíncronos usando C# e .NET

Os fluxos assíncronos modelam uma fonte de dados de streaming. Os fluxos de dados geralmente recuperam ou geram elementos de forma assíncrona. Eles fornecem um modelo de programação natural para fontes de dados de streaming assíncronas.

Neste tutorial, você aprenderá a:

  • Crie uma fonte de dados que gere uma sequência de elementos de dados de forma assíncrona.
  • Consuma essa fonte de dados de forma assíncrona.
  • Suporte ao cancelamento e contextos capturados para fluxos assíncronos.
  • Reconheça quando a nova interface e a fonte de dados são preferenciais a sequências de dados síncronas anteriores.

Pré-requisitos

Você precisará configurar seu computador para executar o .NET, incluindo o compilador C#. O compilador C# está disponível com o Visual Studio 2022 ou o SDK do .NET.

Você precisará criar um token de acesso do GitHub para poder acessar o ponto de extremidade do GraphQL do GitHub. Selecione as seguintes permissões para o Token de Acesso do GitHub:

  • repo:status
  • public_repo

Salve o token de acesso em um local seguro para que você possa usá-lo para obter acesso ao ponto de extremidade da API do GitHub.

Aviso

Mantenha seu token de acesso pessoal seguro. Qualquer software com seu token de acesso pessoal pode fazer chamadas à API do GitHub usando seus direitos de acesso.

Este tutorial pressupõe que você esteja familiarizado com C# e .NET, incluindo o Visual Studio ou a CLI do .NET.

Executar o aplicativo inicial

Você pode obter o código para o aplicativo de início usado neste tutorial no repositório dotnet/docs na pasta asynchronous-programming/snippets.

O aplicativo inicial é um aplicativo de console que usa a interface GraphQL do GitHub para recuperar problemas recentes escritos no repositório dotnet/docs . Comece examinando o seguinte código para o método inicial do aplicativo Main.

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Você pode definir uma variável de ambiente GitHubKey para o seu token de acesso pessoal, ou você pode substituir o último argumento na chamada GetEnvVariable pelo seu token de acesso pessoal. Não coloque seu código de acesso no código-fonte se você compartilhar a origem com outras pessoas. Nunca carregue códigos de acesso em um repositório de origem compartilhado.

Depois de criar o cliente GitHub, o código em Main cria um objeto de relatório de progresso e um token de cancelamento. Depois que esses objetos são criados, Main chama RunPagedQueryAsync para recuperar os 250 incidentes mais recentes criados. Depois que essa tarefa for concluída, os resultados serão exibidos.

Ao executar o aplicativo inicial, você pode fazer algumas observações importantes sobre como esse aplicativo é executado. Você verá o progresso sendo relatado para cada página que for retornada do GitHub. Você pode observar uma pausa perceptível antes que o GitHub retorne cada nova página de problemas. Por fim, os problemas são exibidos somente depois que todas as 10 páginas foram recuperadas do GitHub.

Examinar a implementação

A implementação revela por que você observou o comportamento discutido na seção anterior. Examine o código para RunPagedQueryAsync:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

A primeira coisa que este método faz é criar o objeto POST usando a GraphQLRequest classe:

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

que ajuda a formar o corpo do objeto POST e a convertê-lo corretamente em JSON apresentado como uma cadeia de caracteres única com o método ToJsonText, que remove todos os caracteres de nova linha do corpo da solicitação marcando-os com o caractere de escape (barra invertida) \.

Vamos nos concentrar no algoritmo de paginação e na estrutura assíncrona do código anterior. (Você pode consultar a documentação do GraphQL do GitHub para obter detalhes sobre a API do GraphQL do GitHub.) O RunPagedQueryAsync método enumera os problemas dos mais recentes aos mais antigos. Solicita 25 itens por página e examina a pageInfo estrutura da resposta para prosseguir para a página anterior. Isso segue o suporte de paginação padrão do GraphQL para respostas de várias páginas. A resposta inclui um pageInfo objeto que inclui um hasPreviousPages valor e um startCursor valor usado para solicitar a página anterior. Os problemas estão na nodes matriz. O RunPagedQueryAsync método acrescenta esses nós a uma matriz que contém todos os resultados de todas as páginas.

Depois de recuperar e restaurar uma página de resultados, RunPagedQueryAsync relata o progresso e verifica se há cancelamento. Se o cancelamento tiver sido solicitado, RunPagedQueryAsync gerará um OperationCanceledException.

Há vários elementos nesse código que podem ser aprimorados. Acima de tudo, RunPagedQueryAsync deve alocar armazenamento para todos os problemas retornados. Esse exemplo para em 250 problemas porque recuperar todos os problemas abertos exigiria muito mais memória para armazenar todos os problemas recuperados. Os protocolos para dar suporte a relatórios de progresso e cancelamento tornam o algoritmo mais difícil de entender em sua primeira leitura. Mais tipos e APIs estão envolvidos. Você deve rastrear as comunicações por meio do CancellationTokenSource e do associado CancellationToken para entender onde o cancelamento é solicitado e onde ele é concedido.

Os fluxos assíncronos fornecem uma maneira mais eficiente

Os fluxos assíncronos e o suporte ao idioma associado abordam todas essas preocupações. O código que gera a sequência agora pode usar yield return para retornar elementos em um método que foi declarado com seu modificador async. Você pode consumir um fluxo assíncrono usando um await foreach loop assim como consome qualquer sequência usando um foreach loop.

Esses novos recursos de linguagem dependem de três novas interfaces adicionadas ao .NET Standard 2.1 e implementadas no .NET Core 3.0:

Essas três interfaces devem ser familiares para a maioria dos desenvolvedores do C#. Eles se comportam de maneira semelhante aos seus equivalentes síncronos:

Um tipo que pode não ser familiar é System.Threading.Tasks.ValueTask. O ValueTask struct fornece uma API semelhante à System.Threading.Tasks.Task classe. ValueTask é usado nessas interfaces por motivos de desempenho.

Converter para fluxos assíncronos

Em seguida, converta o RunPagedQueryAsync método para gerar um fluxo assíncrono. Primeiro, altere a assinatura de RunPagedQueryAsync para retornar um IAsyncEnumerable<JToken>, e remova o token de cancelamento e os objetos de progresso da lista de parâmetros, conforme mostrado no código a seguir.

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

O código inicial processa cada página conforme a página é recuperada, conforme mostrado no seguinte código:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Substitua essas três linhas pelo seguinte código:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

Você também pode remover a declaração de finalResults anterior neste método e a instrução de return que segue o loop que você modificou.

Você concluiu as alterações para gerar um fluxo assíncrono. O método concluído deve ser semelhante ao seguinte código:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Em seguida, você altera o código que consome a coleção para consumir o fluxo assíncrono. Encontre o seguinte código em Main que processa a coleção de problemas.

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Substitua esse código pelo seguinte await foreach loop:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

A nova interface IAsyncEnumerator<T> deriva de IAsyncDisposable. Isso significa que o loop anterior descartará o fluxo de forma assíncrona quando o loop terminar. Você pode imaginar que o loop se parece com o seguinte código:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Por padrão, os elementos de fluxo são processados no contexto capturado. Se você quiser desabilitar a captura do contexto, use o TaskAsyncEnumerableExtensions.ConfigureAwait método de extensão. Para obter mais informações sobre contextos de sincronização e captura do contexto atual, consulte o artigo sobre como consumir o padrão assíncrono baseado em tarefa.

Os fluxos assíncronos dão suporte ao cancelamento usando o mesmo protocolo que outros async métodos. Você modificaria a assinatura para o método iterador assíncrono da seguinte maneira para dar suporte ao cancelamento:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

O atributo System.Runtime.CompilerServices.EnumeratorCancellationAttribute faz com que o compilador gere código para o IAsyncEnumerator<T> que torne o token passado para GetAsyncEnumerator visível ao corpo do iterador assíncrono como um argumento. Em runQueryAsync, você pode examinar o estado do token e cancelar mais trabalhos, se solicitado.

Você usa outro método de extensão, WithCancellation, para passar o token de cancelamento para o fluxo assíncrono. Você modificaria o loop enumerando os problemas da seguinte maneira:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Você pode obter o código para o tutorial concluído no repositório dotnet/docs na pasta assíncrona-programação/snippets .

Executar o aplicativo concluído

Execute o aplicativo novamente. Contraste seu comportamento com o comportamento do aplicativo inicial. A primeira página de resultados é enumerada assim que está disponível. Há uma pausa observável à medida que cada nova página é solicitada e recuperada e, em seguida, os resultados da próxima página são enumerados rapidamente. O try / catch bloco não é necessário para lidar com o cancelamento: o chamador pode parar de enumerar a coleção. O progresso é claramente relatado porque o fluxo assíncrono gera resultados conforme cada página é baixada. O status de cada problema retornado está incluído diretamente no loop await foreach. Você não precisa de um objeto de retorno de chamada para acompanhar o progresso.

Você pode ver melhorias no uso da memória examinando o código. Você não precisa mais alocar uma coleção para armazenar todos os resultados antes que eles sejam enumerados. O chamador pode determinar como consumir os resultados e se uma coleção de armazenamento é necessária.

Execute os aplicativos iniciados e concluídos e você pode observar as diferenças entre as implementações por conta própria. Você pode excluir o token de acesso do GitHub que criou quando iniciou este tutorial após a conclusão. Se um invasor tiver acesso a esse token, ele poderá acessar as APIs do GitHub usando suas credenciais.

Neste tutorial, você usou fluxos assíncronos para ler itens individuais de uma API de rede que retorna páginas de dados. Fluxos assíncronos também podem ler de "fluxos sem fim", como um ticker de ações ou dispositivo sensor. A chamada para MoveNextAsync retorna o próximo item assim que ele estiver disponível.