Knihovna System.Threading.Channels

Obor System.Threading.Channels názvů poskytuje sadu synchronizačních datových struktur pro předávání dat mezi producenty a příjemci asynchronně. Knihovna cílí na .NET, .NET Standard a .NET Framework a funguje na všech implementacích .NET.

Tato knihovna je k dispozici v 📦 balíčku NuGet System.Threading.Channels . Pokud ale používáte .NET Core 3.0 nebo novější, balíček se zahrne jako součást sdílené architektury.

Koncepční programovací model producenta/příjemce

Kanály představují implementaci konceptuálního programovacího modelu producenta/příjemce. V tomto programovacím modelu producenti asynchronně vytvářejí data a spotřebitelé asynchronně tato data spotřebovávají. Jinými slovy, tento model předává data mezi stranami prostřednictvím fronty typu FIFO (první dovnitř, první ven). Představte si kanály jako jakýkoli jiný běžný obecný typ kolekce, například List<T>. Hlavním rozdílem je, že tato kolekce spravuje synchronizaci a poskytuje různé modely spotřeby prostřednictvím možností vytváření továrny. Tyto možnosti řídí chování kanálů, například:

  • Kolik prvků může uložit a co se stane, když dosáhnete daného limitu.
  • Jestli je kanál současně přístupný více producentům nebo více příjemcům.

Základní použití

Následující příklad ukazuje základní použití kanálu, kde producent zapisuje položky a příjemce je přečte:

static async Task BasicUsageAsync()
{
    Channel<int> channel = Channel.CreateUnbounded<int>();

    Task producer = ProduceAsync(channel.Writer);
    Task consumer = ConsumeAsync(channel.Reader);

    await Task.WhenAll(producer, consumer);

    static async Task ProduceAsync(ChannelWriter<int> writer)
    {
        for (int i = 0; i < 5; i++)
        {
            await writer.WriteAsync(i);
        }

        writer.Complete();
    }

    static async Task ConsumeAsync(ChannelReader<int> reader)
    {
        await foreach (int item in reader.ReadAllAsync())
        {
            Console.WriteLine($"Received: {item}");
        }
    }
}

Strategie ohraničování

V závislosti na tom, jak se vytvoří Channel<T> , se její čtenář a spisovatel chovají odlišně.

Chcete-li vytvořit kanál, který určuje maximální kapacitu, zavolejte Channel.CreateBounded. Chcete-li vytvořit kanál, který je používán libovolným počtem čtenářů a zapisovačů souběžně, zavolejte Channel.CreateUnbounded. Každá strategie ohraničující nabízí různé možnosti definované tvůrcem, buď BoundedChannelOptions nebo UnboundedChannelOptions.

Poznámka:

Bez ohledu na ohraničující strategii kanál vždy vyvolá ChannelClosedException, když se použije po zavření.

Nevázané kanály

Pokud chcete vytvořit nevázaný kanál, zavolejte jedno z Channel.CreateUnbounded přetížení:

var channel = Channel.CreateUnbounded<T>();

Když ve výchozím nastavení vytvoříte nevázaný kanál, může kanál používat libovolný počet čtenářů a zapisovačů současně. Alternativně můžete určit nestandardní chování při vytváření nevázaného kanálu poskytnutím UnboundedChannelOptions instance. Kapacita kanálu je nevázaná a všechny zápisy se provádějí synchronně. Další příklady najdete v tématu Nevázané vzory vytváření.

Ohraničené kanály

Pokud chcete vytvořit ohraničený kanál, zavolejte jedno z Channel.CreateBounded přetížení:

var channel = Channel.CreateBounded<T>(7);

Předchozí kód vytvoří kanál, který má maximální kapacitu 7 položek. Když vytvoříte ohraničený kanál, je kanál vázán na maximální kapacitu. Po dosažení hranice je výchozím chováním, že kanál asynchronně blokuje producenta tak, že dokud nebude k dispozici volné místo. Toto chování můžete nakonfigurovat zadáním možnosti při vytváření kanálu. Vázané kanály je možné vytvořit s libovolnou hodnotou kapacity větší než nula. Další příklady naleznete v tématu Ohraničené vzory vytváření.

Chování plného režimu

Při použití vázaného kanálu můžete určit chování, které kanál dodržuje při dosažení nakonfigurované vazby. Následující tabulka uvádí chování celého režimu pro každou BoundedChannelFullMode hodnotu:

Hodnota Chování
BoundedChannelFullMode.Wait Tato hodnota je výchozí. Volání na WriteAsync čekají, až bude dostupné místo, aby mohla být operace zápisu dokončena. Volání na TryWrite se false okamžitě vracejí.
BoundedChannelFullMode.DropNewest Odebere a ignoruje nejnovější položku v kanálu, aby se uvolnilo místo pro zapisování položky.
BoundedChannelFullMode.DropOldest Odebere a ignoruje nejstarší položku v kanálu, abyste uvolnili místo pro zápis nové položky.
BoundedChannelFullMode.DropWrite Zahodí zapisovanou položku.

