Bagikan melalui


Tutorial: Membuat dan mengonsumsi aliran asinkron menggunakan C# dan .NET

Aliran asinkron memodelkan sumber data streaming. Aliran data sering mengambil atau menghasilkan elemen secara asinkron. Mereka menyediakan model pemrograman alami untuk sumber data streaming asinkron.

Dalam tutorial ini, Anda akan mempelajari cara:

  • Buat sumber data yang menghasilkan urutan elemen data secara asinkron.
  • Konsumsi sumber data tersebut secara asinkron.
  • Mendukung pembatalan dan konteks yang diambil untuk aliran asinkron.
  • Kenali kapan antarmuka dan sumber data baru lebih disukai daripada urutan data sinkron sebelumnya.

Prasyarat

Anda harus menyiapkan komputer untuk menjalankan .NET, termasuk pengkompilasi C#. Pengkompilasi C# tersedia dengan Visual Studio 2022 atau .NET SDK.

Anda harus membuat token akses GitHub sehingga Anda dapat mengakses titik akhir GitHub GraphQL. Pilih izin berikut untuk Token Akses GitHub Anda:

  • repo:status
  • repo_publik

Simpan token akses di tempat yang aman sehingga Anda dapat menggunakannya untuk mendapatkan akses ke titik akhir GITHub API.

Peringatan

Jaga keamanan token akses pribadi Anda. Perangkat lunak apa pun dengan token akses pribadi Anda dapat melakukan panggilan API GitHub menggunakan hak akses Anda.

Tutorial ini mengasumsikan Anda terbiasa dengan C# dan .NET, termasuk Visual Studio atau .NET CLI.

Jalankan aplikasi pemula

Anda bisa mendapatkan kode untuk aplikasi starter yang digunakan dalam tutorial ini dari repositori dotnet/docs di folder asynchronous-programming/snippets.

Aplikasi awal adalah aplikasi konsol yang menggunakan antarmuka GitHub GraphQL untuk mengambil isu terbaru di repositori dotnet/docs. Mulailah dengan melihat kode berikut untuk metode aplikasi awal Main.

static async Task Main(string[] args)
{
    //Follow these steps to create a GitHub Access Token
    // https://help.github.com/articles/creating-a-personal-access-token-for-the-command-line/#creating-a-token
    //Select the following permissions for your GitHub Access Token:
    // - repo:status
    // - public_repo
    // Replace the 3rd parameter to the following code with your GitHub access token.
    var key = GetEnvVariable("GitHubKey",
    "You must store your GitHub key in the 'GitHubKey' environment variable",
    "");

    var client = new GitHubClient(new Octokit.ProductHeaderValue("IssueQueryDemo"))
    {
        Credentials = new Octokit.Credentials(key)
    };

    var progressReporter = new progressStatus((num) =>
    {
        Console.WriteLine($"Received {num} issues in total");
    });
    CancellationTokenSource cancellationSource = new CancellationTokenSource();

    try
    {
        var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
            cancellationSource.Token, progressReporter);
        foreach(var issue in results)
            Console.WriteLine(issue);
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Work has been cancelled");
    }
}

Anda dapat mengatur GitHubKey variabel lingkungan ke token akses pribadi Anda, atau Anda dapat mengganti argumen terakhir dalam panggilan ke GetEnvVariable dengan token akses pribadi Anda. Jangan letakkan kode akses Anda dalam kode sumber jika Anda akan berbagi sumber dengan orang lain. Jangan pernah mengunggah kode akses ke repositori sumber bersama.

Setelah membuat klien GitHub, kode dalam Main membuat objek pelaporan kemajuan dan token pembatalan. Setelah objek tersebut dibuat, Main memanggil RunPagedQueryAsync untuk mengambil 250 masalah yang paling baru dibuat. Setelah tugas selesai, hasilnya ditampilkan.

