Dela via


System.Threading.Channels-bibliotek

Namnområdet System.Threading.Channels innehåller en uppsättning synkroniseringsdatastrukturer för att skicka data mellan producenter och konsumenter asynkront. Biblioteket riktar in sig på .NET Standard och fungerar på alla .NET-implementeringar.

Det här biblioteket är tillgängligt i NuGet-paketet System.Threading.Channels . Men om du använder .NET Core 3.0 eller senare ingår paketet som en del av ramverket.

Konceptuell programmeringsmodell för producent/konsument

Kanaler är en implementering av den konceptuella programmeringsmodellen producent/konsument. I den här programmeringsmodellen producerar producenter asynkront data och konsumenterna använder asynkront dessa data. Med andra ord skickar den här modellen data från en part till en annan via en först-i-först-ut-kö ("FIFO"). Försök att tänka på kanaler som du skulle göra med andra vanliga generiska samlingstyper, till exempel en List<T>. Den primära skillnaden är att den här samlingen hanterar synkronisering och tillhandahåller olika förbrukningsmodeller via fabriksskapande alternativ. De här alternativen styr beteendet för kanalerna, till exempel hur många element de får lagra och vad som händer om den gränsen nås, eller om kanalen nås av flera producenter eller flera konsumenter samtidigt.

Avgränsningsstrategier

Beroende på hur en Channel<T> skapas beter sig läsaren och författaren annorlunda.

Om du vill skapa en kanal som anger en maximal kapacitet anropar du Channel.CreateBounded. Om du vill skapa en kanal som används av valfritt antal läsare och författare samtidigt anropar du Channel.CreateUnbounded. Varje avgränsningsstrategi exponerar olika skapardefinierade alternativ, antingen BoundedChannelOptions eller UnboundedChannelOptions .

Kommentar

Oavsett avgränsningsstrategin genererar en kanal alltid en ChannelClosedException när den används när den har stängts.

Obundna kanaler

Om du vill skapa en obundna kanal anropar du en av överlagringarna Channel.CreateUnbounded :

var channel = Channel.CreateUnbounded<T>();

När du skapar en obundna kanal kan kanalen som standard användas av valfritt antal läsare och författare samtidigt. Du kan också ange nondefault-beteende när du skapar en obundna kanal genom att ange en UnboundedChannelOptions instans. Kanalens kapacitet är obundna och alla skrivningar utförs synkront. Fler exempel finns i Obundna skapandemönster.

Avgränsade kanaler

Om du vill skapa en begränsad kanal anropar du en av överlagringarna Channel.CreateBounded :

var channel = Channel.CreateBounded<T>(7);

Föregående kod skapar en kanal som har en maximal kapacitet för 7 objekt. När du skapar en begränsad kanal är kanalen bunden till en maximal kapacitet. När gränsen nås är standardbeteendet att kanalen asynkront blockerar producenten tills utrymmet blir tillgängligt. Du kan konfigurera det här beteendet genom att ange ett alternativ när du skapar kanalen. Avgränsade kanaler kan skapas med ett kapacitetsvärde som är större än noll. Andra exempel finns i Mönster för begränsad skapande.

Fullständigt lägesbeteende

När du använder en begränsad kanal kan du ange vilket beteende kanalen följer när den konfigurerade gränsen nås. I följande tabell visas de fullständiga beteendena för varje BoundedChannelFullMode värde:

Värde Funktionssätt
BoundedChannelFullMode.Wait Detta är standardvärdet. Anrop för att WriteAsync vänta på att utrymme ska vara tillgängligt för att slutföra skrivåtgärden. Anrop för att TryWrite returnera false omedelbart.
BoundedChannelFullMode.DropNewest Tar bort och ignorerar det senaste objektet i kanalen för att göra plats för objektet som skrivs.
BoundedChannelFullMode.DropOldest Tar bort och ignorerar det äldsta objektet i kanalen för att göra plats för objektet som skrivs.
BoundedChannelFullMode.DropWrite Släpper objektet som skrivs.

Viktigt!

När en Channel<TWrite,TRead>.Writer producerar snabbare än en Channel<TWrite,TRead>.Reader kan konsumera, upplever kanalens författare tillbaka tryck.

Producent-API:er

Producentfunktionen exponeras på Channel<TWrite,TRead>.Writer. Producent-API:erna och det förväntade beteendet beskrivs i följande tabell:

API Förväntat beteende
ChannelWriter<T>.Complete Markerar kanalen som fullständig, vilket innebär att inga fler objekt skrivs till den.
ChannelWriter<T>.TryComplete Försöker markera kanalen som slutförd, vilket innebär att inga fler data skrivs till den.
ChannelWriter<T>.TryWrite Försöker skriva det angivna objektet till kanalen. När det används med en obunden kanal returneras true detta alltid om inte kanalens skrivare signalerar slutförande med antingen ChannelWriter<T>.Complete, eller ChannelWriter<T>.TryComplete.
ChannelWriter<T>.WaitToWriteAsync Returnerar en ValueTask<TResult> som slutförs när det finns utrymme för att skriva ett objekt.
ChannelWriter<T>.WriteAsync Skriver asynkront ett objekt till kanalen.

Konsument-API:er

Konsumentfunktionerna exponeras på Channel<TWrite,TRead>.Reader. Konsument-API:er och förväntat beteende beskrivs i följande tabell:

