教學課程:使用 C# 和 .NET 產生和使用非同步資料流程

非同步串流會 建立資料流程資料來源的模型。 資料流程通常會以非同步方式擷取或產生元素。 它們為非同步串流資料來源提供自然的程式設計模型。

在本教學課程中,您將了解如何:

  • 建立會以非同步方式產生資料元素序列的資料來源。
  • 以非同步方式取用資料來源。
  • 支援非同步資料流程的取消和擷取內容。
  • 識別何時應該使用新的介面與資料來源,而非先前的同步資料來源。

必要條件

您必須設定機器以執行 .NET,包括 C# 編譯器。 C# 編譯器適用于 Visual Studio 2022.NET SDK

您將必須建立 GitHub 存取權杖,以便存取 GitHub GraphQL 端點。 為您的 GitHub 存取權杖選取下列權限:

  • repo:status
  • public_repo

將存取權杖儲存在安全的地方,以便您可以使用它來取得對 GitHub API 端點的存取權。

警告

保護您個人存取權杖的安全。 使用您個人存取權杖的任何軟體都可以使用您的存取權限進行 GitHub API 呼叫。

本教學課程假設您已熟悉 C# 和 .NET,包括 Visual Studio 或 .NET CLI。

執行入門應用程式

您可以從csharp/whats-new/tutorials資料夾中的 dotnet/docs存放庫,取得本教學課程中使用的入門應用程式程式碼。

入門應用程式是主控台應用程式,它使用 GitHub GraphQL 介面來擷取 dotnet/docs 存放庫中最近撰寫的議題。 從查看入門應用程式 Main 方法的下列程式碼開始:

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

您可以將 GitHubKey 環境變數設定到您的個人存取權杖,或可以您可以將 GetEnvVariable 呼叫中的最後一個引數取代為您自己的個人存取權杖。 如果您要與其他人共用來源,請勿將存取碼放在原始程式碼中。 永遠不要將存取碼上傳至共用來源存放庫。

建立 GitHub 用戶端之後,Main 中的程式碼會建立進度報告物件與取消權杖。 一旦建立那些物件,Main 就會呼叫 RunPagedQueryAsync 以擷取最近的 250 個已建立議題。 該工作完成之後,會顯示結果。

當您執行入門應用程式時,您可以重點觀察此應用程式如何執行。 您將會看到針對從 GitHub 傳回的每個頁面回報的進度。 您會發現 GitHub 傳回議題的每個新頁面時暫停了一些時間。 最後,只有在從 GitHub 擷取全部 10 頁之後,才會顯示議題。

檢查實作

實作會揭示為何您觀察到上一節中討論的行為。 檢查 RunPagedQueryAsync 的程式碼:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

讓我們專注在分頁上述程式碼的演算法與非同步結構。 (如需 GitHub GraphQL API 的詳細資訊,請參閱 GitHub GraphQL 檔 。) 方法 RunPagedQueryAsync 會列舉最近到最舊的問題。 它會要求每頁 25 個議題,並檢查回應的 pageInfo 結構以使用上一頁繼續。 這遵循 GraphQL 對多頁回應的標準分頁支援。 回應包括 pageInfo 物件,此物件包括 hasPreviousPages 值與用來要求上一頁的 startCursor 值。 議題位於 nodes 陣列中。 RunPagedQueryAsync 方法會將這些節點附加到包含來自所有頁面之結果的陣列。

擷取並還原一頁的結果之後,RunPagedQueryAsync 會回報進度並檢查取消。 若已要求取消,RunPagedQueryAsync 會擲回 OperationCanceledException

此程式碼中有許多元素可以改進。 最重要的是,RunPagedQueryAsync 必須為傳回的所有議題配置儲存體。 此案例會在第 250 個議題停止,因為擷取所有未決議題將需要多得多的記憶體來儲存所以已擷取的議題。 支援進度報告和取消的通訊協定可讓演算法更難瞭解其第一次閱讀。 涉及更多類型和 API。 您必須透過 CancellationTokenSource 及其相關聯的 CancellationToken 追蹤通訊,以瞭解要求取消的位置以及授與的位置。