Důležité

Kdykoli Channel<TWrite,TRead>.Writer produkuje rychleji, než Channel<TWrite,TRead>.Reader může spotřebovávat, zapisovač kanálu zaznamená zpětný tlak.

Rozhraní API pro producenty

Funkcionalita producenta je dostupná na Channel<TWrite,TRead>.Writer. Rozhraní API producenta a očekávané chování jsou podrobně popsány v následující tabulce:

API Očekávané chování
ChannelWriter<T>.Complete Označí kanál jako dokončený, což znamená, že do něj nejsou zapsány žádné další položky.
ChannelWriter<T>.TryComplete Pokusí se kanál označit jako dokončený, což znamená, že se do něj nezapisují žádná další data.
ChannelWriter<T>.TryWrite Pokusí se napsat zadanou položku do kanálu. Pokud se používá s nevázaným kanálem, vždy se vrátí true, pokud zapisovač kanálu nesignalizuje dokončení buď ChannelWriter<T>.Complete nebo ChannelWriter<T>.TryComplete.
ChannelWriter<T>.WaitToWriteAsync ValueTask<TResult> Vrátí hodnotu, která se dokončí, když je k dispozici mezera pro zápis položky.
ChannelWriter<T>.WriteAsync Asynchronně zapíše položku do kanálu.

Rozhraní API pro spotřebitele

Funkce pro uživatele je zpřístupněna na Channel<TWrite,TRead>.Reader. Rozhraní API pro spotřebitele a očekávané chování jsou podrobně popsány v následující tabulce:

API Očekávané chování
ChannelReader<T>.ReadAllAsync Vytvoří IAsyncEnumerable<T>, které umožňuje načítat všechna data z kanálu.
ChannelReader<T>.ReadAsync Asynchronně čte položku z kanálu.
ChannelReader<T>.TryPeek Pokusí se nahlédnout na položku z kanálu.
ChannelReader<T>.TryRead Pokusí se přečíst položku z kanálu.
ChannelReader<T>.WaitToReadAsync ValueTask<TResult> Vrátí hodnotu, která se dokončí, když jsou data k dispozici ke čtení.

Běžné vzory použití

Kanály používají několik vzorů použití:

Rozhraní API je navržené tak, aby bylo co nejjednodušší, konzistentní a co nejflexibilnější. Všechny asynchronní metody vrací ValueTask (nebo ValueTask<bool>) jednoduchou asynchronní operaci, která se může vyhnout přidělování, pokud se operace dokončí synchronně a potenciálně i asynchronně. Kromě toho je rozhraní API navrženo tak, aby bylo modulární, přičemž tvůrce kanálu dává záruky ohledně jeho zamýšleného použití. Při vytváření kanálu s určitými parametry může interní implementace pracovat efektivněji s vědomím těchto příslibů.

Vzory tvorby

Představte si, že vytváříte řešení producenta/spotřebitele pro globální systém umístění (GPS). Chcete sledovat souřadnice zařízení v průběhu času. Ukázkový objekt souřadnic může vypadat takto:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

Nevázané vzory vytváření

Jedním z běžných vzorů použití je vytvoření výchozího nevázaného kanálu:

var channel = Channel.CreateUnbounded<Coordinates>();

Představte si ale, že chcete vytvořit nevázaný kanál s více producenty a spotřebiteli. Nastavte SingleWriter = false a SingleReader = false v možnostech kanálu:

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

V tomto případě jsou všechny zápisy synchronní, dokonce i WriteAsync. K tomuto chování dochází, protože nevázaný kanál má vždy k dispozici místo pro zápis okamžitě. Když ale nastavíte možnost AllowSynchronousContinuationstrue, zápisy můžou skončit v práci související se čtenářem spuštěním jejich pokračování. Toto nastavení nemá vliv na synchronicity operace.

Ohraničené vzory vytváření

U ohraničených kanálů by měla být konfigurovatelnost kanálu známa příjemci, aby se zajistila správná spotřeba. To znamená, že uživatel by měl vědět, jaké chování kanál vykazuje při dosažení nakonfigurované meze. Následující příklady ukazují některé běžné vázané vzory vytváření.

Nejjednodušší způsob, jak vytvořit ohraničený kanál, je zadat kapacitu. Následující kód vytvoří ohraničený kanál s maximální kapacitou 1.

var channel = Channel.CreateBounded<Coordinates>(1);

K dispozici jsou další možnosti, Některé možnosti jsou stejné jako nevázaný kanál, zatímco jiné jsou specifické pro vázané kanály. V následujícím kódu se kanál vytvoří jako ohraničený kanál, který je omezen na 1 000 položek s jedním zapisovačem, ale mnoha čtenáři. Jeho chování v plném režimu je definováno jako DropWrite, což znamená, že zahodí položku, která je zapsána, pokud je kanál plný.

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

Pokud chcete sledovat položky, které se zahodí při použití ohraničených kanálů, zaregistrujte itemDropped zpětné volání:

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

