Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Os fluxos assíncronos modelam uma fonte de transmissão de dados. Os fluxos de dados geralmente recuperam ou geram elementos de forma assíncrona. Eles fornecem um modelo de programação natural para fontes de dados de streaming assíncronas.
Neste tutorial, você aprenderá a:
- Crie uma fonte de dados que gere uma sequência de elementos de dados de forma assíncrona.
- Consuma essa fonte de dados de forma assíncrona.
- Suporte ao cancelamento e contextos capturados para fluxos assíncronos.
- Reconhecer quando a nova interface e a fonte de dados são preferidas às sequências de dados síncronas anteriores.
Pré-requisitos
Você precisará configurar sua máquina para executar o .NET, incluindo o compilador C#. O compilador C# está disponível com o Visual Studio 2022 ou o .NET SDK.
Você precisará criar um token de acesso do GitHub para poder acessar o ponto de extremidade do GitHub GraphQL. Selecione as seguintes permissões para seu token de acesso do GitHub:
- repositório:estado
- public_repo
Salve o token de acesso em um local seguro para que você possa usá-lo para obter acesso ao ponto de extremidade da API do GitHub.
Advertência
Mantenha o seu token de acesso pessoal seguro. Qualquer software com seu token de acesso pessoal pode fazer chamadas de API do GitHub usando seus direitos de acesso.
Este tutorial pressupõe que você esteja familiarizado com C# e .NET, incluindo o Visual Studio ou a CLI do .NET.
Execute o aplicativo inicial
Você pode obter o código para o aplicativo inicial usado neste tutorial no repositório dotnet/docs na pasta programação assíncrona/trechos .
O aplicativo inicial é um aplicativo de console que usa a interface GitHub GraphQL para recuperar problemas recentes escritos no repositório dotnet/docs . Comece por examinar o seguinte código para o método da aplicação inicial Main.
static async Task Main(string[] args)
{
//Follow these steps to create a GitHub Access Token
// https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
//Select the following permissions for your GitHub Access Token:
// - repo:status
// - public_repo
// Replace the 3rd parameter to the following code with your GitHub access token.
var key = GetEnvVariable("GitHubKey",
"You must store your GitHub key in the 'GitHubKey' environment variable",
"");
var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
{
Credentials = new Octokit.Credentials(key)
};
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
}
Você pode definir uma variável de GitHubKey ambiente para seu token de acesso pessoal ou pode substituir o último argumento na chamada para GetEnvVariable pelo seu token de acesso pessoal. Não coloque seu código de acesso no código-fonte se você estiver compartilhando o código-fonte com outras pessoas. Nunca carregue códigos de acesso para um repositório de código-fonte compartilhado.
Depois de criar o cliente GitHub, o código em Main cria um objeto de relatório de progresso e um token de cancelamento. Depois que esses objetos são criados, Main chama RunPagedQueryAsync para recuperar os 250 problemas criados mais recentes. Após a conclusão dessa tarefa, os resultados são exibidos.
Quando você executa o aplicativo inicial, você pode fazer algumas observações importantes sobre como esse aplicativo é executado. Você verá um relatório de progresso para cada página recebida do GitHub. Você pode observar uma pausa percetível antes que o GitHub retorne cada nova página de problemas. Finalmente, os problemas são exibidos somente depois que todas as 10 páginas foram recuperadas do GitHub.
Examinar a implementação
A implementação revela por que você observou o comportamento discutido na seção anterior. Examine o código para RunPagedQueryAsync:
private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
JArray finalResults = new JArray();
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
}
return finalResults;
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
A primeira coisa que esse método faz é criar o objeto POST, usando a GraphQLRequest classe:
public class GraphQLRequest
{
[JsonProperty("query")]
public string? Query { get; set; }
[JsonProperty("variables")]
public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();
public string ToJsonText() =>
JsonConvert.SerializeObject(this);
}
que ajuda a formar o corpo do objeto POST e a convertê-lo corretamente em JSON, apresentado como uma única cadeia de caracteres com o método ToJsonText, que remove todos os caracteres de nova linha do corpo da solicitação, marcando-os com o caractere de escape \ (barra invertida).
Vamos nos concentrar no algoritmo de paginação e na estrutura assíncrona do código anterior. (Você pode consultar a documentação do GitHub GraphQL para obter detalhes sobre a API do GitHub GraphQL.) O RunPagedQueryAsync método enumera os problemas do mais recente para o mais antigo. Solicita 25 questões por página e examina a pageInfo estrutura da resposta para continuar com a página anterior. Isso segue o suporte de paginação padrão do GraphQL para respostas de várias páginas. A resposta inclui um pageInfo objeto que inclui um hasPreviousPages valor e um startCursor valor usado para solicitar a página anterior. Os problemas estão na nodes matriz. O RunPagedQueryAsync método acrescenta esses nós a uma matriz que contém todos os resultados de todas as páginas.
Depois de recuperar e restaurar uma página de resultados, RunPagedQueryAsync relata o progresso e verifica o cancelamento. Se o cancelamento tiver sido solicitado, RunPagedQueryAsync lança um OperationCanceledException.
Há vários elementos neste código que podem ser melhorados. Mais importante, RunPagedQueryAsync deve alocar armazenamento para todas as questões retornadas. Este exemplo para em 250 problemas porque recuperar todos os problemas abertos exigiria muito mais memória para armazenar todos os problemas recuperados. Os protocolos de apoio aos relatórios de progresso e cancelamento tornam o algoritmo mais difícil de entender em sua primeira leitura. Mais tipos e APIs estão envolvidos. Você deve rastrear as comunicações através do CancellationTokenSource e do seu associado CancellationToken para entender onde o pedido de cancelamento é feito e onde ele é concedido.
Os fluxos assíncronos oferecem uma forma melhor
Os fluxos assíncronos e o suporte a idiomas associados resolvem todas essas preocupações. O código que gera a sequência pode agora utilizar yield return para retornar elementos num método que foi declarado com o modificador async. Você pode consumir um fluxo assíncrono usando um await foreach loop da mesma forma que consome qualquer sequência usando um foreach loop.
Esses novos recursos de linguagem dependem de três novas interfaces adicionadas ao .NET Standard 2.1 e implementadas no .NET Core 3.0:
- System.Collections.Generic.IAsyncEnumerable<T>
- System.Collections.Generic.IAsyncEnumerator<T>
- System.IAsyncDisposable
Essas três interfaces devem ser familiares para a maioria dos desenvolvedores de C#. Comportam-se de forma semelhante aos seus homólogos síncronos:
- System.Collections.Generic.IEnumerable<T>
- System.Collections.Generic.IEnumerator<T>
- System.IDisposable
Um tipo que pode ser desconhecido é System.Threading.Tasks.ValueTask. O ValueTask struct fornece uma API semelhante à System.Threading.Tasks.Task classe.
ValueTask é usado nessas interfaces por motivos de desempenho.
Converter para fluxos assíncronos
Em seguida, converta o método RunPagedQueryAsync para gerar um fluxo assíncrono. Primeiro, altere a assinatura de RunPagedQueryAsync para retornar um IAsyncEnumerable<JToken>, e remova o token de cancelamento e os objetos de progresso da lista de parâmetros, conforme mostrado no código a seguir.
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
O código inicial processa cada página à medida que a página é recuperada, conforme mostrado no código a seguir:
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
Substitua essas três linhas pelo seguinte código:
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
Você também pode remover a declaração de finalResults anteriormente neste método e a return instrução que segue o loop que você modificou.
Você concluiu as alterações para gerar um fluxo assíncrono. O método concluído deve ser semelhante ao seguinte código:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
Em seguida, modifique o código que consome a coleção para consumir o fluxo assíncrono. Encontre o seguinte código em Main que processa a coleção de problemas.
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
Substitua esse código pelo seguinte await foreach loop:
int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
A nova interface IAsyncEnumerator<T> deriva de IAsyncDisposable. Isso significa que o loop anterior descartará o stream de forma assíncrona quando o loop terminar. Você pode imaginar que o loop se parece com o seguinte código:
int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
while (await enumerator.MoveNextAsync())
{
var issue = enumerator.Current;
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
} finally
{
if (enumerator != null)
await enumerator.DisposeAsync();
}
Por padrão, os elementos de fluxo são processados no contexto capturado. Se você quiser desativar a captura do contexto, use o TaskAsyncEnumerableExtensions.ConfigureAwait método de extensão. Para obter mais informações sobre contextos de sincronização e capturar o contexto atual, consulte o artigo sobre consumir o padrão assíncrono baseado em tarefas.
Os fluxos assíncronos suportam cancelamento usando o mesmo protocolo de outros async métodos. Você modificaria a assinatura para o método iterador assíncrono da seguinte maneira para oferecer suporte ao cancelamento:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
O atributo System.Runtime.CompilerServices.EnumeratorCancellationAttribute faz com que o compilador gere código para o IAsyncEnumerator<T>, que torna o token passado ao GetAsyncEnumerator visível como esse argumento para o corpo do iterador assíncrono. Dentro runQueryAsync, você pode examinar o estado do token e interromper trabalho adicional, se solicitado.
Você usa outro método de extensão, WithCancellation, para passar o token de cancelamento para o fluxo assíncrono. Você modificaria o loop enumerando os problemas da seguinte maneira:
private static async Task EnumerateWithCancellation(GitHubClient client)
{
int num = 0;
var cancellation = new CancellationTokenSource();
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
.WithCancellation(cancellation.Token))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
}
Você pode obter o código para o tutorial concluído no repositório dotnet/docs na pasta programação assíncrona/trechos .
Executar o aplicativo concluído
Execute o aplicativo novamente. Compare seu comportamento com o comportamento do aplicativo inicial. A primeira página de resultados é enumerada assim que está disponível. Há uma pausa observável à medida que cada nova página é solicitada e recuperada, em seguida, os resultados da próxima página são rapidamente enumerados. O try / catch bloco não é necessário para lidar com o cancelamento: o chamador pode parar de enumerar a coleção. O progresso é claramente relatado porque o fluxo assíncrono gera resultados à medida que cada página é baixada. O status de cada problema retornado é incluído perfeitamente no await foreach loop. Você não precisa de um objeto de retorno de chamada para acompanhar o progresso.
Você pode ver melhorias no uso da memória examinando o código. Não é mais necessário alocar uma coleção para armazenar todos os resultados antes que eles sejam enumerados. O chamador pode determinar como consumir os resultados e se uma coleção de armazenamento é necessária.
Execute os aplicativos iniciais e concluídos e você pode observar as diferenças entre as implementações por si mesmo. Você pode excluir o token de acesso do GitHub criado quando iniciou este tutorial depois de terminar. Se um invasor obtiver acesso a esse token, ele poderá acessar as APIs do GitHub usando suas credenciais.
Neste tutorial, você usou fluxos assíncronos para ler itens individuais de uma API de rede que retorna páginas de dados. Os fluxos assíncronos também podem ser lidos a partir de "fluxos intermináveis", como um ticker de ações ou um dispositivo sensor. A chamada para MoveNextAsync retorna o próximo item assim que estiver disponível.