チュートリアル: C# と .NET を使用して非同期ストリームを生成および使用する

非同期ストリームでは、データのストリーミング ソースをモデル化します。 データ ストリームでは、多くの場合、要素を非同期で取得または生成します。 非同期でストリーミングするデータ ソースにとって自然なプログラミング モデルが与えられます。

このチュートリアルでは、次の作業を行う方法について説明します。

  • データ要素のシーケンスを非同期で生成するデータ ソースを作成します。
  • そのデータ ソースを非同期で使用します。
  • 非同期ストリームのキャンセルとキャプチャされたコンテキストをサポートします。
  • 新しいインターフェイスとデータ ソースが以前の同期データ シーケンスより優先される場合を認識します。

必須コンポーネント

C# コンパイラを含め、.NET を実行するようにコンピューターを設定する必要があります。 C# コンパイラは、Visual Studio 2022 または .NET SDK で使用できます。

GitHub アクセス トークンを作成して、GitHub GraphQL エンドポイントにアクセスできるようにする必要があります。 GitHub アクセス トークンに対して次のアクセス許可を選択します。

  • repo:status
  • public_repo

アクセス トークンを安全な場所に保存して、GitHub API エンドポイントへのアクセス権を得るために使用できるようにします。

警告

自分の個人用アクセス トークンをセキュリティで保護します。 個人用アクセス トークンを使用するソフトウェアでは、ユーザーのアクセス権を使用して GitHub API 呼び出しが行われる可能性があります。

このチュートリアルでは、C# と .NET (Visual Studio または .NET CLI のいずれかを含む) に精通していることを前提としています。

初期アプリケーションを実行する

このチュートリアルで使用される初期アプリケーションのコードは、asynchronous-programming/snippets フォルダー内の dotnet/docs リポジトリから取得できます。

初期アプリケーションは、GitHub GraphQL インターフェイスを使用して、dotnet/docs リポジトリに書き込まれた最近の問題を取得するコンソール アプリケーションです。 まず、初期アプリの Main メソッドについて次のコードを参照します。

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

GitHubKey 環境変数を自分の個人用アクセス トークンに設定するか、GetEnvVariable への呼び出しの最後の引数を自分の個人用アクセス トークンで置き換えることができます。 ソースを他人と共有する場合、自分のアクセス コードをソース コードに置かないでください。 共有ソース リポジトリにアクセス コードをアップロードしないでください。

GitHub クライアントの作成後に、Main 内のコードによって進行状況レポート オブジェクトとキャンセル トークンが作成されます。 これらのオブジェクトが作成されると、Main によって RunPagedQueryAsync が呼び出されて、作成された直近 250 件の問題が取得されます。 そのタスクが完了すると、結果が表示されます。

初期アプリケーションを実行するときに、このアプリケーションの実行方法についていくつかの重要な観察を行うことができます。 GitHub から返される各ページについてレポートされる進行状況を確認します。 問題の新しい各ページが GitHub から返される前に、注目すべき一時停止を観察できます。 最後に、問題は GitHub から 10 ページすべてが取得された後にのみ表示されます。

実装を調べる

実装では、前のセクションで説明した動作を観察した理由が明らかになります。 RunPagedQueryAsync のコードを調べます:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

このメソッドでは最初に、GraphQLRequest クラスを使って POST オブジェクトを作成します。

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

これは、POST オブジェクト本体を作成し、ToJsonText メソッドで単一の文字列として表される JSON に正しく変換するのに役立ちます。これにより、要求本文から改行文字がすべて削除され、\ (円記号) エスケープ文字でマークされます。

上記のコードのページング アルゴリズムと非同期構造体に注目してみましょう。 (GitHub GraphQL API について詳しくは、GitHub GraphQL のドキュメントをご覧ください。)RunPagedQueryAsync メソッドでは、問題が新しい順に列挙されます。 1 ページあたり 25 件の問題を要求し、応答の pageInfo 構造体を調べて前のページに進みます。 次の GraphQL の標準的なページングでは、複数ページの応答がサポートされます。 応答には、前のページを要求するために使用される hasPreviousPages 値と startCursor 値を含む pageInfo オブジェクトが含まれています。 問題は nodes 配列内にあります。 RunPagedQueryAsync メソッドでは、すべてのページからのすべての結果を格納する配列にこれらのノードが追加されます。

結果のページを取得し、復元した後、RunPagedQueryAsync によって進行状況がレポートされ、キャンセルがチェックされます。 キャンセルが要求されている場合は、RunPagedQueryAsync によって OperationCanceledException がスローされます。

このコードには、改善できる要素がいくつかあります。 最も重要なものとして、RunPagedQueryAsync では返されるすべての問題に記憶域を割り当てる必要があります。 未解決の問題をすべて取得するには、取得したすべての問題を格納するためにはるかに多くのメモリが必要になるので、このサンプルは 250 件の問題で停止します。 進行状況レポートとキャンセルをサポートするためのプロトコルが原因で、アルゴリズムを初読で理解することが難しくなります。 関連する型や API の数が増えます。 CancellationTokenSource とそれに関連付けられた CancellationToken を通じた通信をトレースして、キャンセルが要求されている場所と、付与されている場所を理解する必要があります。

