Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
System.Threading.Channels ad alanı, verileri üreticiler ve tüketiciler arasında eşzamanlı olmayan bir şekilde geçirmek için bir senkronizasyon veri yapıları kümesi sağlar. Kitaplık .NET, .NET Standard ve .NET Framework'i hedefler ve tüm .NET uygulamaları üzerinde çalışır.
Bu kitaplık System.Threading.Channels NuGet paketinde📦 kullanılabilir. Bununla birlikte, .NET Core 3.0 veya üzerini kullanıyorsanız, paket paylaşılan çerçevenin bir parçası olarak eklenir.
Üretici/tüketici kavramsal programlama modeli
Kanallar, üretici/tüketici kavramsal programlama modelinin bir uygulamasıdır. Bu programlama modelinde üreticiler zaman uyumsuz olarak veri üretir ve tüketiciler bu verileri zaman uyumsuz olarak kullanır. Başka bir deyişle, bu model ilk giren ilk çıkar ("FIFO") kuyruğu aracılığıyla verileri bir taraftan diğerine geçirir. Kanalları, List<T> gibi başka bir yaygın genel toplama türü olarak düşünün. Birincil fark, bu koleksiyonun eşitlemeyi yönetmesi ve fabrika oluşturma seçenekleri aracılığıyla çeşitli tüketim modelleri sağlamasıdır. Bu seçenekler kanalların davranışını denetler, örneğin:
- Kaç öğe depolamalarına izin verilir ve bu sınıra ulaşılırsa ne olur?
- Kanala birden çok üretici veya birden çok tüketici tarafından eşzamanlı olarak erişilip erişilemediği.
Temel kullanım
Aşağıdaki örnekte, bir üreticinin öğeleri yazdığı ve tüketicinin bunları okuduğu bir kanalın temel kullanımı gösterilmektedir:
static async Task BasicUsageAsync()
{
Channel<int> channel = Channel.CreateUnbounded<int>();
Task producer = ProduceAsync(channel.Writer);
Task consumer = ConsumeAsync(channel.Reader);
await Task.WhenAll(producer, consumer);
static async Task ProduceAsync(ChannelWriter<int> writer)
{
for (int i = 0; i < 5; i++)
{
await writer.WriteAsync(i);
}
writer.Complete();
}
static async Task ConsumeAsync(ChannelReader<int> reader)
{
await foreach (int item in reader.ReadAllAsync())
{
Console.WriteLine($"Received: {item}");
}
}
}
Sınırlayıcı stratejiler
Bir Channel<T> öğesinin nasıl oluşturulduğuna bağlı olarak, okuyucusu ve yazıcısı farklı davranır.
Maksimum kapasiteyi belirten bir kanal oluşturmak için çağrısında bulunur Channel.CreateBounded. Herhangi bir sayıda okuyucu ve yazar tarafından eşzamanlı olarak kullanılan bir kanal oluşturmak için öğesini çağırın Channel.CreateUnbounded. Her sınırlayıcı strateji, ya BoundedChannelOptions ya da UnboundedChannelOptions olmak üzere çeşitli oluşturucu tanımlı seçenekleri kullanıma sunar.
Not
Sınırlayıcı stratejiden bağımsız olarak, bir kanal kapandıktan sonra yeniden kullanıldığında her zaman bir ChannelClosedException fırlatır.
İlişkisiz kanallar
Sınırsız bir kanal oluşturmak için aşırı yüklemelerden birini çağırınChannel.CreateUnbounded.
var channel = Channel.CreateUnbounded<T>();
İlişkisiz bir kanal oluşturduğunuzda, kanal varsayılan olarak herhangi bir sayıda okuyucu ve yazar tarafından eşzamanlı olarak kullanılabilir. Alternatif olarak, bir örnek sağlayarak UnboundedChannelOptions ilişkisiz bir kanal oluştururken beklenmeyen bir davranış belirtebilirsiniz. Kanalın kapasitesi sınırsızdır ve tüm yazma işlemleri zaman uyumlu olarak gerçekleştirilir. Daha fazla örnek için bkz . Sınırsız oluşturma desenleri.
Sınırlanmış kanallar
Sınırlanmış kanal oluşturmak için aşırı yüklemelerden birini çağırın Channel.CreateBounded :
var channel = Channel.CreateBounded<T>(7);
Yukarıdaki kod, maksimum öğe kapasitesine 7 sahip bir kanal oluşturur. Sınırlanmış bir kanal oluşturduğunuzda, kanal maksimum kapasiteye bağlıdır. Sınıra ulaşıldığında, varsayılan davranış kanalın alan kullanılabilir duruma gelene kadar üreticiyi zaman uyumsuz olarak engellemesidir. Kanalı oluştururken bir seçenek belirterek bu davranışı yapılandırabilirsiniz. Sınırlanmış kanallar, sıfırdan büyük herhangi bir kapasite değeriyle oluşturulabilir. Diğer örnekler için bkz . Sınırlanmış oluşturma desenleri.
Tam mod davranışı
Sınırlanmış kanal kullanırken, yapılandırılan sınıra ulaşıldığında kanalın bağlı olduğu davranışı belirtebilirsiniz. Aşağıdaki tabloda her BoundedChannelFullMode değer için tam mod davranışları listeleniyor:
| Değer | Davranış |
|---|---|
| BoundedChannelFullMode.Wait | Bu varsayılan değerdir.
WriteAsync Yazma işlemini tamamlamak için kullanılabilir alan beklemeye yönelik çağrılar.
TryWrite çağrıları hemen false döner. |
| BoundedChannelFullMode.DropNewest | Yazılan öğeye yer açmak için kanaldaki en yeni öğeyi kaldırır ve görmezden gelir. |
| BoundedChannelFullMode.DropOldest | Yazılan öğeye yer açmak için kanaldaki en eski öğeyi kaldırır ve yoksayar. |
| BoundedChannelFullMode.DropWrite | Yazılan veriyi/öğeyi bırakır. |
Önemli
Channel<TWrite,TRead>.Writer, Channel<TWrite,TRead>.Reader 'in tüketebileceğinden daha hızlı ürettiğinde, kanalın yazarı geri basınç yaşar.
Üretici API'leri
Üretici işlevselliği Channel<TWrite,TRead>.Writer üzerinden kullanıma sunulur. Üretici API'leri ve beklenen davranış aşağıdaki tabloda ayrıntılı olarak yer alır:
| Uygulama Programlama Arayüzü (API) | Beklenen davranış |
|---|---|
| ChannelWriter<T>.Complete | Kanalı tamamlandı olarak işaretler; başka öğe yazılmaması anlamına gelir. |
| ChannelWriter<T>.TryComplete | Kanalı tamamlandı olarak işaretlemeye çalışır; başka veri yazılması gerekmez. |
| ChannelWriter<T>.TryWrite | Belirtilen öğeyi kanala yazmaya çalışır. Sınırsız bir kanalla kullanıldığında, kanalın yazıcı nesnesi true veya ChannelWriter<T>.Complete ile tamamlanma sinyali vermediği sürece bu her zaman ChannelWriter<T>.TryComplete döndürür. |
| ChannelWriter<T>.WaitToWriteAsync | Bir öğe yazmak için kullanılabilir alan olduğunda tamamlanan bir ValueTask<TResult> döndürür. |
| ChannelWriter<T>.WriteAsync | Kanala zaman uyumsuz olarak bir öğe yazar. |
Tüketici API’leri
Tüketici işlevselliği Channel<TWrite,TRead>.Reader üzerinden kullanıma sunulur. Tüketici API'leri ve beklenen davranış aşağıdaki tabloda ayrıntılı olarak yer alır:
| Uygulama Programlama Arayüzü (API) | Beklenen davranış |
|---|---|
| ChannelReader<T>.ReadAllAsync | Kanaldaki tüm verilerin okunmasını sağlayan bir IAsyncEnumerable<T> oluşturur. |
| ChannelReader<T>.ReadAsync | Kanaldan bir öğeyi zaman uyumsuz olarak okur. |
| ChannelReader<T>.TryPeek | Kanaldan bir öğeye göz atmaya çalışır. |
| ChannelReader<T>.TryRead | Kanaldan bir öğeyi okumaya çalışır. |
| ChannelReader<T>.WaitToReadAsync | Veriler okunmaya uygun olduğunda tamamlanan bir ValueTask<TResult> döndürür. |
Yaygın kullanım desenleri
Kanallar için çeşitli kullanım desenleri vardır:
API basit, tutarlı ve olabildiğince esnek olacak şekilde tasarlanmıştır. Tüm zaman uyumsuz yöntemler, bir ValueTask (veya ValueTask<bool>) döndürür; bu, işlemin zaman uyumlu tamamlanması durumunda ve potansiyel olarak zaman uyumsuz tamamlanması durumunda ayırmadan kaçınabilecek hafif bir zaman uyumsuz işlemi temsil eder. Buna ek olarak, bir kanalın oluşturucusunun hedeflenen kullanımı hakkında taahhütlerde bulunduğu için API, birleştirilebilir olacak şekilde tasarlanmıştır. Belirli parametrelerle bir kanal oluşturulduğunda, iç uygulama bu vaatleri bilerek daha verimli çalışabilir.
Oluşturma desenleri
Küresel bir konum sistemi (GPS) için bir üretici/tüketici çözümü oluşturduğunuzu düşünün. Bir cihazın koordinatlarını zaman içinde izlemek istiyorsunuz. Örnek koordinatlar nesnesi şöyle görünebilir:
/// <summary>
/// A representation of a device's coordinates,
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
Guid DeviceId,
double Latitude,
double Longitude);
Sınırsız oluşturma desenleri
Yaygın kullanım desenlerinden biri, varsayılan bir ilişkisiz kanal oluşturmaktır :
var channel = Channel.CreateUnbounded<Coordinates>();
Ancak bunun yerine, birden çok üretici ve tüketici ile ilişkisiz bir kanal oluşturmak istediğinizi düşünün. Kanal seçeneklerinde SingleWriter = false ve SingleReader = false ayarlayın.
var channel = Channel.CreateUnbounded<Coordinates>(
new UnboundedChannelOptions
{
SingleWriter = false,
SingleReader = false,
AllowSynchronousContinuations = true
});
Bu durumda, WriteAsync işlemleri de dahil olmak üzere, tüm yazma işlemleri eşzamanlıdır. Bu davranış, ilişkisiz bir kanalın hemen yazma için her zaman uygun alana sahip olması nedeniyle oluşur. Ancak, AllowSynchronousContinuations ve true olarak ayarlandığında yazma işlemleri, okuyucu ile ilişkilendirilmiş işleri yaparak devamları yürütebilir. Bu ayar işlemin eşzamanlılığını etkilemez.
Sınırlanmış oluşturma desenleri
Sınırlanmış kanallarda, doğru tüketimin sağlanmasına yardımcı olmak için kanalın yapılandırılabilirliği tüketici tarafından bilinmelidir. Başka bir ifadeyle, yapılandırılan sınıra ulaşıldığında tüketici kanalın hangi davranışı sergilediğini bilmelidir. Aşağıdaki örneklerde yaygın sınırlanmış oluşturma desenlerinden bazıları gösterilmektedir.
Sınırlanmış kanal oluşturmanın en basit yolu kapasite belirtmektir. Aşağıdaki kod maksimum kapasiteye 1sahip sınırlanmış bir kanal oluşturur.
var channel = Channel.CreateBounded<Coordinates>(1);
Bunların dışında farklı seçenekler de vardır. Bazı seçenekler ilişkisiz kanalla aynıdır, diğerleri ise sınırlanmış kanallara özgü olur. Aşağıdaki kodda, kanal 1.000 öğeyle sınırlı bir sınırlanmış kanal olarak oluşturulur ve tek bir yazar ancak çok sayıda okuyucu bulunur. Tam mod davranışı olarak DropWritetanımlanır, yani kanal doluysa yazılan öğeyi bırakır.
var channel = Channel.CreateBounded<Coordinates>(
new BoundedChannelOptions(1_000)
{
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.DropWrite
});
Sınırlanmış kanallar kullanılırken bırakılan öğeleri gözlemlemek için bir itemDropped geri çağırma kaydedin:
var channel = Channel.CreateBounded(
new BoundedChannelOptions(10)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest
},
static void (Coordinates dropped) =>
Console.WriteLine($"Coordinates dropped: {dropped}"));
Kanal dolu olduğunda ve yeni bir öğe eklendiğinde, itemDropped geri arama işlevi çağrılır. Bu örnekte, sağlanan geri arama öğeyi konsola yazar, ancak istediğiniz başka bir eylemi gerçekleştirebilirsiniz.
Üretim desenleri
Bu senaryoda yapımcının kanala yeni koordinatlar yazdığını düşünün. Üretici, TryWrite'u çağırarak bunu yapabilir.
static void ProduceWithWhileAndTryWrite(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
}
}
Yukarıdaki üretici kodu:
-
Channel<Coordinates>.Writer(ChannelWriter<Coordinates>) öğesini, ilkCoordinatesile birlikte bağımsız değişken olarak kabul eder. - kullanarak
whilekoordinatları taşımaya çalışan bir koşulluTryWritedöngü tanımlar.
Alternatif bir üretici şu WriteAsync yöntemi kullanabilir:
static async ValueTask ProduceWithWhileWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
await writer.WriteAsync(
item: coordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
});
}
writer.Complete();
}
Tekrar, Channel<Coordinates>.Writer bir while döngü içinde kullanılır. Ancak bu kez WriteAsync yöntemi çağrılır. yöntemi ancak koordinatlar yazıldıktan sonra devam eder. Döngüden while çıkıldığında, kanala artık veri yazılmayacağını bildiren Complete çağrısı yapılır.
Başka bir üretici deseni yöntemi WaitToWriteAsync'nin kullanılmasıdır, şu kodu göz önünde bulundurun:
static async ValueTask ProduceWithWaitToWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
await writer.WaitToWriteAsync())
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
await Task.Delay(TimeSpan.FromMilliseconds(10));
}
writer.Complete();
}
Koşullu whileöğesinin bir parçası olarak, döngüye WaitToWriteAsync devam edilip edilmeyeceğini belirlemek için çağrının sonucu kullanılır.
Tüketici davranışları
Çeşitli yaygın kanal tüketici desenleri vardır. Bir kanal hiç bitmediğinde, yani süresiz olarak veri ürettiğinde, tüketici bir while (true) döngü kullanabilir ve kullanılabilir hale geldikçe verileri okuyabilir:
static async ValueTask ConsumeWithWhileAsync(
ChannelReader<Coordinates> reader)
{
while (true)
{
// May throw ChannelClosedException if
// the parent channel's writer signals complete.
Coordinates coordinates = await reader.ReadAsync();
Console.WriteLine(coordinates);
}
}
Not
Bu kod, kanal kapatılırsa bir özel durum oluşturur.
Alternatif bir tüketici, aşağıdaki kodda gösterildiği gibi iç içe while döngüsü kullanarak bu sorundan kaçınabilir:
static async ValueTask ConsumeWithNestedWhileAsync(
ChannelReader<Coordinates> reader)
{
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out Coordinates coordinates))
{
Console.WriteLine(coordinates);
}
}
}
Önceki kodda tüketici verileri okumak için bekler. Veriler kullanılabilir olduğunda, tüketici verileri okumaya çalışır. Bu döngüler, kanalın üreticisi artık okunacak veri olmadığını belirtene kadar değerlendirmeye devam eder. Bununla birlikte, bir üretici ürettiği sonlu sayıda öğeye sahip olduğu biliniyorsa ve tamamlanma sinyali veriyorsa, tüketici öğeler üzerinde yineleme yapmak için semantiği kullanabilir await foreach :
static async ValueTask ConsumeWithAwaitForeachAsync(
ChannelReader<Coordinates> reader)
{
await foreach (Coordinates coordinates in reader.ReadAllAsync())
{
Console.WriteLine(coordinates);
}
}
Yukarıdaki kod, kanalın ReadAllAsync tüm koordinatlarını okumak için yöntemini kullanır.
Birden çok üretici ve tüketici
Kanallar birden çok eşzamanlı üreticiyi ve tüketiciyi destekler. Bunu etkinleştirmek için, kanal seçeneklerinde SingleWriter = false ve SingleReader = false ile bir kanal oluşturun. Daha sonra birden çok üretici görevine yazma işini dağıtır ve birden çok tüketici görevi arasında okuma işini birleştirirsiniz.
static async Task UseMultipleProducersAndConsumersAsync()
{
Channel<Coordinates> channel = Channel.CreateUnbounded<Coordinates>(
new UnboundedChannelOptions
{
SingleWriter = false,
SingleReader = false
});
// Start three concurrent producer tasks.
Task[] producerTasks = Enumerable.Range(0, 3)
.Select(id => ProduceAsync(id, channel))
.ToArray();
// Start two concurrent consumer tasks.
Task[] consumerTasks = Enumerable.Range(0, 2)
.Select(_ => ConsumeAsync(channel))
.ToArray();
// Wait for all producers to finish, then mark the channel as complete.
await Task.WhenAll(producerTasks);
channel.Writer.Complete();
// Wait for all consumers to finish.
await Task.WhenAll(consumerTasks);
static async Task ProduceAsync(int id, Channel<Coordinates> channel)
{
Coordinates coordinates = new(
DeviceId: Guid.NewGuid(),
Latitude: -90 + (id * 30),
Longitude: -180 + (id * 60));
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
coordinates = coordinates with
{
Latitude = coordinates.Latitude + 0.5,
Longitude = coordinates.Longitude + 1
};
await channel.Writer.WriteAsync(coordinates);
}
}
static async Task ConsumeAsync(Channel<Coordinates> channel)
{
await foreach (Coordinates coordinates in channel.Reader.ReadAllAsync())
{
Console.WriteLine(coordinates);
}
}
}
Önceki kod:
- Birden çok eşzamanlı yazıcıyı ve okuyucuyu açıkça destekleyen ilişkisiz bir kanal oluşturur.
- Her biri benzersiz bir cihaz tanımlayıcısı ile bir dizi koordinat yazan üç eşzamanlı üretici görevi başlatır.
- Aynı kanaldan
ReadAllAsynckullanarak okuma yapan iki eşzamanlı tüketici görevi başlatır. - Tüm üreticilerin tamamlanmasını bekler, ardından kanala artık veri yazılmayacağını belirten bir sinyal gönderir.
- Tüm tüketicilerin kanaldaki kalan verileri tamamen boşaltmasını bekler.
Tip
Birden çok üreticiyle, yalnızca channel.Writer.Complete() yapımcılar yazmayı bitirdikten sonra arayın. Bu, daha fazla veri yazılmadığını gösterir ve ReadAllAsync(), kalan tüm öğeler tükendikten sonra tamamlanabilir.