Saat menjalankan aplikasi starter, Anda dapat membuat beberapa pengamatan penting tentang cara aplikasi ini berjalan. Anda akan melihat kemajuan yang dilaporkan untuk setiap halaman yang dikembalikan dari GitHub. Anda dapat mengamati jeda yang nyata sebelum GitHub mengembalikan setiap halaman masalah baru. Akhirnya, masalah ditampilkan hanya setelah semua 10 halaman diambil dari GitHub.

Meninjau pelaksanaan

Implementasi mengungkapkan mengapa Anda mengamati perilaku yang dibahas di bagian sebelumnya. Periksa kode untuk RunPagedQueryAsync:

private static async Task<JArray> RunPagedQueryAsync(GitHubClient client, string queryText, string repoName, CancellationToken cancel, IProgress<int> progress)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    JArray finalResults = new JArray();
    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();
        finalResults.Merge(issues(results)["nodes"]!);
        progress?.Report(issuesReturned);
        cancel.ThrowIfCancellationRequested();
    }
    return finalResults;

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Hal pertama yang dilakukan metode ini adalah membuat objek POST, menggunakan GraphQLRequest kelas :

public class GraphQLRequest
{
    [JsonProperty("query")]
    public string? Query { get; set; }

    [JsonProperty("variables")]
    public IDictionary<string, object> Variables { get; } = new Dictionary<string, object>();

    public string ToJsonText() =>
        JsonConvert.SerializeObject(this);
}

yang membantu membentuk isi objek POST dan mengonversinya dengan benar ke JSON yang disajikan sebagai string tunggal menggunakan metode ToJsonText, yang menghapus semua karakter baris baru dari isi permintaan Anda dan menggantinya dengan karakter escape \ (garis miring terbalik).

Mari kita berkonsentrasi pada algoritma halaman dan struktur asinkron dari kode sebelumnya. (Anda dapat berkonsultasi dengan dokumentasi GitHub GraphQL untuk detail tentang GitHub GraphQL API.) Metode ini RunPagedQueryAsync menghitung masalah dari yang terbaru hingga terlama. Ini meminta 25 masalah per halaman dan memeriksa pageInfo struktur respons untuk melanjutkan dengan halaman sebelumnya. Hal ini mengikuti dukungan penomoran halaman standar GraphQL untuk respons multi-halaman. Respons menyertakan objek pageInfo yang mencakup nilai hasPreviousPages dan nilai startCursor yang digunakan untuk meminta halaman sebelumnya. Masalahnya ada dalam nodes array. Metode ini RunPagedQueryAsync menambahkan simpul-simpul ini ke array yang berisi semua hasil dari semua halaman.

Setelah mengambil dan memulihkan halaman hasil, RunPagedQueryAsync melaporkan kemajuan dan memeriksa pembatalan. Jika pembatalan telah diminta, RunPagedQueryAsync akan melemparkan OperationCanceledException.

Ada beberapa elemen dalam kode ini yang dapat ditingkatkan. Yang terpenting, RunPagedQueryAsync harus mengalokasikan penyimpanan untuk semua masalah yang dikembalikan. Sampel ini berhenti pada 250 masalah karena mengambil semua masalah yang terbuka akan membutuhkan lebih banyak memori untuk menyimpan semua masalah yang diambil. Protokol untuk mendukung laporan kemajuan dan pembatalan membuat algoritma lebih sulit dipahami pada pembacaan pertamanya. Lebih banyak jenis dan API terlibat. Anda harus melacak komunikasi melalui CancellationTokenSource dan CancellationToken yang terkait untuk memahami di mana pembatalan diminta dan di mana pembatalan tersebut diberikan.

Aliran asinkron memberikan cara yang lebih baik

Aliran asinkron dan dukungan bahasa terkait mengatasi semua kekhawatiran tersebut. Kode yang menghasilkan urutan sekarang dapat digunakan yield return untuk mengembalikan elemen dalam metode yang dideklarasikan dengan pengubah async . Anda dapat mengonsumsi aliran asinkron dengan perulangan await foreach sama seperti mengonsumsi urutan apa pun dengan perulangan foreach.

