System.Threading.Channels kitaplığı

Ad System.Threading.Channels alanı, verileri üreticiler ve tüketiciler arasında zaman uyumsuz olarak geçirmek için bir eşitleme veri yapıları kümesi sağlar. Kitaplık .NET Standard'ı hedefler ve tüm .NET uygulamaları üzerinde çalışır.

Bu kitaplık System.Threading.Channels NuGet paketinde kullanılabilir. Ancak. .NET Core 3.0 veya üzerini kullanıyorsanız paket, çerçevenin bir parçası olarak eklenir.

Üretici/tüketici kavramsal programlama modeli

Kanallar, üretici/tüketici kavramsal programlama modelinin bir uygulamasıdır. Bu programlama modelinde üreticiler zaman uyumsuz olarak veri üretir ve tüketiciler bu verileri zaman uyumsuz olarak kullanır. Başka bir deyişle, bu model verileri bir taraftan diğerine devreder. Kanalları, gibi diğer yaygın genel koleksiyon türlerinde olduğu gibi List<T>düşünün. Birincil fark, bu koleksiyonun eşitlemeyi yönetmesi ve fabrika oluşturma seçenekleri aracılığıyla çeşitli tüketim modelleri sağlamasıdır. Bu seçenekler, kaç öğe depolamalarına izin verilenler ve bu sınıra ulaşılırsa ne olacağı veya kanala eşzamanlı olarak birden çok üretici veya birden çok tüketici tarafından erişilip erişilemeyeceği gibi kanalların davranışını denetler.

Sınırlayıcı stratejiler

Bir Channel<T> öğesinin nasıl oluşturulduğuna bağlı olarak, okuyucusu ve yazıcısı farklı davranır.

Maksimum kapasiteyi belirten bir kanal oluşturmak için çağrısında bulunur Channel.CreateBounded. Herhangi bir sayıda okuyucu ve yazar tarafından eşzamanlı olarak kullanılan bir kanal oluşturmak için öğesini çağırın Channel.CreateUnbounded. Her sınırlayıcı strateji, BoundedChannelOptions sırasıyla veya UnboundedChannelOptions sırasıyla çeşitli oluşturucu tanımlı seçenekleri kullanıma sunar.

Not

Sınırlayıcı stratejiden bağımsız olarak, bir kanal kapatıldıktan sonra kullanıldığında her zaman bir ChannelClosedException oluşturur.

İlişkisiz kanallar

İlişkisiz bir kanal oluşturmak için aşırı yüklemelerden birini çağırın Channel.CreateUnbounded :

var channel = Channel.CreateUnbounded<T>();

İlişkisiz bir kanal oluşturduğunuzda, kanal varsayılan olarak herhangi bir sayıda okuyucu ve yazar tarafından eşzamanlı olarak kullanılabilir. Alternatif olarak, bir örnek sağlayarak UnboundedChannelOptions ilişkisiz bir kanal oluştururken beklenmeyen bir davranış belirtebilirsiniz. Kanalın kapasitesi sınırsızdır ve tüm yazma işlemleri zaman uyumlu olarak gerçekleştirilir. Daha fazla örnek için bkz . Sınırsız oluşturma desenleri.

Sınırlanmış kanallar

Sınırlanmış kanal oluşturmak için aşırı yüklemelerden birini çağırın Channel.CreateBounded :

var channel = Channel.CreateBounded<T>(7);

Yukarıdaki kod, maksimum öğe kapasitesine 7 sahip bir kanal oluşturur. Sınırlanmış bir kanal oluşturduğunuzda, kanal maksimum kapasiteye bağlıdır. Sınıra ulaşıldığında, varsayılan davranış kanalın alan kullanılabilir duruma gelene kadar üreticiyi zaman uyumsuz olarak engellemesidir. Kanalı oluştururken bir seçenek belirterek bu davranışı yapılandırabilirsiniz. Sınırlanmış kanallar, sıfırdan büyük herhangi bir kapasite değeriyle oluşturulabilir. Diğer örnekler için bkz . Sınırlanmış oluşturma desenleri.

Tam mod davranışı

Sınırlanmış kanal kullanırken, yapılandırılan sınıra ulaşıldığında kanalın bağlı olduğu davranışı belirtebilirsiniz. Aşağıdaki tabloda her BoundedChannelFullMode değer için tam mod davranışları listeleniyor:

