异步流 模拟数据源的流式传输。 数据流通常以异步方式检索或生成元素。 它们为异步流数据源提供自然编程模型。
本教程介绍以下操作:
- 创建以异步方式生成一系列数据元素的数据源。
- 以异步方式使用该数据源。
- 支持异步流的取消和捕获的上下文。
- 识别新接口和数据源是否优先于早期同步数据序列。
先决条件
需要将计算机设置为运行 .NET,包括 C# 编译器。 C# 编译器可用于 Visual Studio 2022 或 .NET SDK。
需要创建 GitHub 访问令牌 ,以便可以访问 GitHub GraphQL 终结点。 为 GitHub 访问令牌选择以下权限:
- 存储库:status
- public_repo
将访问令牌保存在安全的位置,以便使用它来获取对 GitHub API 终结点的访问权限。
警告
确保个人访问令牌安全。 任何具有个人访问令牌的软件都可以使用访问权限进行 GitHub API 调用。
本教程假定你熟悉 C# 和 .NET,包括 Visual Studio 或 .NET CLI。
运行初学者应用程序
可以从异步编程/代码片段文件夹中的 dotnet/docs 存储库获取本教程中使用的初学者应用程序的代码。
初学者应用程序是一个控制台应用程序,它使用 GitHub GraphQL 接口检索在 dotnet/docs 存储库中编写的最近问题。 首先来看一下以下初学者应用 Main
方法的代码:
static async Task Main(string[] args)
{
//Follow these steps to create a GitHub Access Token
// https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
//Select the following permissions for your GitHub Access Token:
// - repo:status
// - public_repo
// Replace the 3rd parameter to the following code with your GitHub access token.
var key = GetEnvVariable("GitHubKey",
"You must store your GitHub key in the 'GitHubKey' environment variable",
"");
var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
{
Credentials = new Octokit.Credentials(key)
};
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
}
可以将环境变量设置为个人访问令牌,也可以将调用GitHubKey
中的最后一个GetEnvVariable
参数替换为个人访问令牌。 如果要与他人共享源,请不要将访问代码放入源代码中。 从不将访问代码上传到共享源存储库。
创建 GitHub 客户端后,代码在 Main
中创建一个进度报告对象和一个取消令牌。 创建这些对象后, Main
调用 RunPagedQueryAsync
以检索最近创建的 250 个问题。 完成该任务后,将显示结果。
运行初学者应用程序时,可以就此应用程序如何运行进行一些重要观察。 你将看到从 GitHub 返回的每个页面的进度报告。 在 GitHub 返回问题的每个新页面之前,可以观察到明显的停顿。 最后,问题仅在从 GitHub 检索到所有 10 个页面之后才会显示。
检查实施情况
该实现揭示了你观察到上一节中讨论的行为的原因。 检查 RunPagedQueryAsync
的代码:
private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
JArray finalResults = new JArray();
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
}
return finalResults;
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
此方法做的第一件事是使用 GraphQLRequest
类创建 POST 对象:
public class GraphQLRequest
{
[JsonProperty("query")]
public string? Query { get; set; }
[JsonProperty("variables")]
public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();
public string ToJsonText() =>
JsonConvert.SerializeObject(this);
}
这有助于构成 POST 对象正文,并使用 ToJsonText
方法将其正确转换为以单个字符串呈现的 JSON,该方法从请求正文中删除所有换行符,并用 \
(反斜杠)转义字符对其进行标记。
让我们专注于上述代码的分页算法和异步结构。 (有关 GitHub GraphQL API 的详细信息,请参阅 GitHub GraphQL 文档 。该方法 RunPagedQueryAsync
枚举最新到最早的问题。 它请求每页 25 个条目,并检查 pageInfo
的响应结构以继续处理之前的页面。 这遵循了 GraphQL 对多页响应的标准分页支持。 响应包括一个pageInfo
对象,其中包括一个hasPreviousPages
值和一个startCursor
值,用于请求上一页。 问题位于数组中 nodes
。 该方法 RunPagedQueryAsync
将这些节点追加到包含所有页面的所有结果的数组中。
在检索和还原结果页之后,RunPagedQueryAsync
将报告进度并检查是否取消。 如果已请求取消,RunPagedQueryAsync
将引发 OperationCanceledException。
此代码中有几个元素可以改进。 最重要的是, RunPagedQueryAsync
必须为返回的所有问题分配存储。 此示例在 250 个问题处停止,因为检索所有打开的问题需要更多的内存来存储所有检索到的问题。 支持进度报告和取消的协议使算法在首次阅读时更难理解。 涉及更多类型和 API。 必须通过 CancellationTokenSource 及其关联的 CancellationToken 跟踪通信,以了解在何处请求取消,以及在何处授予取消。
异步流提供了更好的方法
异步流和关联的语言支持解决了所有这些问题。 生成序列的代码现在可以使用 yield return
返回用 async
修饰符声明的方法中的元素。 可以通过 await foreach
循环来使用异步流,就像通过 foreach
循环使用任何序列一样。
这些新的语言功能取决于添加到 .NET Standard 2.1 并在 .NET Core 3.0 中实现的三个新接口:
- System.Collections.Generic.IAsyncEnumerable<T>
- System.Collections.Generic.IAsyncEnumerator<T>
- System.IAsyncDisposable
大多数 C# 开发人员应该熟悉这三个接口。 它们的行为方式类似于其同步对应项:
- System.Collections.Generic.IEnumerable<T>
- System.Collections.Generic.IEnumerator<T>
- System.IDisposable
一种可能不熟悉的类型是 System.Threading.Tasks.ValueTask。 该 ValueTask
结构提供与 System.Threading.Tasks.Task 类类似的 API。
ValueTask
出于性能原因,在这些接口中使用。
转换为异步流
接下来,转换方法 RunPagedQueryAsync
以生成异步流。 首先,将RunPagedQueryAsync
的签名更改为返回IAsyncEnumerable<JToken>
,并从参数列表中删除取消令牌和进度对象,如以下代码所示:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
起始代码在检索页面时处理每个页面,如以下代码所示:
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
将这三行替换为以下代码:
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
还可以删除你在此方法中前面声明的 finalResults
,以及修改的循环后面的 return
语句。
已完成更改以生成异步流。 完成的方法应类似于以下代码:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
接下来,将使用集合的代码更改为使用异步流。 在Main
中找到处理问题集合的以下代码:
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
将该代码替换为以下 await foreach
循环:
int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
新接口 IAsyncEnumerator<T> 派生自 IAsyncDisposable。 这意味着在循环完成时,前面的循环会以异步方式释放流。 可以想象循环类似于以下代码:
int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
while (await enumerator.MoveNextAsync())
{
var issue = enumerator.Current;
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
} finally
{
if (enumerator != null)
await enumerator.DisposeAsync();
}
默认情况下,流元素在捕获的上下文中进行处理。 如果要禁用上下文捕获,请使用 TaskAsyncEnumerableExtensions.ConfigureAwait 扩展方法。 有关同步上下文和捕获当前上下文的详细信息,请参阅有关 使用基于任务的异步模式的文章。
异步流支持使用与其他 async
方法相同的协议的取消。 将修改异步迭代器方法的签名,如下所示以支持取消:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
System.Runtime.CompilerServices.EnumeratorCancellationAttribute 属性导致编译器生成 IAsyncEnumerator<T> 的代码,该代码使传递给 GetAsyncEnumerator
的令牌对作为该参数的异步迭代器的主体可见。 在 runQueryAsync
中,可以检查令牌的状态,并在请求时取消进一步的工作。
您使用另一个扩展方法WithCancellation,将取消令牌传递给异步流。 可以按如下所示修改枚举问题的循环:
private static async Task EnumerateWithCancellation(GitHubClient client)
{
int num = 0;
var cancellation = new CancellationTokenSource();
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
.WithCancellation(cancellation.Token))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
}
可以从异步编程/代码片段文件夹中的 dotnet/docs 存储库获取已完成教程的代码。
运行已完成的应用程序
再次运行应用程序。 将其行为与初学者应用程序的行为进行对比。 会在结果的第一页可用立即对其进行枚举。 在请求和检索每个新页面时都会有一个可观察到的暂停,然后快速枚举下一页结果。 不需要try
/ catch
块进行取消处理:调用方可以停止枚举集合。 由于异步数据流在下载每个页面时生成结果,因此进度汇报显而易见。 返回的每个问题的状态都被无缝包含在await foreach
循环中。 无需回调对象即可跟踪进度。
可以通过检查代码来查看内存使用量的改进。 在枚举结果之前,不再需要分配集合来存储所有结果。 调用方可以确定如何使用结果以及是否需要存储集合。
运行初学者和已完成的应用程序,你可以自行观察实现之间的差异。 完成本教程后,可以删除在开始本教程时创建的 GitHub 访问令牌。 如果攻击者获得了该令牌的访问权限,他们可以使用凭据访问 GitHub API。
在本教程中,你使用异步流从返回数据页的网络 API 读取单个项。 异步流还可以从股票行情自动收录器或传感器设备等“永不结束的流”读取内容。 对 MoveNextAsync
的调用将在下一项可用后立即返回它。