非同期ストリームではより優れた方法が提供される

非同期ストリームおよび関連付けられている言語サポートは、これらすべての問題に対処します。 シーケンスを生成するコードでは、yield return を使用して、async 修飾子で宣言されたメソッド内で要素を返せるようになりました。 foreach ループを使用してシーケンスを使用する場合と同様に、await foreach ループを使用して非同期ストリームを使用することができます。

これらの新しい言語機能は、.NET Standard 2.1 に追加され、.NET Core 3.0 で実装された 3 つの新しいインターフェイスに依存します。

この 3 つのインターフェイスは、ほとんどの C# 開発者にとって見慣れたものです。 これらは、同期版と同じように動作します。

見慣れない可能性のある 1 つの型は System.Threading.Tasks.ValueTask です。 ValueTask 構造体では、System.Threading.Tasks.Task クラスと同様の API が提供されます。 パフォーマンス上の理由から、これらのインターフェイスでは ValueTask が使用されます。

非同期ストリームに変換する

次に、RunPagedQueryAsync メソッドを変換して非同期ストリームを生成します。 まず、IAsyncEnumerable<JToken> を返すように RunPagedQueryAsync のシグネチャを変更し、次のコードに示すように、パラメーター リストからキャンセル トークンと進行状況オブジェクトを削除します。

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

次のコードに示すように、初期コードではページが取得されると各ページが処理されます。

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

この 3 行を次のコードに置き換えます。

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

このメソッドの前の方にある finalResults の宣言と、変更したループに続く return ステートメントを削除することもできます。

非同期ストリームを生成するための変更が完了しました。 完了したメソッドは次のコードのようになります。

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

次に、コレクションを使用するコードを変更して、非同期ストリームを使用するようにします。 Main 内で、問題のコレクションを処理する次のコードを探します。

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

このコードを次の await foreach ループに置き換えます。

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

新しいインターフェイス IAsyncEnumerator<T>IAsyncDisposable から派生します。 つまり、前のループは、ループの完了時、ストリームを非同期で破棄します。 ループは次のコードのようになります。

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

既定では、ストリーム要素はキャプチャされたコンテキストで処理されます。 コンテキストのキャプチャを無効にする場合は、TaskAsyncEnumerableExtensions.ConfigureAwait 拡張メソッドを使用します。 同期コンテキストについて、および現在のコンテキストのキャプチャについての詳細は、「タスク ベースの非同期パターンの利用」を参照してください。

非同期ストリームでは、他の async メソッドと同じプロトコルを利用してキャンセルできます。 キャンセルをサポートするよう、非同期反復子メソッドのシグネチャを次のように変更します。

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

System.Runtime.CompilerServices.EnumeratorCancellationAttribute 属性によってコンパイラは IAsyncEnumerator<T> のコードを生成します。このコードは、GetAsyncEnumerator に渡されたトークンを非同期反復子の本文にその引数として表示します。 runQueryAsync の中では、トークンの状態を調べたり、要求されたら、後続の処理をキャンセルしたりできます。

キャンセル トークンを非同期ストリームに渡すには、別の拡張メソッド WithCancellation を使用します。 問題を列挙するループを次のように変更します。

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

終了したチュートリアルのコードは、asynchronous-programming/snippets フォルダー内の dotnet/docs リポジトリから取得できます。

完成したアプリケーションを実行する

アプリケーションをもう一度実行します。 その動作を初期アプリケーションの動作と比較します。 結果の最初のページは、使用可能になるとすぐに列挙されます。 新しい各ページが要求され、取得されるときに観測可能な一時停止があり、次のページの結果がすぐに列挙されます。 try / catch ブロックではキャンセルを処理する必要はありません。呼び出し元がコレクションの列挙を停止できます。 各ページがダウンロードされるときに非同期ストリームによって結果が生成されるので、進行状況が明確にレポートされます。 返される各問題の状態は、await foreach ループにシームレスに含まれます。 進行状況を追跡するためにコールバック オブジェクトは必要ありません。

コードを調べることで、メモリの改善を確認できます。 すべての結果が列挙される前にそれらを格納するコレクションを割り当てる必要がなくなります。 呼び出し元では、結果を使用する方法、および記憶域のコレクションが必要かどうかを判断できます。

初期アプリケーションと完成したアプリケーションの両方を実行し、実装間の違いを自分で確認できます。 完了後に、このチュートリアルの開始時に作成した GitHub アクセス トークンを削除することができます。 攻撃者は、そのトークンへのアクセス権を獲得すると、ユーザーの資格情報を使用して GitHub API にアクセスできます。

このチュートリアルでは、非同期ストリームを使って、データのページを返すネットワーク API から個々の項目を読み取りました。 非同期ストリームは、株式情報やセンサー デバイスのような "終わりのないストリーム" からも読み取ることができます。 MoveNextAsync の呼び出しは、次の項目が使用可能になり次第、それを返します。