教學課程:使用 C# 與 .NET 產生及取用非同步串流
非同步串流會建立資料的串流來源模型。 資料流通常會以非同步方式擷取或產生元素。 它們為非同步串流資料來源提供自然的程式設計模型。
在本教學課程中,您將了解如何:
- 建立會以非同步方式產生資料元素序列的資料來源。
- 以非同步方式取用資料來源。
- 支援取消和擷取非同步串流的內容。
- 識別何時應該使用新的介面與資料來源,而非先前的同步資料來源。
必要條件
您將需要設定電腦,以執行 .NET (包括 C# 編譯器)。 C# 編譯器可與 Visual Studio 2022 或 .NET SDK 搭配使用。
您將必須建立 GitHub 存取權杖,以便存取 GitHub GraphQL 端點。 為您的 GitHub 存取權杖選取下列權限:
- repo:status
- public_repo
將存取權杖儲存在安全的地方,以便您可以使用它來取得對 GitHub API 端點的存取權。
警告
保護您個人存取權杖的安全。 使用您個人存取權杖的任何軟體都可以使用您的存取權限進行 GitHub API 呼叫。
本教學課程假設您已熟悉 C# 和 .NET,包括 Visual Studio 或 .NET CLI。
執行入門應用程式
您可以從我們的 dotnet/samples 存放庫 (位於 csharp/tutorials/AsyncStreams 資料夾中) 取得此教學課程中使用的入門應用程式程式碼。
入門應用程式是主控台應用程式,它使用 GitHub GraphQL 介面來擷取 dotnet/docs 存放庫中最近撰寫的議題。 從查看入門應用程式 Main
方法的下列程式碼開始:
static async Task Main(string[] args)
{
//Follow these steps to create a GitHub Access Token
// https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
//Select the following permissions for your GitHub Access Token:
// - repo:status
// - public_repo
// Replace the 3rd parameter to the following code with your GitHub access token.
var key = GetEnvVariable("GitHubKey",
"You must store your GitHub key in the 'GitHubKey' environment variable",
"");
var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
{
Credentials = new Octokit.Credentials(key)
};
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
}
您可以將 GitHubKey
環境變數設定到您的個人存取權杖,或可以您可以將 GetEnvVariable
呼叫中的最後一個引數取代為您自己的個人存取權杖。 如果您要與其他人共用來源,請勿將存取碼放入原始程式碼中。 永遠不要將存取碼上傳至共用來源存放庫。
建立 GitHub 用戶端之後,Main
中的程式碼會建立進度報告物件與取消權杖。 一旦建立那些物件,Main
就會呼叫 RunPagedQueryAsync
以擷取最近的 250 個已建立議題。 該工作完成之後,會顯示結果。
當您執行入門應用程式時,您可以重點觀察此應用程式如何執行。 您將會看到針對從 GitHub 傳回的每個頁面回報的進度。 您會發現 GitHub 傳回議題的每個新頁面時暫停了一些時間。 最後,只有在從 GitHub 擷取全部 10 頁之後,才會顯示議題。
檢查實作
實作會揭示為何您觀察到上一節中討論的行為。 檢查 RunPagedQueryAsync
的程式碼:
private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
JArray finalResults = new JArray();
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
}
return finalResults;
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
這個方法所做的第一件事是使用 GraphQLRequest
類別建立 POST 物件:
public class GraphQLRequest
{
[JsonProperty("query")]
public string? Query { get; set; }
[JsonProperty("variables")]
public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();
public string ToJsonText() =>
JsonConvert.SerializeObject(this);
}
這有助於形成 POST 物件主體,並使用 ToJsonText
方法將其正確地轉換成以單一字串呈現的 JSON,這會從要求本文中移除所有新行字元,以 \
(反斜線) 逸出字元標記它們。
讓我們專注在分頁上述程式碼的演算法與非同步結構。 (您可以查閱 GitHub GraphQL 文件以取得有關 GitHub GraphQL API 的詳細資料。)RunPagedQueryAsync
方法會依從最新到最舊的順序列舉議題。 它會要求每頁 25 個議題,並檢查回應的 pageInfo
結構以使用上一頁繼續。 這遵循 GraphQL 對多頁回應的標準分頁支援。 回應包括 pageInfo
物件,此物件包括 hasPreviousPages
值與用來要求上一頁的 startCursor
值。 議題位於 nodes
陣列中。 RunPagedQueryAsync
方法會將這些節點附加到包含來自所有頁面之結果的陣列。
擷取並還原一頁的結果之後,RunPagedQueryAsync
會回報進度並檢查取消。 若已要求取消,RunPagedQueryAsync
會擲回 OperationCanceledException。
此程式碼中有許多元素可以改進。 最重要的是,RunPagedQueryAsync
必須為傳回的所有議題配置儲存體。 此案例會在第 250 個議題停止,因為擷取所有未決議題將需要多得多的記憶體來儲存所以已擷取的議題。 支援進度報告與取消的通訊協定會讓第一次讀取演算法時更難以理解。 涉及更多類型和 API。 您必須透過 CancellationTokenSource 與其關聯的 CancellationToken 來追蹤通訊,以了解要求取消之處與授與權限之處。
非同步資料流提供更好的方式
非同步資料流與關聯的語言支援可處理所有哪些顧慮。 產生序列的程式碼現在可以使用 yield return
在使用 async
修飾詞宣告的方法中傳回元素。 您可以使用 await foreach
迴圈取用非同步資料流,就像您使用 foreach
迴圈取用任何序列一樣。
這些新語言功能仰賴 .NET Standard 2.1 中新增並在 .NET Core 3.0 中實作的三個新介面:
- System.Collections.Generic.IAsyncEnumerable<T>
- System.Collections.Generic.IAsyncEnumerator<T>
- System.IAsyncDisposable
這三個介面對大部分的 C# 開發人員來說應該都很熟悉。 它們的運作方式與行為類似其同步對等項:
- System.Collections.Generic.IEnumerable<T>
- System.Collections.Generic.IEnumerator<T>
- System.IDisposable
一個不熟悉的型別可能是 System.Threading.Tasks.ValueTask。 ValueTask
結構提供類似的 API 給 System.Threading.Tasks.Task 類別。 ValueTask
因為效能原因而用於這些介面。
轉換為非同步資料流
接著,轉換 RunPagedQueryAsync
方法以產生非同步資料流。 首先,變更f RunPagedQueryAsync
的簽章以傳回 IAsyncEnumerable<JToken>
,並從參數清單移除取消權杖與進度物件,如下列程式碼所示:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
入門程式碼會在擷取頁面時處理每一頁,如下列程式碼所示:
finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();
使用下列程式碼取代那三行:
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
您也可以移除此方法中稍早的 finalResults
宣告以及遵循您先前修改之迴圈的 return
陳述式。
您已完成變更以產生非同步資料流。 完成方法應類似於下列程式碼:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
接著,您必須變更取用集合以取用非同步資料流的程式碼。 在 Main
中尋找會處理議題集合的下列程式碼:
var progressReporter = new progressStatus((num) =>
{
Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();
try
{
var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
cancellationSource.Token, progressReporter);
foreach(var issue in results)
Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
Console.WriteLine("Work has been cancelled");
}
使用下列 await foreach
迴圈取代該程式碼:
int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
新的介面 IAsyncEnumerator<T> 衍生自 IAsyncDisposable。 這表示上述迴圈會在迴圈完成時以非同步方式處置串流。 您可以想像迴圈看起來像下列程式碼:
int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
while (await enumerator.MoveNextAsync())
{
var issue = enumerator.Current;
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
} finally
{
if (enumerator != null)
await enumerator.DisposeAsync();
}
根據預設,串流元素會在擷取的內容中處理。 如果您想要停用內容擷取,請使用 TaskAsyncEnumerableExtensions.ConfigureAwait 擴充方法。 如需同步處理內容和擷取目前內容的詳細資訊,請參閱取用以工作為基礎的非同步模式上的文章。
非同步串流支援使用與其他 async
方法相同的通訊協定來取消。 您可以如下所示修改非同步迭代器方法的簽章來支援取消:
private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var issueAndPRQuery = new GraphQLRequest
{
Query = queryText
};
issueAndPRQuery.Variables["repo_name"] = repoName;
bool hasMorePages = true;
int pagesReturned = 0;
int issuesReturned = 0;
// Stop with 10 pages, because these are large repos:
while (hasMorePages && (pagesReturned++ < 10))
{
var postBody = issueAndPRQuery.ToJsonText();
var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
postBody, "application/json", "application/json");
JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);
int totalCount = (int)issues(results)["totalCount"]!;
hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
issuesReturned += issues(results)["nodes"]!.Count();
foreach (JObject issue in issues(results)["nodes"]!)
yield return issue;
}
JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}
System.Runtime.CompilerServices.EnumeratorCancellationAttribute 屬性會造成編譯器產生 IAsyncEnumerator<T> 的程式碼,讓作為該引數的非同步迭代器主體可看見傳遞至 GetAsyncEnumerator
的權杖。 在 runQueryAsync
內,您可以檢查權杖的狀態,並在要求時取消進一步的工作。
您可以使用另一個擴充方法 WithCancellation,將取消權杖傳遞至非同步串流。 您可以修改列舉問題的迴圈,如下所示:
private static async Task EnumerateWithCancellation(GitHubClient client)
{
int num = 0;
var cancellation = new CancellationTokenSource();
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
.WithCancellation(cancellation.Token))
{
Console.WriteLine(issue);
Console.WriteLine($"Received {++num} issues in total");
}
}
您可以從 dotnet/docs 存放庫 (位於 asynchronous-programming/snippets 資料夾中) 取得已完成之教學課程的程式碼。
執行已完成的應用程式
再次執行應用程式。 將其行為與入門應用程式的行為進行比較。 當第一個結果頁面可用時會儘快列舉。 您會發現要求並擷取每個新頁面時會暫停一些時間,燃後快速列舉下一個頁面的結果。 不需要 try
/ catch
區塊就能處理取消:呼叫者可以停止列舉集合。 系統會明確回報進度,因為非同步資料流會在下載每個頁面時產生結果。 每個傳回的問題狀態都會順暢地包含在 await foreach
迴圈中。 您不需要回撥物件來追蹤進度。
您可以透過檢查程式碼看到記憶體使用狀況的改進。 您再也不需要在列舉結果之前配置集合以儲存所有結果。 呼叫者可以決定如何取用結果,以及是否需要儲存體集合。
執行入門與已完成的應用程式,您將能親自觀察實作之間的差異。 完成之後,您可以刪除開始此教學課程時建立的 GitHub 存取權杖。 若攻擊者取得權杖的存取權,他們將能使用您的認證存取 GitHub API。
在本教學課程中,您已使用非同步資料流,從傳回資料頁面的網路 API 讀取個別項目。 非同步資料流也可以從「永不結束的資料流」讀取,例如股票行情指示器或感應器裝置。 MoveNextAsync
的呼叫會在下一個項目可用時立即傳回。