Pokaždé, když je kanál plný a přidá se nová položka, itemDropped vyvolá se zpětné volání. V tomto příkladu zapíše poskytnutá callback funkce položku do konzole, ale můžete provést jakoukoli jinou akci, kterou chcete.

Vzory výrobce

Představte si, že producent v tomto scénáři píše do kanálu nové souřadnice. Producent to může provést voláním TryWrite:

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

Předchozí kód producenta:

  • Channel<Coordinates>.Writer přijme (ChannelWriter<Coordinates>) jako argument spolu s počátečním Coordinates.
  • Definuje podmíněnou while smyčku, která se pokusí přesunout souřadnice pomocí TryWrite.

Alternativní producent může použít metodu WriteAsync :

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

Opět se Channel<Coordinates>.Writer používá ve smyčce while . Tentokrát je však volána metoda WriteAsync. Metoda pokračuje až po napsání souřadnic. Když smyčka while skončí, provede se volání Complete , které signalizuje, že do kanálu nejsou zapsána žádná další data.

Dalším vzorem producenta je použití metody WaitToWriteAsync. Prohlédněte si následující kód:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

V rámci podmíněného while se výsledek WaitToWriteAsync volání používá k určení, zda má smyčka pokračovat.

Vzory spotřebitelů

Existuje několik běžných vzorů příjemců kanálu. Když kanál nikdy nekončí, znamená to, že vytváří data po neomezenou dobu, uživatel může použít smyčku while (true) a číst data, jakmile bude k dispozici:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

Poznámka:

Tento kód vyvolá výjimku, pokud je kanál zavřený.

Alternativnímu uživateli by se tento problém mohl vyhnout použitím vnořené smyčky while, jak je znázorněno v následujícím kódu:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

V předchozím kódu příjemce čeká na čtení dat. Jakmile jsou data k dispozici, uživatel se je pokusí přečíst. Tyto smyčky se nadále vyhodnocují, dokud výrobce kanálu nesignalizuje, že už nemá data k přečtení. S tím řečeno, když je známo, že výrobce má konečný počet položek, které vytváří, a signalizuje dokončení, může spotřebitel použít sémantiku await foreach k iteraci položek.

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

Předchozí kód používá metodu ReadAllAsync ke čtení všech souřadnic z kanálu.

Více výrobců a spotřebitelů

Kanály podporují více souběžných výrobců a příjemců. Aby bylo toto možné, vytvořte kanál s nastaveními SingleWriter = false a SingleReader = false v možnostech kanálu. Pak rozdělíte psaní mezi několik úkolů pro producenta a konsolidujete čtení napříč několika spotřebitelskými úkoly.

static async Task UseMultipleProducersAndConsumersAsync()
{
    Channel<Coordinates> channel = Channel.CreateUnbounded<Coordinates>(
        new UnboundedChannelOptions
        {
            SingleWriter = false,
            SingleReader = false
        });

    // Start three concurrent producer tasks.
    Task[] producerTasks = Enumerable.Range(0, 3)
        .Select(id => ProduceAsync(id, channel))
        .ToArray();

    // Start two concurrent consumer tasks.
    Task[] consumerTasks = Enumerable.Range(0, 2)
        .Select(_ => ConsumeAsync(channel))
        .ToArray();

    // Wait for all producers to finish, then mark the channel as complete.
    await Task.WhenAll(producerTasks);
    channel.Writer.Complete();

    // Wait for all consumers to finish.
    await Task.WhenAll(consumerTasks);

    static async Task ProduceAsync(int id, Channel<Coordinates> channel)
    {
        Coordinates coordinates = new(
            DeviceId: Guid.NewGuid(),
            Latitude: -90 + (id * 30),
            Longitude: -180 + (id * 60));

        while (coordinates is { Latitude: < 90, Longitude: < 180 })
        {
            coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + 0.5,
                Longitude = coordinates.Longitude + 1
            };
                
            await channel.Writer.WriteAsync(coordinates);
        }
    }

    static async Task ConsumeAsync(Channel<Coordinates> channel)
    {
        await foreach (Coordinates coordinates in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine(coordinates);
        }
    }
}

Předchozí kód:

  • Vytvoří nevázaný kanál, který explicitně podporuje více souběžných zapisovačů a čtenářů.
  • Spustí tři souběžné úlohy producenta, přičemž každá zapisuje řadu souřadnic s jedinečným identifikátorem zařízení.
  • Spustí dvě souběžné úlohy spotřebitele, každá čte ze stejného kanálu pomocí ReadAllAsync.
  • Čeká, až všichni producenti dokončí, a potom zavolá Complete aby signalizoval, že se do kanálu již nezapisují další data.
  • Čeká, až všichni příjemci dokončí vyprázdnění zbývajících dat z kanálu.

Návod

U více výrobců volejte channel.Writer.Complete() až po dokončení psaní všech producentů. To značí, že již nejsou zapisována žádná data, což umožňuje ReadAllAsync() dokončit po spotřebování všech zbývajících položek.

Viz také