Değer Davranış
BoundedChannelFullMode.Wait Bu varsayılan değerdir. WriteAsync Yazma işlemini tamamlamak için kullanılabilir alan beklemeye yönelik çağrılar. Hemen geri dönmek false için TryWrite aramalar.
BoundedChannelFullMode.DropNewest Yazılan öğeye yer açmak için kanaldaki en yeni öğeyi kaldırır ve yoksayar.
BoundedChannelFullMode.DropOldest Yazılan öğeye yer açmak için kanaldaki en eski öğeyi kaldırır ve yoksayar.
BoundedChannelFullMode.DropWrite Yazılan öğeyi bırakır.

Önemli

a Channel<TWrite,TRead>.Writer , bir'den Channel<TWrite,TRead>.Reader daha hızlı bir şekilde ürettiğinde kanalın yazarı sırt baskısı yaşar.

Üretici API'leri

Üretici işlevselliği üzerinde Channel<TWrite,TRead>.Writerkullanıma sunulur. Üretici API'leri ve beklenen davranış aşağıdaki tabloda ayrıntılı olarak yer alır:

API Beklenen davranış
ChannelWriter<T>.Complete Kanalı tamamlandı olarak işaretler; başka öğe yazılmaması anlamına gelir.
ChannelWriter<T>.TryComplete Kanalı tamamlandı olarak işaretlemeye çalışır; başka veri yazılması gerekmez.
ChannelWriter<T>.TryWrite Belirtilen öğeyi kanala yazmaya çalışır. İlişkisiz bir kanalla kullanıldığında, kanalın yazıcısı veya ChannelWriter<T>.TryCompleteile ChannelWriter<T>.Completetamamlanma sinyali vermediği sürece bu her zaman döndürürtrue.
ChannelWriter<T>.WaitToWriteAsync ValueTask<TResult> Bir öğe yazmak için kullanılabilir alan olduğunda tamamlanan bir döndürür.
ChannelWriter<T>.WriteAsync Kanala zaman uyumsuz olarak bir öğe yazar.

Tüketici API’leri

Tüketici işlevselliği üzerinde Channel<TWrite,TRead>.Readerkullanıma sunulur. Tüketici API'leri ve beklenen davranış aşağıdaki tabloda ayrıntılı olarak yer alır:

API Beklenen davranış
ChannelReader<T>.ReadAllAsync Kanaldaki tüm verilerin okunmasını sağlayan bir IAsyncEnumerable<T> oluşturur.
ChannelReader<T>.ReadAsync Kanaldan bir öğeyi zaman uyumsuz olarak okur.
ChannelReader<T>.TryPeek Kanaldan bir öğeye göz atmaya çalışır.
ChannelReader<T>.TryRead Kanaldan bir öğeyi okumaya çalışır.
ChannelReader<T>.WaitToReadAsync Veriler okunmaya uygun olduğunda tamamlanan bir ValueTask<TResult> döndürür.

Yaygın kullanım desenleri

Kanallar için çeşitli kullanım desenleri vardır. API basit, tutarlı ve olabildiğince esnek olacak şekilde tasarlanmıştır. Zaman uyumsuz yöntemlerin tümü, işlem zaman uyumlu ve hatta zaman uyumsuz olarak tamamlanırsa ayırmayı önleyebilecek basit bir zaman uyumsuz işlemi temsil eden bir (veya ValueTask<bool>) döndürür ValueTask . Buna ek olarak, BIR kanalın oluşturucusunun hedeflenen kullanımı hakkında sözler verdiği için API, birleştirilebilir olacak şekilde tasarlanmıştır. Belirli parametrelerle bir kanal oluşturulduğunda, iç uygulama bu vaatleri bilerek daha verimli çalışabilir.

Oluşturma desenleri

Küresel bir konum sistemi (GPS) için bir üretici/tüketici çözümü oluşturduğunuzu düşünün. Bir cihazın koordinatlarını zaman içinde izlemek istiyorsunuz. Örnek koordinatlar nesnesi şöyle görünebilir:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

İlişkisiz oluşturma desenleri

Yaygın kullanım desenlerinden biri, varsayılan bir ilişkisiz kanal oluşturmaktır:

var channel = Channel.CreateUnbounded<Coordinates>();

Ancak bunun yerine, birden çok üretici ve tüketiciyle ilişkisiz bir kanal oluşturmak istediğinizi düşünelim:

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

