Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Namespace System.Threading.Channels menyediakan serangkaian struktur data sinkronisasi untuk meneruskan data antara produsen dan konsumen secara asinkron. Pustaka menargetkan .NET Standard dan berfungsi pada semua implementasi .NET.
Pustaka ini tersedia dalam paket System.Threading.Channels NuGet. Namun, jika Anda menggunakan .NET Core 3.0 atau yang lebih baru, paket disertakan sebagai bagian dari kerangka kerja.
Model pemrograman konseptual konsumen/produsen
Saluran adalah implementasi dari model pemrograman konseptual produser/konsumen. Dalam model pemrograman ini, produsen secara asinkron menghasilkan data, dan konsumen secara asinkron mengonsumsi data tersebut. Dengan kata lain, model ini meneruskan data dari satu pihak ke pihak lain melalui antrean first-in first-out ("FIFO"). Anggap saluran sebagai jenis koleksi umum lainnya, seperti List<T>. Perbedaan utamanya adalah koleksi ini mengelola sinkronisasi dan menyediakan berbagai model konsumsi melalui opsi pembuatan pabrik. Opsi ini mengontrol perilaku saluran, seperti berapa banyak elemen yang diizinkan untuk disimpan dan apa yang terjadi jika batas tersebut tercapai, atau apakah saluran diakses oleh beberapa produsen atau beberapa konsumen secara bersamaan.
Strategi pembatas
Tergantung pada cara pembuatan Channel<T>, pembaca dan penulisnya akan berperilaku berbeda.
Untuk membuat saluran yang menentukan kapasitas maksimum, panggil Channel.CreateBounded. Untuk membuat saluran yang digunakan oleh sejumlah pembaca dan penulis secara bersamaan, hubungi Channel.CreateUnbounded. Setiap strategi pembatas mengekspos berbagai opsi yang ditentukan oleh pembuat, baik BoundedChannelOptions atau UnboundedChannelOptions.
Catatan
Terlepas dari strategi pembatas ChannelClosedException , saluran selalu melempar ketika digunakan setelah ditutup.
Saluran tidak terikat
Untuk membuat saluran yang tidak terbatas, panggil salah satu overload Channel.CreateUnbounded:
var channel = Channel.CreateUnbounded<T>();
Saat Anda membuat saluran yang tidak terbatas, secara default, saluran dapat digunakan oleh sejumlah pembaca dan penulis secara bersamaan. Atau, Anda dapat menentukan perilaku nondefault saat membuat saluran yang tidak terbatas dengan menyediakan UnboundedChannelOptions instans. Kapasitas saluran tidak terbatas dan semua penulisan dilakukan secara sinkron. Untuk contoh selengkapnya, lihat Pola pembuatan yang tidak terbatas.
Saluran terbatas
Untuk membuat saluran terikat, panggil salah satu metode overload Channel.CreateBounded:
var channel = Channel.CreateBounded<T>(7);
Kode sebelumnya membuat saluran yang memiliki kapasitas maksimum sebanyak 7 item. Saat Anda membuat saluran terikat, saluran terikat ke kapasitas maksimum. Ketika batas tercapai, perilaku default adalah saluran secara asinkron memblokir produsen sampai ruang tersedia. Anda dapat mengonfigurasi perilaku ini dengan menentukan opsi saat membuat saluran. Saluran terikat dapat dibuat dengan nilai kapasitas apa pun yang lebih besar dari nol. Untuk contoh lain, lihat Pola penciptaan yang dibatasi.
Perilaku mode penuh
Saat menggunakan saluran terbatas, Anda dapat menentukan perilaku saluran saat batas yang dikonfigurasi tercapai. Tabel berikut ini mencantumkan perilaku mode penuh untuk setiap BoundedChannelFullMode nilai:
| Nilai | Perilaku |
|---|---|
| BoundedChannelFullMode.Wait | Ini adalah nilai default. Panggilan untuk WriteAsync menunggu hingga ruang tersedia agar dapat menyelesaikan operasi tulis. Panggilan ke TryWrite akan segera mengembalikan false. |
| BoundedChannelFullMode.DropNewest | Menghapus dan mengabaikan item terbaru di saluran untuk memberi ruang bagi item yang sedang ditulis. |
| BoundedChannelFullMode.DropOldest | Menghapus dan mengabaikan item terlama di saluran untuk memberi ruang bagi item yang sedang ditulis. |
| BoundedChannelFullMode.DropWrite | Menghilangkan item yang sedang ditulis. |
Penting
Setiap kali Channel<TWrite,TRead>.Writer menghasilkan lebih cepat daripada yang dapat dikonsumsi oleh Channel<TWrite,TRead>.Reader, penulis saluran mengalami tekanan balik.
API Produser
Fungsionalitas produsen diekspos pada Channel<TWrite,TRead>.Writer. API produsen dan perilaku yang diharapkan dirinci dalam tabel berikut:
| API | Perilaku yang diperkirakan |
|---|---|
| ChannelWriter<T>.Complete | Menandai saluran sebagai selesai, yang berarti tidak ada lagi item yang ditulis ke saluran tersebut. |
| ChannelWriter<T>.TryComplete | Mencoba menandai saluran sebagai selesai, yang berarti tidak ada lagi data yang ditulis ke saluran tersebut. |
| ChannelWriter<T>.TryWrite | Mencoba untuk menuliskan item yang ditentukan ke dalam saluran. Ketika digunakan dengan saluran yang tidak terbatas, nilai ini akan selalu kembali true kecuali jika pengirim saluran memberi sinyal penyelesaian dengan ChannelWriter<T>.Complete, atau ChannelWriter<T>.TryComplete. |
| ChannelWriter<T>.WaitToWriteAsync | Mengembalikan sebuah ValueTask<TResult> yang berfungsi ketika terdapat ruang tersedia untuk menulis item. |
| ChannelWriter<T>.WriteAsync | Menuliskan suatu item secara asinkron ke dalam saluran. |
Consumer API
Fungsionalitas konsumen diekspos pada Channel<TWrite,TRead>.Reader. API konsumen dan perilaku yang diharapkan dirinci dalam tabel berikut:
| API | Perilaku yang diperkirakan |
|---|---|
| ChannelReader<T>.ReadAllAsync | Membuat IAsyncEnumerable<T> yang memungkinkan pembacaan semua data dari saluran. |
| ChannelReader<T>.ReadAsync | Secara asinkron membaca item dari saluran. |
| ChannelReader<T>.TryPeek | Mencoba melihat item dari saluran. |
| ChannelReader<T>.TryRead | Upaya untuk membaca item dari kanal. |
| ChannelReader<T>.WaitToReadAsync | Mengembalikan sebuah ValueTask<TResult> yang selesai ketika data siap untuk dibaca. |
Pola penggunaan umum
Ada beberapa pola penggunaan untuk saluran. API dirancang agar sederhana, konsisten, dan fleksibel mungkin. Semua metode asinkron mengembalikan ValueTask (atau ValueTask<bool>) yang mewakili operasi asinkron ringan yang dapat menghindari alokasi jika operasi selesai secara sinkron dan berpotensi bahkan secara asinkron. Selain itu, API dirancang agar dapat dikomposisikan, karena pembuat saluran membuat janji tentang penggunaan yang dimaksudkan. Ketika saluran dibuat dengan parameter tertentu, implementasi internal dapat beroperasi lebih efisien mengetahui janji-janji ini.
Pola pembuatan
Bayangkan Anda membuat solusi produsen/konsumen untuk sistem posisi global (GPS). Anda ingin melacak koordinat perangkat dari waktu ke waktu. Objek koordinat sampel mungkin terlihat seperti ini:
/// <summary>
/// A representation of a device's coordinates,
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
Guid DeviceId,
double Latitude,
double Longitude);
Pola pembuatan yang tidak terikat
Salah satu pola penggunaan umum adalah membuat saluran tidak terikat default:
var channel = Channel.CreateUnbounded<Coordinates>();
Namun, mari kita bayangkan bahwa Anda ingin membuat saluran yang tidak terbatas dengan beberapa produsen dan konsumen:
var channel = Channel.CreateUnbounded<Coordinates>(
new UnboundedChannelOptions
{
SingleWriter = false,
SingleReader = false,
AllowSynchronousContinuations = true
});
Dalam kasus ini, semua penulisan dilakukan secara sinkron, termasuk WriteAsync. Ini karena saluran yang tidak terbatas selalu memiliki ruang yang tersedia untuk penulisan secara efektif segera. Namun, dengan AllowSynchronousContinuations diatur ke true, proses penulisan mungkin akhirnya melakukan pekerjaan yang berhubungan dengan pembaca dengan melanjutkan eksekusi. Ini tidak memengaruhi sinkronisasi operasi.
Pola pembuatan terbatas
Dengan saluran terikat, konfigurasi saluran harus diketahui oleh konsumen untuk membantu memastikan konsumsi yang tepat. Artinya, konsumen harus tahu perilaku apa yang ditunjukkan saluran ketika batas yang dikonfigurasi tercapai. Mari kita jelajahi beberapa pola pembuatan terikat umum.
Cara paling sederhana untuk membuat saluran terikat adalah dengan menentukan kapasitas:
var channel = Channel.CreateBounded<Coordinates>(1);
Kode sebelumnya membuat saluran terikat dengan kapasitas 1maksimum . Opsi lain tersedia, beberapa opsi sama dengan saluran yang tidak terbatas, sementara yang lain khusus untuk saluran yang tidak terbatas:
var channel = Channel.CreateBounded<Coordinates>(
new BoundedChannelOptions(1_000)
{
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.DropWrite
});
Dalam kode sebelumnya, saluran dibuat sebagai saluran terikat yang dibatasi hingga 1.000 item, dengan satu penulis tetapi banyak pembaca. Perilaku mode penuhnya didefinisikan sebagai DropWrite, yang berarti bahwa item yang sedang ditulis akan dibuang jika saluran penuh.
Untuk mengamati item yang dihilangkan saat menggunakan saluran terikat, daftarkan itemDropped panggilan balik:
var channel = Channel.CreateBounded(
new BoundedChannelOptions(10)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest
},
static void (Coordinates dropped) =>
Console.WriteLine($"Coordinates dropped: {dropped}"));
Setiap kali saluran penuh dan item baru ditambahkan, itemDropped panggilan balik dipanggil. Dalam contoh ini, panggilan balik yang disediakan menulis item ke konsol, tetapi Anda bebas untuk mengambil tindakan lain yang Anda inginkan.
Pola produsen
Bayangkan bahwa produser dalam skenario ini menulis koordinat baru ke saluran. Produser dapat melakukan ini dengan memanggil TryWrite:
static void ProduceWithWhileAndTryWrite(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
}
}
Kode produsen sebelumnya:
-
Channel<Coordinates>.WritermenerimaChannelWriter<Coordinates>sebagai argumen, bersamaan denganCoordinatesawal. - Menentukan perulangan bersyarat
whileyang mencoba memindahkan koordinat menggunakanTryWrite.
Produsen alternatif mungkin menggunakan metode :WriteAsync
static async ValueTask ProduceWithWhileWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
await writer.WriteAsync(
item: coordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
});
}
writer.Complete();
}
Sekali lagi, Channel<Coordinates>.Writer digunakan dalam perulangan while . Tetapi kali ini, metode WriteAsync dipanggil. Metode berlanjut hanya setelah koordinat ditulis. Ketika loop while berakhir, panggilan ke Complete dilakukan, yang menandakan bahwa tidak ada lagi data yang ditulis ke kanal.
Pola produsen lain adalah menggunakan metode WaitToWriteAsync; pertimbangkan kode berikut:
static async ValueTask ProduceWithWaitToWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
await writer.WaitToWriteAsync())
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
await Task.Delay(TimeSpan.FromMilliseconds(10));
}
writer.Complete();
}
Sebagai bagian dari kondisi while, hasil panggilan WaitToWriteAsync digunakan untuk menentukan apakah perulangan akan dilanjutkan.
Pola konsumen
Ada beberapa pola konsumen saluran yang umum. Ketika saluran tidak pernah berakhir, yang berarti menghasilkan data tanpa batas waktu, konsumen dapat menggunakan perulangan while (true) , dan membaca data saat tersedia:
static async ValueTask ConsumeWithWhileAsync(
ChannelReader<Coordinates> reader)
{
while (true)
{
// May throw ChannelClosedException if
// the parent channel's writer signals complete.
Coordinates coordinates = await reader.ReadAsync();
Console.WriteLine(coordinates);
}
}
Catatan
Kode ini melemparkan pengecualian jika saluran ditutup.
Pengguna alternatif dapat menghindari kekhawatiran ini dengan menggunakan perulangan while berlapis, seperti yang ditunjukkan dalam kode berikut:
static async ValueTask ConsumeWithNestedWhileAsync(
ChannelReader<Coordinates> reader)
{
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out Coordinates coordinates))
{
Console.WriteLine(coordinates);
}
}
}
Dalam kode sebelumnya, konsumen menunggu untuk membaca data. Setelah data tersedia, konsumen mencoba membacanya. Perulangan ini terus mengevaluasi sampai produsen saluran memberi sinyal bahwa ia tidak lagi memiliki data untuk dibaca. Dengan demikian, ketika produsen diketahui memiliki jumlah item terbatas yang dihasilkannya dan memberi sinyal penyelesaian, konsumen dapat menggunakan semantik await foreach untuk mengiterasi item tersebut.
static async ValueTask ConsumeWithAwaitForeachAsync(
ChannelReader<Coordinates> reader)
{
await foreach (Coordinates coordinates in reader.ReadAllAsync())
{
Console.WriteLine(coordinates);
}
}
Kode sebelumnya menggunakan ReadAllAsync metode untuk membaca semua koordinat dari saluran.