Tutorial: Membuat dan mengonsumsi aliran asinkron menggunakan C# dan .NET

Aliran asinkron memodelkan sumber data streaming. Aliran sering mengambil atau membuat elemen secara asinkron. Menyediakan model pemrograman alami untuk sumber data streaming asinkron.

Dalam tutorial ini, Anda akan mempelajari cara:

  • Membuat sumber data yang menghasilkan urutan elemen data secara asinkron.
  • Memakai sumber data tersebut secara asinkron.
  • Mendukung pembatalan dan konteks yang diambil untuk aliran asinkron.
  • Mengenali kapan antarmuka dan sumber data baru lebih disukai daripada urutan data sinkron sebelumnya.

Prasyarat

Anda harus menyiapkan komputer untuk menjalankan .NET, termasuk pengkompilasi C#. Pengkompilasi C# tersedia dengan Visual Studio 2022 atau .NET SDK.

Anda harus membuat token akses GitHub sehingga Anda dapat mengakses titik akhir GitHub GraphQL. Memilih izin berikut untuk Token Akses GitHub Anda:

  • repositori:status
  • public_repo

Simpan token akses di tempat yang aman sehingga Anda dapat menggunakannya untuk mendapatkan akses ke titik akhir API GitHub.

Peringatan

Jaga keamanan token akses pribadi Anda. Perangkat lunak apa pun dengan token akses pribadi dapat melakukan panggilan API GitHub dengan menggunakan hak akses.

Tutorial ini mengasumsikan Anda terbiasa dengan C# dan .NET, termasuk Visual Studio atau .NET CLI.

Menjalankan aplikasi starter

Anda bisa mendapatkan kode untuk aplikasi pemula yang digunakan dalam tutorial ini dari repositori dotnet/docs di folder asinkron-pemrograman/cuplikan .

Aplikasi starter adalah aplikasi konsol yang menggunakan antarmuka GitHub GraphQL untuk mengambil masalah terbaru yang ditulis dalam repositori dotnet/docs. Mulailah dengan memperhatikan kode berikut untuk metode Main aplikasi starter:

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Anda dapat mengatur variabel lingkungan GitHubKey ke token akses pribadi, atau mengganti argumen terakhir dalam panggilan ke GetEnvVariable dengan token akses pribadi Anda. Jangan letakkan kode akses Anda dalam kode sumber jika Anda akan berbagi sumber tersebut dengan orang lain. Jangan pernah mengunggah kode akses ke repositori sumber bersama.

Setelah membuat klien GitHub, kode dalam Main membuat objek pelaporan kemajuan dan token pembatalan. Setelah objek tersebut dibuat, Main memanggil RunPagedQueryAsync untuk mengambil 250 masalah terbaru yang dibuat. Setelah tugas selesai, hasilnya ditampilkan.

Ketika Anda menjalankan aplikasi starter, Anda dapat membuat beberapa pengamatan penting tentang bagaimana aplikasi ini berjalan. Anda akan melihat kemajuan yang dilaporkan untuk setiap halaman yang dikembalikan dari GitHub. Anda dapat mengamati jeda yang nyata sebelum GitHub mengembalikan setiap halaman baru masalah. Akhirnya, masalah ditampilkan hanya setelah keseluruhan 10 halamannya diambil dari GitHub.

Memeriksa penerapan

Penerapan mengungkapkan alasan Anda mengamati perilaku yang dibahas di bagian sebelumnya. Memeriksa kode untuk RunPagedQueryAsync:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Hal pertama yang dilakukan metode ini adalah membuat objek POST, menggunakan GraphQLRequest kelas :

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

yang membantu membentuk isi objek POST, dan mengonversinya dengan benar ke JSON yang disajikan sebagai string tunggal dengan ToJsonText metode , yang menghapus semua karakter baris baru dari isi permintaan Anda yang menandainya dengan \ karakter escape (garis miring terbalik).

Mari kita berkonsentrasi pada algoritma halaman dan struktur asinkron dari kode sebelumnya. (Anda dapat berkonsultasi dengan dokumentasi GitHub GraphQL untuk detail tentang GitHub GraphQL API.) Metode RunPagedQueryAsync ini melakukan enumerasi masalah dari yang terbaru hingga terlama. Ia meminta 25 masalah per halaman dan memeriksa struktur pageInfo dari respons untuk melanjutkan dengan halaman sebelumnya. Mengikuti dukungan penomoran standar GraphQL untuk respons multi-halaman. Respons menyertakan objek pageInfo yang menyertakan nilai hasPreviousPages dan nilai startCursor yang digunakan untuk meminta halaman sebelumnya. Masalahnya ada dalam array nodes. Metode RunPagedQueryAsync melampirkan simpul ini ke array yang berisi semua hasil dari keseluruhan halaman.

Setelah mengambil dan memulihkan halaman hasil, RunPagedQueryAsync melaporkan kemajuan dan memeriksa pembatalan. Jika pembatalan telah diminta, RunPagedQueryAsync melempar OperationCanceledException.

Ada beberapa elemen dalam kode ini yang dapat ditingkatkan. Yang terpenting, RunPagedQueryAsync harus mengalokasikan penyimpanan untuk semua masalah yang dikembalikan. Sampel ini berhenti pada 250 masalah karena mengambil semua masalah yang terbuka akan membutuhkan lebih banyak memori untuk menyimpan semua masalah yang diambil. Protokol untuk mendukung laporan kemajuan dan pembatalan membuat algoritma lebih sulit dipahami pada pembacaan pertamanya. Lebih banyak jenis dan API yang terlibat. Anda harus melacak komunikasi melalui CancellationTokenSource dan yang terkait dengan CancellationToken untuk memahami kapan pembatalan diminta dan kapan diberikan.