Bu durumda, tüm yazma işlemleri zaman uyumlu, hatta WriteAsync. Bunun nedeni, ilişkisiz bir kanalın hemen etkili bir şekilde yazma için her zaman uygun alanı olmasıdır. Ancak, olarak ayarlandığındatrue, AllowSynchronousContinuations yazma işlemleri devamlarını yürüterek okuyucuyla ilişkili işler yapabilir. Bu işlem eşitlemesini etkilemez.

Sınırlanmış oluşturma desenleri

Sınırlanmış kanallarda, doğru tüketimin sağlanmasına yardımcı olmak için kanalın yapılandırılabilirliği tüketici tarafından bilinmelidir. Başka bir ifadeyle, yapılandırılan sınıra ulaşıldığında tüketici kanalın hangi davranışı sergilediğini bilmelidir. Şimdi yaygın sınırlanmış oluşturma desenlerinden bazılarını inceleyelim.

Sınırlanmış kanal oluşturmanın en basit yolu bir kapasite belirtmektir:

var channel = Channel.CreateBounded<Coordinates>(1);

Yukarıdaki kod, maksimum kapasiteye 1sahip sınırlanmış bir kanal oluşturur. Diğer seçenekler kullanılabilir, bazı seçenekler ilişkisiz kanalla aynıdır, diğerleri ise ilişkisiz kanallara özeldir:

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

Önceki kodda, kanal 1.000 öğeyle sınırlı bir sınırlanmış kanal olarak oluşturulur ve tek bir yazar ancak çok sayıda okuyucu bulunur. Tam mod davranışı olarak DropWritetanımlanır, yani kanal doluysa yazılan öğeyi bırakır.

Sınırlanmış kanallar kullanılırken bırakılan öğeleri gözlemlemek için bir itemDropped geri çağırma kaydedin:

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

Kanal dolu olduğunda ve yeni bir öğe eklendiğinde geri itemDropped arama çağrılır. Bu örnekte, sağlanan geri arama öğeyi konsola yazar, ancak istediğiniz başka bir eylemi gerçekleştirebilirsiniz.

Üretici desenleri

Bu senaryoda yapımcının kanala yeni koordinatlar yazdığını düşünün. Üretici şunu çağırarak TryWritebunu yapabilir:

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

Yukarıdaki üretici kodu:

  • (ChannelWriter<Coordinates>) öğesini ilk Coordinatesile birlikte bağımsız değişken olarak kabul eder Channel<Coordinates>.Writer .
  • kullanarak TryWritekoordinatları taşımaya çalışan bir koşullu while döngü tanımlar.

Alternatif bir üretici şu WriteAsync yöntemi kullanabilir:

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

Tekrar, Channel<Coordinates>.Writer bir while döngü içinde kullanılır. Ancak bu kez WriteAsync yöntemi çağrılır. yöntemi ancak koordinatlar yazıldıktan sonra devam eder. Döngüden while çıkıldığında, kanala Complete daha fazla veri yazıldığını belirten bir çağrısı yapılır.

Başka bir üretici deseni yöntemini kullanmaktır WaitToWriteAsync , aşağıdaki kodu göz önünde bulundurun:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

Koşullu whileöğesinin bir parçası olarak, döngüye WaitToWriteAsync devam edilip edilmeyeceğini belirlemek için çağrının sonucu kullanılır.

Tüketici desenleri

Çeşitli yaygın kanal tüketici desenleri vardır. Bir kanal hiç bitmediğinde, yani süresiz olarak veri ürettiğinde, tüketici bir while (true) döngü kullanabilir ve kullanılabilir hale geldikçe verileri okuyabilir:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

Not

Kanal kapatılırsa bu kod bir özel durum oluşturur.

Alternatif bir tüketici, aşağıdaki kodda gösterildiği gibi iç içe while döngüsü kullanarak bu sorundan kaçınabilir:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

Önceki kodda tüketici verileri okumak için bekler. Veriler kullanılabilir olduğunda, tüketici verileri okumaya çalışır. Bu döngüler, kanalın üreticisi artık okunacak veri olmadığını belirtene kadar değerlendirmeye devam eder. Bununla birlikte, bir üretici ürettiği sonlu sayıda öğeye sahip olduğu biliniyorsa ve tamamlanma sinyali veriyorsa, tüketici öğeler üzerinde yineleme yapmak için semantiği kullanabilir await foreach :

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

Yukarıdaki kod, kanalın ReadAllAsync tüm koordinatlarını okumak için yöntemini kullanır.

Ayrıca bkz.