Bagikan melalui


Pustaka System.Threading.Channels

Namespace System.Threading.Channels menyediakan serangkaian struktur data sinkronisasi untuk meneruskan data antara produsen dan konsumen secara asinkron. Pustaka menargetkan .NET Standard dan berfungsi pada semua implementasi .NET.

Pustaka ini tersedia dalam paket System.Threading.Channels NuGet. Namun, jika Anda menggunakan .NET Core 3.0 atau yang lebih baru, paket disertakan sebagai bagian dari kerangka kerja.

Model pemrograman konseptual produsen/konsumen

Saluran adalah implementasi dari model pemrograman konseptual produsen/konsumen. Dalam model pemrograman ini, produsen secara asinkron menghasilkan data, dan konsumen secara asinkron mengonsumsi data tersebut. Dengan kata lain, model ini menyerahkan data dari satu pihak ke pihak lain. Cobalah untuk memikirkan saluran seperti yang Anda lakukan jenis koleksi umum lainnya, seperti List<T>. Perbedaan utamanya adalah koleksi ini mengelola sinkronisasi dan menyediakan berbagai model konsumsi melalui opsi pembuatan pabrik. Opsi ini mengontrol perilaku saluran, seperti berapa banyak elemen yang diizinkan untuk disimpan dan apa yang terjadi jika batas tersebut tercapai, atau apakah saluran diakses oleh beberapa produsen atau beberapa konsumen secara bersamaan.

Strategi pembatas

Tergantung pada bagaimana dibuat Channel<T> , pembaca dan penulisnya bersifat berbeda.

Untuk membuat saluran yang menentukan kapasitas maksimum, panggil Channel.CreateBounded. Untuk membuat saluran yang digunakan oleh sejumlah pembaca dan penulis secara bersamaan, hubungi Channel.CreateUnbounded. Setiap strategi pembatas mengekspos berbagai opsi yang ditentukan kreator, baik BoundedChannelOptions atau UnboundedChannelOptions masing-masing.

Catatan

Terlepas dari strategi pembatas, saluran akan selalu melempar ChannelClosedException ketika digunakan setelah ditutup.

Saluran tidak terikat

Untuk membuat saluran yang tidak terbatas, panggil salah Channel.CreateUnbounded satu kelebihan beban:

var channel = Channel.CreateUnbounded<T>();

Saat Anda membuat saluran yang tidak terbatas, secara default, saluran dapat digunakan oleh sejumlah pembaca dan penulis secara bersamaan. Atau, Anda dapat menentukan perilaku nondefault saat membuat saluran yang tidak terbatas dengan menyediakan UnboundedChannelOptions instans. Kapasitas saluran tidak terbatas dan semua penulisan dilakukan secara sinkron. Untuk contoh selengkapnya, lihat Pola pembuatan yang tidak terbatas.

Saluran terikat

Untuk membuat saluran terikat, panggil salah Channel.CreateBounded satu kelebihan beban:

var channel = Channel.CreateBounded<T>(7);

Kode sebelumnya membuat saluran yang memiliki kapasitas 7 maksimum item. Saat Anda membuat saluran terikat, saluran terikat ke kapasitas maksimum. Ketika terikat tercapai, perilaku defaultnya adalah bahwa saluran secara asinkron memblokir produsen sampai ruang tersedia. Anda dapat mengonfigurasi perilaku ini dengan menentukan opsi saat membuat saluran. Saluran terikat dapat dibuat dengan nilai kapasitas apa pun yang lebih besar dari nol. Untuk contoh lain, lihat Pola pembuatan terikat.

Perilaku mode penuh

Saat menggunakan saluran terikat, Anda dapat menentukan perilaku yang dipatuhi saluran saat terikat yang dikonfigurasi tercapai. Tabel berikut ini mencantumkan perilaku mode penuh untuk setiap BoundedChannelFullMode nilai:

