Bagikan melalui


pustaka System.Threading.Channels

Namespace System.Threading.Channels menyediakan serangkaian struktur data sinkronisasi untuk meneruskan data antara produsen dan konsumen secara asinkron. Pustaka menargetkan .NET Standard dan berfungsi pada semua implementasi .NET.

Pustaka ini tersedia dalam paket System.Threading.Channels NuGet. Namun, jika Anda menggunakan .NET Core 3.0 atau yang lebih baru, paket disertakan sebagai bagian dari kerangka kerja.

Model pemrograman konseptual konsumen/produsen

Saluran adalah implementasi dari model pemrograman konseptual produser/konsumen. Dalam model pemrograman ini, produsen secara asinkron menghasilkan data, dan konsumen secara asinkron mengonsumsi data tersebut. Dengan kata lain, model ini meneruskan data dari satu pihak ke pihak lain melalui antrean first-in first-out ("FIFO"). Anggap saluran sebagai jenis koleksi umum lainnya, seperti List<T>. Perbedaan utamanya adalah koleksi ini mengelola sinkronisasi dan menyediakan berbagai model konsumsi melalui opsi pembuatan pabrik. Opsi ini mengontrol perilaku saluran, seperti berapa banyak elemen yang diizinkan untuk disimpan dan apa yang terjadi jika batas tersebut tercapai, atau apakah saluran diakses oleh beberapa produsen atau beberapa konsumen secara bersamaan.

Strategi pembatas

Tergantung pada cara pembuatan Channel<T>, pembaca dan penulisnya akan berperilaku berbeda.

Untuk membuat saluran yang menentukan kapasitas maksimum, panggil Channel.CreateBounded. Untuk membuat saluran yang digunakan oleh sejumlah pembaca dan penulis secara bersamaan, hubungi Channel.CreateUnbounded. Setiap strategi pembatas mengekspos berbagai opsi yang ditentukan oleh pembuat, baik BoundedChannelOptions atau UnboundedChannelOptions.

Catatan

Terlepas dari strategi pembatas ChannelClosedException , saluran selalu melempar ketika digunakan setelah ditutup.

Saluran tidak terikat

Untuk membuat saluran yang tidak terbatas, panggil salah satu overload Channel.CreateUnbounded:

var channel = Channel.CreateUnbounded<T>();

Saat Anda membuat saluran yang tidak terbatas, secara default, saluran dapat digunakan oleh sejumlah pembaca dan penulis secara bersamaan. Atau, Anda dapat menentukan perilaku nondefault saat membuat saluran yang tidak terbatas dengan menyediakan UnboundedChannelOptions instans. Kapasitas saluran tidak terbatas dan semua penulisan dilakukan secara sinkron. Untuk contoh selengkapnya, lihat Pola pembuatan yang tidak terbatas.

Saluran terbatas

Untuk membuat saluran terikat, panggil salah satu metode overload Channel.CreateBounded:

var channel = Channel.CreateBounded<T>(7);

Kode sebelumnya membuat saluran yang memiliki kapasitas maksimum sebanyak 7 item. Saat Anda membuat saluran terikat, saluran terikat ke kapasitas maksimum. Ketika batas tercapai, perilaku default adalah saluran secara asinkron memblokir produsen sampai ruang tersedia. Anda dapat mengonfigurasi perilaku ini dengan menentukan opsi saat membuat saluran. Saluran terikat dapat dibuat dengan nilai kapasitas apa pun yang lebih besar dari nol. Untuk contoh lain, lihat Pola penciptaan yang dibatasi.

Perilaku mode penuh

Saat menggunakan saluran terbatas, Anda dapat menentukan perilaku saluran saat batas yang dikonfigurasi tercapai. Tabel berikut ini mencantumkan perilaku mode penuh untuk setiap BoundedChannelFullMode nilai:

