共用方式為


教學課程:使用 C# 和 .NET 產生和使用異步數據流

異步串流會 建立數據流來源的模型。 數據流通常會以異步方式擷取或產生元素。 它們提供異步串流數據源的自然程序設計模型。

在本教學課程中,您將了解如何:

  • 建立數據源,以異步方式產生一連串的數據元素。
  • 以異步方式取用該數據源。
  • 支援異步數據流的取消和擷取內容。
  • 辨識新介面和數據來源是否優先於先前的同步數據序列。

先決條件

您必須設定您的電腦以執行 .NET,包括 C# 編譯程式。 C# 編譯程式適用於 Visual Studio 2022.NET SDK

您必須建立 GitHub 存取令牌 ,才能存取 GitHub GraphQL 端點。 為您的 GitHub 存取權杖選取下列權限:

  • 儲存庫:狀態
  • public_repo

將存取令牌儲存在安全的地方,讓您可以使用它來取得 GitHub API 端點的存取權。

警告

保護您的個人存取令牌安全。 任何具有個人存取令牌的軟體都可以使用您的訪問許可權來呼叫 GitHub API。

本教學課程假設您已熟悉 C# 和 .NET,包括 Visual Studio 或 .NET CLI。

執行入門應用程式

您可以從異步程式設計/代碼段資料夾中的 dotnet/docs 存放庫,取得本教學課程中使用的入門應用程式程式代碼。

入門應用程式是一種控制台應用程式,會使用 GitHub GraphQL 介面來擷取 dotnet/docs 存放庫中所撰寫的最新問題。 從檢視下列入門應用程式 Main 方法的程式代碼開始:

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

您可以將環境變數設定為個人存取令牌,也可以將呼叫GitHubKey中的最後一個GetEnvVariable自變數取代為您的個人存取令牌。 如果您要與其他人共用來源,請勿將存取碼放入原始程式碼中。 永遠不要將存取碼上傳至共用來源存放庫。

建立 GitHub 客戶端之後,中的 Main 程式代碼會建立進度報告物件和取消令牌。 建立這些對象之後, Main 呼叫 RunPagedQueryAsync 以擷取最近建立的 250 個問題。 完成該工作之後,就會顯示結果。

當您執行入門應用程式時,您可以對這個應用程式執行方式進行一些重要的觀察。 您會看到從 GitHub 傳回的每個頁面的進度報告。 您可以在 GitHub 顯示每個新問題頁面之前觀察到明顯的暫停。 最後,只有在從 GitHub 擷取所有 10 個頁面之後,才會顯示問題。

檢查實施

實作會顯示為何您觀察到上一節所討論的行為。 檢查程式代碼 RunPagedQueryAsync

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

這個方法做的第一件事是使用 GraphQLRequest 類別建立 POST 物件:

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

這有助於形成 POST 物件主體,並使用 ToJsonText 方法將它正確地轉換成以單一字串呈現的 JSON,此方法會從您的要求主體中移除所有換行符,並用\(反斜杠)逃脫字元標記它們。