API Förväntat beteende
ChannelReader<T>.ReadAllAsync Skapar ett IAsyncEnumerable<T> som gör det möjligt att läsa alla data från kanalen.
ChannelReader<T>.ReadAsync Läser asynkront ett objekt från kanalen.
ChannelReader<T>.TryPeek Försök att titta på ett objekt från kanalen.
ChannelReader<T>.TryRead Försöker läsa ett objekt från kanalen.
ChannelReader<T>.WaitToReadAsync Returnerar en ValueTask<TResult> som slutförs när data är tillgängliga för läsning.

Vanliga användningsmönster

Det finns flera användningsmönster för kanaler. API:et är utformat för att vara enkelt, konsekvent och så flexibelt som möjligt. Alla asynkrona metoder returnerar en ValueTask (eller ValueTask<bool>) som representerar en enkel asynkron åtgärd som kan undvika allokering om åtgärden slutförs synkront och potentiellt till och med asynkront. Dessutom är API:et utformat för att vara komposterbart, eftersom skaparen av en kanal ger löften om sin avsedda användning. När en kanal skapas med vissa parametrar kan den interna implementeringen fungera mer effektivt med dessa löften.

Skapandemönster

Tänk dig att du skapar en producent-/konsumentlösning för ett globalt positionssystem (GPS). Du vill spåra koordinaterna för en enhet över tid. Ett exempelkoordinatobjekt kan se ut så här:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

Obundna skapandemönster

Ett vanligt användningsmönster är att skapa en obundna standardkanal:

var channel = Channel.CreateUnbounded<Coordinates>();

Men i stället kan vi anta att du vill skapa en obundna kanal med flera producenter och konsumenter:

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

I det här fallet är alla skrivningar synkrona, även WriteAsync. Detta beror på att en obundna kanal alltid har tillgängligt utrymme för en skrivning effektivt omedelbart. Men med AllowSynchronousContinuations inställt på truekan skrivningarna i slutändan utföra arbete som är associerat med en läsare genom att köra deras fortsättningar. Detta påverkar inte åtgärdens synkronicitet.

Mönster för avgränsad skapande

Med begränsade kanaler bör konfigurerbarheten för kanalen vara känd för konsumenten för att säkerställa korrekt förbrukning. Konsumenten bör alltså veta vilket beteende kanalen uppvisar när den konfigurerade gränsen nås. Nu ska vi utforska några av de vanliga avgränsade skapandemönstren.

Det enklaste sättet att skapa en begränsad kanal är att ange en kapacitet:

var channel = Channel.CreateBounded<Coordinates>(1);

Föregående kod skapar en begränsad kanal med en maximal kapacitet på 1. Andra alternativ är tillgängliga, vissa alternativ är samma som en obundna kanal, medan andra är specifika för obundna kanaler:

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

I föregående kod skapas kanalen som en begränsad kanal som är begränsad till 1 000 objekt, med en enda skrivare men många läsare. Dess funktionssätt i fullständigt läge definieras som DropWrite, vilket innebär att objektet som skrivs om kanalen är full tas bort.

Om du vill observera objekt som tas bort när du använder begränsade kanaler registrerar du ett itemDropped återanrop:

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

När kanalen är full och ett nytt objekt läggs till anropas återanropet itemDropped . I det här exemplet skriver det angivna återanropet objektet till konsolen, men du kan vidta andra åtgärder som du vill.

Producentmönster

Anta att producenten i det här scenariot skriver nya koordinater till kanalen. Producenten kan göra detta genom att anropa TryWrite:

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

Föregående producentkod:

  • Channel<Coordinates>.Writer Accepterar (ChannelWriter<Coordinates>) som ett argument, tillsammans med den inledande Coordinates.
  • Definierar en villkorlig while loop som försöker flytta koordinaterna med hjälp av TryWrite.

En alternativ producent kan använda WriteAsync metoden:

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

Återigen används i Channel<Coordinates>.Writer en while loop. Men den här gången anropas WriteAsync metoden. Metoden fortsätter först när koordinaterna har skrivits. När loopen while avslutas görs ett anrop till Complete , vilket signalerar att inga fler data skrivs till kanalen.

Ett annat producentmönster är att använda WaitToWriteAsync metoden, tänk på följande kod:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

Som en del av villkoret whileanvänds resultatet av anropet WaitToWriteAsync för att avgöra om loopen ska fortsätta.

Konsumentmönster

Det finns flera vanliga kanalkonsumentmönster. När en kanal aldrig slutar, vilket innebär att den producerar data på obestämd tid, kan konsumenten använda en while (true) loop och läsa data när den blir tillgänglig:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

Kommentar

Den här koden utlöser ett undantag om kanalen stängs.

En alternativ konsument skulle kunna undvika detta problem med hjälp av en kapslad while-loop, som du ser i följande kod:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

I föregående kod väntar konsumenten på att läsa data. När data är tillgängliga försöker konsumenten läsa dem. Dessa loopar fortsätter att utvärderas tills producenten av kanalen signalerar att den inte längre har data att läsa. Med detta sagt, när en producent är känd för att ha ett begränsat antal objekt som den producerar och det signalerar slutförande, kan konsumenten använda await foreach semantik för att iterera över objekten:

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

Föregående kod använder ReadAllAsync metoden för att läsa alla koordinater från kanalen.

Se även