Nilai Perilaku
BoundedChannelFullMode.Wait Ini adalah nilai default. Panggilan untuk WriteAsync menunggu hingga ruang tersedia agar dapat menyelesaikan operasi tulis. Panggilan ke TryWrite akan segera mengembalikan false.
BoundedChannelFullMode.DropNewest Menghapus dan mengabaikan item terbaru di saluran untuk memberi ruang bagi item yang sedang ditulis.
BoundedChannelFullMode.DropOldest Menghapus dan mengabaikan item terlama di saluran untuk memberi ruang bagi item yang sedang ditulis.
BoundedChannelFullMode.DropWrite Menghilangkan item yang sedang ditulis.

Penting

Setiap kali Channel<TWrite,TRead>.Writer menghasilkan lebih cepat daripada yang dapat dikonsumsi oleh Channel<TWrite,TRead>.Reader, penulis saluran mengalami tekanan balik.

API Produser

Fungsionalitas produsen diekspos pada Channel<TWrite,TRead>.Writer. API produsen dan perilaku yang diharapkan dirinci dalam tabel berikut:

API Perilaku yang diperkirakan
ChannelWriter<T>.Complete Menandai saluran sebagai selesai, yang berarti tidak ada lagi item yang ditulis ke saluran tersebut.
ChannelWriter<T>.TryComplete Mencoba menandai saluran sebagai selesai, yang berarti tidak ada lagi data yang ditulis ke saluran tersebut.
ChannelWriter<T>.TryWrite Mencoba untuk menuliskan item yang ditentukan ke dalam saluran. Ketika digunakan dengan saluran yang tidak terbatas, nilai ini akan selalu kembali true kecuali jika pengirim saluran memberi sinyal penyelesaian dengan ChannelWriter<T>.Complete, atau ChannelWriter<T>.TryComplete.
ChannelWriter<T>.WaitToWriteAsync Mengembalikan sebuah ValueTask<TResult> yang berfungsi ketika terdapat ruang tersedia untuk menulis item.
ChannelWriter<T>.WriteAsync Menuliskan suatu item secara asinkron ke dalam saluran.

Consumer API

Fungsionalitas konsumen diekspos pada Channel<TWrite,TRead>.Reader. API konsumen dan perilaku yang diharapkan dirinci dalam tabel berikut:

API Perilaku yang diperkirakan
ChannelReader<T>.ReadAllAsync Membuat IAsyncEnumerable<T> yang memungkinkan pembacaan semua data dari saluran.
ChannelReader<T>.ReadAsync Secara asinkron membaca item dari saluran.
ChannelReader<T>.TryPeek Mencoba melihat item dari saluran.
ChannelReader<T>.TryRead Upaya untuk membaca item dari kanal.
ChannelReader<T>.WaitToReadAsync Mengembalikan sebuah ValueTask<TResult> yang selesai ketika data siap untuk dibaca.

Pola penggunaan umum

Ada beberapa pola penggunaan untuk saluran. API dirancang agar sederhana, konsisten, dan fleksibel mungkin. Semua metode asinkron mengembalikan ValueTask (atau ValueTask<bool>) yang mewakili operasi asinkron ringan yang dapat menghindari alokasi jika operasi selesai secara sinkron dan berpotensi bahkan secara asinkron. Selain itu, API dirancang agar dapat dikomposisikan, karena pembuat saluran membuat janji tentang penggunaan yang dimaksudkan. Ketika saluran dibuat dengan parameter tertentu, implementasi internal dapat beroperasi lebih efisien mengetahui janji-janji ini.

Pola pembuatan

Bayangkan Anda membuat solusi produsen/konsumen untuk sistem posisi global (GPS). Anda ingin melacak koordinat perangkat dari waktu ke waktu. Objek koordinat sampel mungkin terlihat seperti ini:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

Pola pembuatan yang tidak terikat

Salah satu pola penggunaan umum adalah membuat saluran tidak terikat default:

var channel = Channel.CreateUnbounded<Coordinates>();

Namun, mari kita bayangkan bahwa Anda ingin membuat saluran yang tidak terbatas dengan beberapa produsen dan konsumen:

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