非同步資料流提供更好的方式

非同步資料流與關聯的語言支援可處理所有哪些顧慮。 產生序列的程式碼現在可以使用 yield return 在使用 async 修飾詞宣告的方法中傳回元素。 您可以使用 await foreach 迴圈取用非同步資料流,就像您使用 foreach 迴圈取用任何序列一樣。

這些新語言功能仰賴 .NET Standard 2.1 中新增並在 .NET Core 3.0 中實作的三個新介面:

這三個介面對大部分的 C# 開發人員來說應該都很熟悉。 它們的運作方式與行為類似其同步對等項:

一個不熟悉的型別可能是 System.Threading.Tasks.ValueTaskValueTask 結構提供類似的 API 給 System.Threading.Tasks.Task 類別。 ValueTask 因為效能原因而用於這些介面。

轉換為非同步資料流

接著,轉換 RunPagedQueryAsync 方法以產生非同步資料流。 首先,變更f RunPagedQueryAsync 的簽章以傳回 IAsyncEnumerable<JToken>,並從參數清單移除取消權杖與進度物件,如下列程式碼所示:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

入門程式碼會在擷取頁面時處理每一頁,如下列程式碼所示:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

使用下列程式碼取代那三行:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

您也可以移除此方法中稍早的 finalResults 宣告以及遵循您先前修改之迴圈的 return 陳述式。

您已完成變更以產生非同步資料流。 完成的方法應該類似下列程式碼:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

接著,您必須變更取用集合以取用非同步資料流的程式碼。 在 Main 中尋找會處理議題集合的下列程式碼:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

使用下列 await foreach 迴圈取代該程式碼:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

新的介面 IAsyncEnumerator<T> 衍生自 IAsyncDisposable 。 這表示上述迴圈會在迴圈完成時以非同步方式處置資料流程。 您可以想像迴圈看起來像下列程式碼:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetEnumeratorAsync();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

根據預設,資料流程元素會在擷取的內容中處理。 如果您想要停用內容的擷取,請使用 TaskAsyncEnumerableExtensions.ConfigureAwait 擴充方法。 如需同步處理內容和擷取目前內容的詳細資訊,請參閱取用以 工作為基礎的非同步模式一文。

非同步資料流程支援使用與其他 async 方法相同的通訊協定取消。 您可以修改非同步反覆運算器方法的簽章,如下所示,以支援取消:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

屬性 System.Runtime.CompilerServices.EnumeratorCancellationAttribute 會使編譯器產生 的程式 IAsyncEnumerator<T> 代碼,讓權杖傳遞至 GetAsyncEnumerator 非同步反覆運算器的主體可見做為該引數。 在 內 runQueryAsync ,您可以檢查權杖的狀態,並在要求時取消進一步的工作。

您可以使用另一個擴充方法 WithCancellation ,將取消權杖傳遞至非同步資料流程。 您可以修改列舉問題的迴圈,如下所示:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

您可以從csharp/whats-new/tutorials資料夾中的dotnet/docs存放庫取得已完成教學課程的程式碼。

執行已完成的應用程式

再次執行應用程式。 將其行為與入門應用程式的行為進行比較。 當第一個結果頁面可用時會儘快列舉。 您會發現要求並擷取每個新頁面時會暫停一些時間,燃後快速列舉下一個頁面的結果。 不需要 try / catch 區塊就能處理取消:呼叫者可以停止列舉集合。 系統會明確回報進度,因為非同步資料流會在下載每個頁面時產生結果。 每個傳回之問題的狀態都會順暢地包含在 迴圈中 await foreach 。 您不需要回呼物件來追蹤進度。

您可以透過檢查程式碼看到記憶體使用狀況的改進。 您再也不需要在列舉結果之前配置集合以儲存所有結果。 呼叫者可以決定如何取用結果,以及是否需要儲存體集合。

執行入門與已完成的應用程式,您將能親自觀察實作之間的差異。 完成之後,您可以刪除開始此教學課程時建立的 GitHub 存取權杖。 若攻擊者取得權杖的存取權,他們將能使用您的認證存取 GitHub API。