Fitur bahasa baru ini bergantung pada tiga antarmuka baru yang ditambahkan ke .NET Standard 2.1 dan diimplementasikan dalam .NET Core 3.0:

Ketiga antarmuka ini harus akrab bagi sebagian besar pengembang C#. Mereka berulah dengan cara yang mirip dengan rekan-rekan sinkron mereka:

Salah satu jenis yang mungkin tidak dikenal adalah System.Threading.Tasks.ValueTask. Struktur ValueTask ini menyediakan API serupa dengan System.Threading.Tasks.Task kelas . ValueTask digunakan dalam antarmuka ini karena alasan performa.

Mengonversi ke aliran asinkron

Selanjutnya, konversikan RunPagedQueryAsync metode untuk menghasilkan aliran asinkron. Pertama, ubah tanda tangan RunPagedQueryAsync untuk mengembalikan IAsyncEnumerable<JToken>, dan hapus token pembatalan dan objek kemajuan dari daftar parameter seperti yang ditunjukkan dalam kode berikut:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)

Kode pemula memproses setiap halaman saat halaman diambil, seperti yang ditunjukkan dalam kode berikut:

finalResults.Merge(issues(results)["nodes"]!);
progress?.Report(issuesReturned);
cancel.ThrowIfCancellationRequested();

Ganti tiga baris tersebut dengan kode berikut:

foreach (JObject issue in issues(results)["nodes"]!)
    yield return issue;

Anda juga dapat menghapus deklarasi finalResults yang dinyatakan lebih awal dalam metode ini dan pernyataan return yang muncul setelah perulangan yang Anda ubah.

Anda telah menyelesaikan perubahan untuk menghasilkan aliran asinkron. Metode yang sudah selesai harus menyerupai kode berikut:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Selanjutnya, Anda mengubah kode yang memanfaatkan koleksi untuk menggunakan aliran asinkron. Temukan kode berikut dalam Main yang memproses kumpulan isu:

var progressReporter = new progressStatus((num) =>
{
    Console.WriteLine($"Received {num} issues in total");
});
CancellationTokenSource cancellationSource = new CancellationTokenSource();

try
{
    var results = await RunPagedQueryAsync(client, PagedIssueQuery, "docs",
        cancellationSource.Token, progressReporter);
    foreach(var issue in results)
        Console.WriteLine(issue);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Work has been cancelled");
}

Ganti kode tersebut dengan perulangan berikut await foreach :

int num = 0;
await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs"))
{
    Console.WriteLine(issue);
    Console.WriteLine($"Received {++num} issues in total");
}

Antarmuka IAsyncEnumerator<T> baru berasal dari IAsyncDisposable. Itu berarti perulangan sebelumnya akan menghapus aliran secara asinkron ketika perulangan selesai. Anda dapat membayangkan loop terlihat seperti kode berikut:

int num = 0;
var enumerator = RunPagedQueryAsync(client, PagedIssueQuery, "docs").GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync())
    {
        var issue = enumerator.Current;
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
} finally
{
    if (enumerator != null)
        await enumerator.DisposeAsync();
}

Secara default, elemen stream diproses dalam konteks yang diambil. Jika Anda ingin menonaktifkan pengambilan konteks, gunakan TaskAsyncEnumerableExtensions.ConfigureAwait metode ekstensi. Untuk informasi selengkapnya tentang konteks sinkronisasi dan menangkap konteks saat ini, lihat artikel tentang menggunakan pola asinkron berbasis Tugas.

Aliran asinkron mendukung pembatalan menggunakan protokol yang sama dengan metode lain async . Anda akan mengubah tanda tangan untuk metode iterator asinkron sebagai berikut untuk mendukung pembatalan:

private static async IAsyncEnumerable<JToken> RunPagedQueryAsync(GitHubClient client,
    string queryText, string repoName, [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    var issueAndPRQuery = new GraphQLRequest
    {
        Query = queryText
    };
    issueAndPRQuery.Variables["repo_name"] = repoName;

    bool hasMorePages = true;
    int pagesReturned = 0;
    int issuesReturned = 0;

    // Stop with 10 pages, because these are large repos:
    while (hasMorePages && (pagesReturned++ < 10))
    {
        var postBody = issueAndPRQuery.ToJsonText();
        var response = await client.Connection.Post<string>(new Uri("https://api.github.com/graphql"),
            postBody, "application/json", "application/json");

        JObject results = JObject.Parse(response.HttpResponse.Body.ToString()!);

        int totalCount = (int)issues(results)["totalCount"]!;
        hasMorePages = (bool)pageInfo(results)["hasPreviousPage"]!;
        issueAndPRQuery.Variables["start_cursor"] = pageInfo(results)["startCursor"]!.ToString();
        issuesReturned += issues(results)["nodes"]!.Count();

        foreach (JObject issue in issues(results)["nodes"]!)
            yield return issue;
    }

    JObject issues(JObject result) => (JObject)result["data"]!["repository"]!["issues"]!;
    JObject pageInfo(JObject result) => (JObject)issues(result)["pageInfo"]!;
}

Atribut System.Runtime.CompilerServices.EnumeratorCancellationAttribute menyebabkan pengkompilasi menghasilkan kode untuk IAsyncEnumerator<T> yang membuat token diteruskan ke GetAsyncEnumerator terlihat oleh isi iterator asinkron sebagai argumen tersebut. Di dalam runQueryAsync, Anda dapat memeriksa status token dan membatalkan pekerjaan lebih lanjut jika diminta.

Anda menggunakan metode ekstensi lain, WithCancellation, untuk meneruskan token pembatalan ke aliran asinkron. Anda akan memodifikasi perulangan yang menghitung masalah sebagai berikut:

private static async Task EnumerateWithCancellation(GitHubClient client)
{
    int num = 0;
    var cancellation = new CancellationTokenSource();
    await foreach (var issue in RunPagedQueryAsync(client, PagedIssueQuery, "docs")
        .WithCancellation(cancellation.Token))
    {
        Console.WriteLine(issue);
        Console.WriteLine($"Received {++num} issues in total");
    }
}

Anda bisa mendapatkan kode untuk tutorial yang sudah selesai dari repositori dotnet/docs di folder asinkron-pemrograman/cuplikan .

Jalankan aplikasi yang sudah selesai

Jalankan aplikasi lagi. Bandingkan perilakunya dengan perilaku aplikasi awal. Halaman pertama hasil dijumlahkan segera setelah tersedia. Ada jeda yang dapat diamati karena setiap halaman baru diminta dan diambil, maka hasil halaman berikutnya dengan cepat dijumlahkan. Blok try / catch tidak diperlukan untuk menangani pembatalan: pemanggil dapat berhenti menghitung koleksi. Kemajuan dilaporkan dengan jelas karena aliran asinkron menghasilkan hasil saat setiap halaman diunduh. Status untuk setiap masalah yang dikembalikan disertakan dengan mulus dalam perulangan await foreach . Anda tidak memerlukan objek panggilan balik untuk melacak kemajuan.

Anda dapat melihat peningkatan dalam penggunaan memori dengan memeriksa kode. Anda tidak perlu lagi mengalokasikan koleksi untuk menyimpan semua hasil sebelum dijumlahkan. Pemanggil dapat menentukan cara menggunakan hasilnya dan apakah koleksi penyimpanan diperlukan.

Jalankan aplikasi starter dan selesai dan Anda dapat mengamati perbedaan antara implementasi untuk diri Anda sendiri. Anda dapat menghapus token akses GitHub yang Anda buat saat memulai tutorial ini setelah selesai. Jika penyerang mendapatkan akses ke token tersebut, mereka dapat mengakses API GitHub menggunakan kredensial Anda.

Dalam tutorial ini, Anda menggunakan aliran asinkron untuk membaca item individual dari API jaringan yang mengembalikan halaman data. Aliran asinkron juga dapat membaca dari "aliran tak berujung" seperti papan harga saham, atau perangkat sensor. Panggilan ke MoveNextAsync mengembalikan item berikutnya begitu tersedia.