Dalam kasus ini, semua penulisan dilakukan secara sinkron, termasuk WriteAsync. Ini karena saluran yang tidak terbatas selalu memiliki ruang yang tersedia untuk penulisan secara efektif segera. Namun, dengan AllowSynchronousContinuations diatur ke true, proses penulisan mungkin akhirnya melakukan pekerjaan yang berhubungan dengan pembaca dengan melanjutkan eksekusi. Ini tidak memengaruhi sinkronisasi operasi.

Pola pembuatan terbatas

Dengan saluran terikat, konfigurasi saluran harus diketahui oleh konsumen untuk membantu memastikan konsumsi yang tepat. Artinya, konsumen harus tahu perilaku apa yang ditunjukkan saluran ketika batas yang dikonfigurasi tercapai. Mari kita jelajahi beberapa pola pembuatan terikat umum.

Cara paling sederhana untuk membuat saluran terikat adalah dengan menentukan kapasitas:

var channel = Channel.CreateBounded<Coordinates>(1);

Kode sebelumnya membuat saluran terikat dengan kapasitas 1maksimum . Opsi lain tersedia, beberapa opsi sama dengan saluran yang tidak terbatas, sementara yang lain khusus untuk saluran yang tidak terbatas:

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

Dalam kode sebelumnya, saluran dibuat sebagai saluran terikat yang dibatasi hingga 1.000 item, dengan satu penulis tetapi banyak pembaca. Perilaku mode penuhnya didefinisikan sebagai DropWrite, yang berarti bahwa item yang sedang ditulis akan dibuang jika saluran penuh.

Untuk mengamati item yang dihilangkan saat menggunakan saluran terikat, daftarkan itemDropped panggilan balik:

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

Setiap kali saluran penuh dan item baru ditambahkan, itemDropped panggilan balik dipanggil. Dalam contoh ini, panggilan balik yang disediakan menulis item ke konsol, tetapi Anda bebas untuk mengambil tindakan lain yang Anda inginkan.

Pola produsen

Bayangkan bahwa produser dalam skenario ini menulis koordinat baru ke saluran. Produser dapat melakukan ini dengan memanggil TryWrite:

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

Kode produsen sebelumnya:

  • Channel<Coordinates>.Writer menerima ChannelWriter<Coordinates> sebagai argumen, bersamaan dengan Coordinates awal.
  • Menentukan perulangan bersyarat while yang mencoba memindahkan koordinat menggunakan TryWrite.

Produsen alternatif mungkin menggunakan metode :WriteAsync

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

Sekali lagi, Channel<Coordinates>.Writer digunakan dalam perulangan while . Tetapi kali ini, metode WriteAsync dipanggil. Metode berlanjut hanya setelah koordinat ditulis. Ketika loop while berakhir, panggilan ke Complete dilakukan, yang menandakan bahwa tidak ada lagi data yang ditulis ke kanal.

Pola produsen lain adalah menggunakan metode WaitToWriteAsync; pertimbangkan kode berikut:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

Sebagai bagian dari kondisi while, hasil panggilan WaitToWriteAsync digunakan untuk menentukan apakah perulangan akan dilanjutkan.

Pola konsumen

Ada beberapa pola konsumen saluran yang umum. Ketika saluran tidak pernah berakhir, yang berarti menghasilkan data tanpa batas waktu, konsumen dapat menggunakan perulangan while (true) , dan membaca data saat tersedia:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

Catatan

Kode ini melemparkan pengecualian jika saluran ditutup.

Pengguna alternatif dapat menghindari kekhawatiran ini dengan menggunakan perulangan while berlapis, seperti yang ditunjukkan dalam kode berikut:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

Dalam kode sebelumnya, konsumen menunggu untuk membaca data. Setelah data tersedia, konsumen mencoba membacanya. Perulangan ini terus mengevaluasi sampai produsen saluran memberi sinyal bahwa ia tidak lagi memiliki data untuk dibaca. Dengan demikian, ketika produsen diketahui memiliki jumlah item terbatas yang dihasilkannya dan memberi sinyal penyelesaian, konsumen dapat menggunakan semantik await foreach untuk mengiterasi item tersebut.

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

Kode sebelumnya menggunakan ReadAllAsync metode untuk membaca semua koordinat dari saluran.

Lihat juga