Nilai Perilaku
BoundedChannelFullMode.Wait Ini adalah nilai default. Panggilan untuk WriteAsync menunggu ruang tersedia untuk menyelesaikan operasi tulis. Panggilan untuk TryWrite segera kembali false .
BoundedChannelFullMode.DropNewest Menghapus dan mengabaikan item terbaru di saluran untuk memberi ruang bagi item yang sedang ditulis.
BoundedChannelFullMode.DropOldest Menghapus dan mengabaikan item terlama di saluran untuk memberi ruang bagi item yang sedang ditulis.
BoundedChannelFullMode.DropWrite Menghilangkan item yang sedang ditulis.

Penting

Channel<TWrite,TRead>.Writer Setiap kali menghasilkan lebih cepat daripada yang Channel<TWrite,TRead>.Reader dapat dikonsumsi, penulis saluran mengalami tekanan balik.

API Produser

Fungsionalitas produsen diekspos pada Channel<TWrite,TRead>.Writer. API produsen dan perilaku yang diharapkan dirinci dalam tabel berikut:

API Perilaku yang diperkirakan
ChannelWriter<T>.Complete Menandai saluran sebagai selesai, yang berarti tidak ada lagi item yang ditulis ke saluran tersebut.
ChannelWriter<T>.TryComplete Mencoba menandai saluran sebagai selesai, yang berarti tidak ada lagi data yang ditulis ke saluran tersebut.
ChannelWriter<T>.TryWrite Mencoba menulis item yang ditentukan ke saluran. Ketika digunakan dengan saluran yang tidak terbatas, ini selalu kembali true kecuali penulis saluran memberi sinyal penyelesaian dengan ChannelWriter<T>.Complete, atau ChannelWriter<T>.TryComplete.
ChannelWriter<T>.WaitToWriteAsync Mengembalikan yang ValueTask<TResult> selesai ketika spasi tersedia untuk menulis item.
ChannelWriter<T>.WriteAsync Menulis item secara asinkron ke saluran.

Consumer API

Fungsionalitas konsumen diekspos pada Channel<TWrite,TRead>.Reader. API konsumen dan perilaku yang diharapkan dirinci dalam tabel berikut:

API Perilaku yang diperkirakan
ChannelReader<T>.ReadAllAsync Membuat yang IAsyncEnumerable<T> memungkinkan membaca semua data dari saluran.
ChannelReader<T>.ReadAsync Secara asinkron membaca item dari saluran.
ChannelReader<T>.TryPeek Mencoba mengintip item dari saluran.
ChannelReader<T>.TryRead Mencoba membaca item dari saluran.
ChannelReader<T>.WaitToReadAsync Mengembalikan yang ValueTask<TResult> selesai saat data tersedia untuk dibaca.

Pola penggunaan umum

Ada beberapa pola penggunaan untuk saluran. API dirancang agar sederhana, konsisten, dan fleksibel mungkin. Semua metode asinkron mengembalikan ValueTask (atau ValueTask<bool>) yang mewakili operasi asinkron ringan yang dapat menghindari alokasi jika operasi selesai secara sinkron dan berpotensi bahkan secara asinkron. Selain itu, API dirancang agar dapat dikomposisikan, karena pembuat saluran membuat janji tentang penggunaan yang dimaksudkan. Ketika saluran dibuat dengan parameter tertentu, implementasi internal dapat beroperasi lebih efisien mengetahui janji-janji ini.

Pola pembuatan

Bayangkan Anda membuat solusi produsen/konsumen untuk sistem posisi global (GPS). Anda ingin melacak koordinat perangkat dari waktu ke waktu. Objek koordinat sampel mungkin terlihat seperti ini:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

Pola pembuatan yang tidak terikat

Salah satu pola penggunaan umum adalah membuat saluran tidak terikat default:

var channel = Channel.CreateUnbounded<Coordinates>();

Namun, mari kita bayangkan bahwa Anda ingin membuat saluran yang tidak terbatas dengan beberapa produsen dan konsumen:

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