讓我們專注於上述程式代碼的分頁演算法和異步結構。 (如需 GitHub GraphQL API 的詳細資訊,請參閱 GitHub GraphQL 檔 。方法 RunPagedQueryAsync 會列舉最近到最舊的問題。 它會要求每頁處理 25 個事項,並檢查 pageInfo 響應的結構以便返回上一頁。 這遵循 GraphQL 對多頁回應的標準分頁支援。 回應包含 pageInfo 物件,其中包含 hasPreviousPages 值,以及用來要求上一頁 startCursor 的值。 問題位於陣列中 nodes 。 方法會將 RunPagedQueryAsync 這些節點附加至陣列,其中包含所有頁面的所有結果。

擷取和還原結果頁面之後, RunPagedQueryAsync 報告進度並檢查取消。 如果已要求取消,RunPagedQueryAsync 則會拋出 OperationCanceledException

此程式代碼中有數個元素可以改善。 最重要的是, RunPagedQueryAsync 必須針對傳回的所有問題配置記憶體。 此範例在 250 個問題時停止,因為擷取所有未解決的問題需要更多記憶體來進行儲存。 支援進度報告和取消的協定讓演算法在第一次閱讀時更難理解。 涉及更多類型和 API。 您必須透過 CancellationTokenSource 和相關聯的 CancellationToken 追蹤通訊,以瞭解取消請求的來源及其獲得批准的位置。

異步串流提供更佳的方式

異步串流和相關聯的語言支援可解決上述所有問題。 產生序列的程式代碼現在可以使用 yield return 來傳回在使用 async 修飾詞宣告的方法中的元素。 您可以使用 await foreach 迴圈取用異步數據流,就像您使用 foreach 迴圈取用任何序列一樣。

這些新的語言功能取決於新增至 .NET Standard 2.1 的三個新介面,並在 .NET Core 3.0 中實作:

這三個介面對大多數 C# 開發人員應該很熟悉。 它們的行為方式類似於其同步對象:

其中一種可能不熟悉的類型是 System.Threading.Tasks.ValueTask。 結構 ValueTask 提供與類別類似的 API System.Threading.Tasks.TaskValueTask 基於效能考慮,會用於這些介面中。

轉換成異步數據流

接下來,將方法轉換為 RunPagedQueryAsync 產生異步數據流。 首先,将 RunPagedQueryAsync 的簽章更改為返回 IAsyncEnumerable<JToken>,並從參數列表中移除取消標記和進度對象,如以下程式代碼所示。

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

起始程序代碼會在擷取頁面時處理每個頁面,如下列程式代碼所示:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

以下列程式代碼取代這三行:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

您也可以移除此方法中稍早的 finalResults 宣告,以及您所修改的迴圈後的 return 語句。

您已完成產生異步數據流的變更。 完成的方法應該類似下列程式代碼:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

接下來,您需要修改代碼,使之從取用集合改為取用非同步串流。 在 中 Main 尋找下列程式代碼,以處理問題的集合:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

以下列 await foreach 迴圈取代該程式代碼:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

新的介面 IAsyncEnumerator<T> 衍生自 IAsyncDisposable。 這表示上述迴圈會在迴圈完成時以異步方式處置數據流。 您可以想像循環看起來像下列程式代碼:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

根據預設,數據流元素會在擷取的內容中處理。 如果您想要停用內容的擷取,請使用 TaskAsyncEnumerableExtensions.ConfigureAwait 擴充方法。 如需同步化上下文和擷取當前上下文的詳細資訊,請參閱 使用工作為基礎的非同步模式一文。

非同步串流能夠使用與其他 async 方法相同的通訊協定來取消。 您可以修改非同步迭代器方法的簽名,如下所示,以支援取消操作:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

System.Runtime.CompilerServices.EnumeratorCancellationAttribute 屬性會使編譯器為 IAsyncEnumerator<T> 生成程式碼,使傳遞給 GetAsyncEnumerator 的 token 在異步迭代器的主體中作為該自變數可見。 在 runQueryAsync 中,您可以檢查 token 的狀態,並在需要時取消後續工作。

您可以使用另一個擴充方法 WithCancellation,將取消令牌傳遞至異步數據流。 您可以修改列舉問題的迴圈,如下所示:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

您可以從異步程式設計/代碼段資料夾中的 dotnet/docs 存放庫取得已完成教學課程的程式碼。

執行已完成的應用程式

再次執行應用程式。 將其行為與入門應用程式的行為形成對比。 結果的第一頁會在可用時立即列舉。 每次要求並擷取新頁面時,都會有可觀察的暫停,接著會快速列舉下一頁的結果。 try / catch不需要區塊來處理取消:呼叫端可以停止列舉集合。 進度會清楚回報,因為異步數據流會在下載每個頁面時產生結果。 傳回的每個項目的狀態會順暢地包含在 await foreach 迴圈中。 您不需要回呼對象來追蹤進度。

您可以藉由檢查程式代碼來查看記憶體使用量的改善。 您不再需要配置集合來儲存所有結果,再加以列舉。 呼叫端可以判斷如何取用結果,以及是否需要記憶體集合。

同時執行入門和已完成的應用程式,而且您可以自行觀察實作之間的差異。 您可以在完成本教學課程之後,刪除您建立的 GitHub 存取令牌。 如果攻擊者取得該令牌的存取權,他們可以使用您的認證來存取 GitHub API。

在本次教學中,您使用異步串流從傳回數據頁面的網路 API 中讀取個別項目。 異步串流也可以讀取「永不中斷的串流」,例如股票行情顯示器或感測器裝置。 呼叫 MoveNextAsync 會在下一個項目可用時立即返回。