Aliran asinkron memberikan cara yang lebih baik

Aliran asinkron dan dukungan bahasa pemrogram terkait mengatasi semua masalah tersebut. Kode yang membuat urutan sekarang dapat digunakan oleh yield return untuk mengembalikan elemen dalam metode yang dideklarasikan dengan pengubah async. Anda dapat memakai aliran asinkron dengan menggunakan perulangan await foreach sama seperti Anda memakai urutan apa pun menggunakan perulangan foreach.

Fitur bahasa pemrogram baru ini bergantung pada tiga antarmuka baru yang ditambahkan ke .NET Standard 2.1 dan diterapkan dalam .NET Core 3.0:

Ketiga antarmuka ini harus akrab bagi sebagian besar pengembang C#. Ketiganya berperilaku dengan cara yang mirip dengan rekan-rekan sinkron mereka:

Salah satu jenis yang mungkin asing adalah System.Threading.Tasks.ValueTask. Struct ValueTask ini menyediakan API serupa terhadap kelas System.Threading.Tasks.Task. ValueTask digunakan dalam antarmuka ini karena alasan performa.

Mengonversi ke aliran asinkron

Selanjutnya, konversikan metode RunPagedQueryAsync untuk membuat aliran asinkron. Pertama, ubah tanda tangan RunPagedQueryAsync untuk mengembalikan IAsyncEnumerable<JToken>, dan hapus token pembatalan serta objek kemajuan dari daftar parameter seperti yang ditunjukkan dalam kode berikut:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

Kode starter memproses setiap halaman saat halaman diambil, seperti yang ditunjukkan dalam kode berikut:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Ganti ketiga baris tersebut dengan kode berikut:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

Anda juga dapat menghapus deklarasi finalResults lebih dulu dalam metode ini dan pernyataan return yang mengikuti perulangan yang Anda ubah.

Anda telah menyelesaikan perubahan untuk membuat aliran asinkron. Metode yang sudah selesai harus menyerupai kode berikut:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Selanjutnya, Anda mengubah kode yang memakai koleksi untuk memakai aliran asinkron. Temukan kode berikut dalam Main yang memroses koleksi masalah:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Ganti kode tersebut dengan perulangan await foreach berikut:

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

Antarmuka IAsyncEnumerator<T> yang baru berasal dari IAsyncDisposable. Hal ini berarti perulangan sebelumnya akan secara asinkron membuang aliran ketika perulangan selesai. Anda dapat membayangkan perulangan terlihat seperti kode berikut:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Secara default, elemen aliran diproses dalam konteks yang diambil. Jika Anda ingin menonaktifkan pengambilan konteks, gunakan metode ekstensi TaskAsyncEnumerableExtensions.ConfigureAwait. Untuk informasi selengkapnya tentang konteks sinkronisasi dan pengambilan konteks saat ini, lihat artikel di Memakai pola asinkron berbasis Tugas.

Aliran asinkron mendukung pembatalan menggunakan protokol yang sama dengan metode async lainnya. Anda akan mengubah tanda tangan untuk metode iterator asinkron seperti berikut untuk mendukung pembatalan:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Atribut System.Runtime.CompilerServices.EnumeratorCancellationAttribute menyebabkan kompilator membuat kode untuk IAsyncEnumerator<T> yang membuat token diteruskan ke GetAsyncEnumerator agar terlihat oleh isi iterator asinkron sebagai argumen tersebut. Di dalam runQueryAsync, Anda dapat memeriksa status token dan membatalkan pekerjaan lebih lanjut jika diminta.

Anda menggunakan metode ekstensi lain, WithCancellation, untuk meneruskan token pembatalan ke aliran asinkron. Anda akan mengubah perulangan yang menghitung masalah seperti berikut:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Anda bisa mendapatkan kode untuk tutorial yang sudah selesai dari repositori dotnet/docs di folder asinkron-pemrograman/cuplikan .

Jalankan aplikasi yang sudah selesai

Menjalankan aplikasi lagi. Bedakan perilakunya dengan perilaku aplikasi starter. Halaman pertama hasil dienumerasi segera setelah tersedia. Ada jeda yang dapat diamati karena setiap halaman baru diminta dan diambil, maka hasil halaman berikutnya dengan cepat dienumerasi. Blok try / catch tidak diperlukan untuk menangani pembatalan: pemanggil dapat berhenti menghitung koleksi. Kemajuan dilaporkan dengan jelas karena aliran asinkron membuat hasil pada saat setiap halaman diunduh. Status untuk setiap masalah yang dikembalikan disertakan dengan mulus dalam perulangan await foreach. Anda tidak memerlukan objek panggilan balik untuk melacak kemajuan.

Anda dapat melihat peningkatan penggunaan memori dengan memeriksa kode. Anda tidak perlu lagi mengalokasikan koleksi untuk menyimpan semua hasil sebelum dienumerasi. Pemanggil dapat menentukan cara memakai hasilnya dan apakah koleksi penyimpanan diperlukan.

Jalankan aplikasi starter dan aplikasi yang sudah selesai lalu dengannya Anda dapat mengamati perbedaan antara penerapannya untuk diri Anda sendiri. Anda dapat menghapus token akses GitHub yang Anda buat saat memulai tutorial ini setelah selesai. Jika penyerang mendapatkan akses ke token tersebut, mereka dapat mengakses API GitHub menggunakan mandat Anda.

Dalam tutorial ini, Anda menggunakan aliran asinkron untuk membaca item individual dari API jaringan yang mengembalikan halaman data. Aliran asinkron juga dapat membaca dari "aliran yang tidak pernah berakhir" seperti ticker stok, atau perangkat sensor. Panggilan untuk MoveNextAsync mengembalikan item berikutnya segera setelah tersedia.