Dalam hal ini, semua tulisan sinkron, bahkan WriteAsync. Ini karena saluran yang tidak terbatas selalu memiliki ruang yang tersedia untuk penulisan secara efektif segera. Namun, dengan AllowSynchronousContinuations diatur ke true, penulisan mungkin akhirnya melakukan pekerjaan yang terkait dengan pembaca dengan menjalankan kelanjutannya. Ini tidak memengaruhi sinkronisasi operasi.

Pola pembuatan terikat

Dengan saluran terikat, konfigurasi saluran harus diketahui oleh konsumen untuk membantu memastikan konsumsi yang tepat. Artinya, konsumen harus tahu perilaku apa yang dipamerkan saluran ketika terikat yang dikonfigurasi tercapai. Mari kita jelajahi beberapa pola pembuatan terikat umum.

Cara paling sederhana untuk membuat saluran terikat adalah dengan menentukan kapasitas:

var channel = Channel.CreateBounded<Coordinates>(1);

Kode sebelumnya membuat saluran terikat dengan kapasitas 1maksimum . Opsi lain tersedia, beberapa opsi sama dengan saluran yang tidak terbatas, sementara yang lain khusus untuk saluran yang tidak terbatas:

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

Dalam kode sebelumnya, saluran dibuat sebagai saluran terikat yang dibatasi hingga 1.000 item, dengan satu penulis tetapi banyak pembaca. Perilaku mode penuhnya didefinisikan sebagai DropWrite, yang berarti bahwa ia menjatuhkan item yang ditulis jika saluran penuh.

Untuk mengamati item yang dihilangkan saat menggunakan saluran terikat, daftarkan itemDropped panggilan balik:

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

Setiap kali saluran penuh dan item baru ditambahkan, itemDropped panggilan balik dipanggil. Dalam contoh ini, panggilan balik yang disediakan menulis item ke konsol, tetapi Anda bebas untuk mengambil tindakan lain yang Anda inginkan.

Pola produsen

Bayangkan bahwa produser dalam skenario ini menulis koordinat baru ke saluran. Produser dapat melakukan ini dengan memanggil TryWrite:

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

Kode produsen sebelumnya:

  • Channel<Coordinates>.Writer Menerima (ChannelWriter<Coordinates>) sebagai argumen, bersama dengan awal Coordinates.
  • Menentukan perulangan bersyarat while yang mencoba memindahkan koordinat menggunakan TryWrite.

Produsen alternatif mungkin menggunakan metode :WriteAsync

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

Sekali lagi, Channel<Coordinates>.Writer digunakan dalam perulangan while . Tapi kali ini, WriteAsync metode ini dipanggil. Metode akan berlanjut hanya setelah koordinat ditulis. Ketika perulangan while keluar, panggilan ke Complete dilakukan, yang menandakan bahwa tidak ada lagi data yang ditulis ke saluran.

Pola produsen lain adalah menggunakan metode , WaitToWriteAsync pertimbangkan kode berikut:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

Sebagai bagian dari kondisi while, hasil WaitToWriteAsync panggilan digunakan untuk menentukan apakah akan melanjutkan perulangan.

Pola konsumen

Ada beberapa pola konsumen saluran umum. Ketika saluran tidak pernah berakhir, yang berarti menghasilkan data tanpa batas waktu, konsumen dapat menggunakan perulangan while (true) , dan membaca data saat tersedia:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

Catatan

Kode ini akan melemparkan pengecualian jika saluran ditutup.

Konsumen alternatif dapat menghindari kekhawatiran ini dengan menggunakan perulangan sementara berlapis, seperti yang ditunjukkan dalam kode berikut:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

Dalam kode sebelumnya, konsumen menunggu untuk membaca data. Setelah data tersedia, konsumen mencoba membacanya. Perulangan ini terus mengevaluasi sampai produsen saluran memberi sinyal bahwa ia tidak lagi memiliki data untuk dibaca. Dengan demikian, ketika produsen diketahui memiliki jumlah item terbatas yang dihasilkannya dan menandakan penyelesaian, konsumen dapat menggunakan await foreach semantik untuk melakukan iterasi atas item:

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

Kode sebelumnya menggunakan ReadAllAsync metode untuk membaca semua koordinat dari saluran.

Lihat juga