Delen via


System.Threading.Channels-bibliotheek

De System.Threading.Channels naamruimte biedt een set synchronisatiegegevensstructuren voor het asynchroon doorgeven van gegevens tussen producenten en consumenten. De bibliotheek is gericht op .NET Standard en werkt op alle .NET-implementaties.

Deze bibliotheek is beschikbaar in het NuGet-pakket System.Threading.Channels . Als u echter .NET Core 3.0 of hoger gebruikt, wordt het pakket opgenomen als onderdeel van het framework.

Conceptueel programmeermodel producent/consument

Kanalen zijn een implementatie van het conceptueel programmeermodel voor producenten/consumenten. In dit programmeermodel produceren producenten asynchroon gegevens en verbruiken ze die gegevens asynchroon. Met andere woorden, dit model draagt gegevens van de ene partij naar de andere af. Probeer kanalen te beschouwen zoals elk ander algemeen algemeen verzamelingstype, zoals een List<T>. Het belangrijkste verschil is dat deze verzameling synchronisatie beheert en verschillende verbruiksmodellen biedt via opties voor het maken van fabrieken. Deze opties bepalen het gedrag van de kanalen, zoals het aantal elementen dat ze mogen opslaan en wat er gebeurt als die limiet is bereikt, of of het kanaal wordt geopend door meerdere producenten of meerdere consumenten tegelijk.

Begrenzingsstrategieën

Afhankelijk van hoe een Channel<T> wordt gemaakt, gedragen de lezer en schrijver zich anders.

Als u een kanaal wilt maken dat een maximale capaciteit aangeeft, roept u aan Channel.CreateBounded. Als u een kanaal wilt maken dat door een willekeurig aantal lezers en schrijvers tegelijk wordt gebruikt, roept u aan Channel.CreateUnbounded. Elke begrenzingsstrategie bevat verschillende door de maker gedefinieerde opties, of BoundedChannelOptionsUnboundedChannelOptions respectievelijk.

Notitie

Ongeacht de begrenzingsstrategie genereert een kanaal altijd een ChannelClosedException wanneer het wordt gebruikt nadat het is gesloten.

Niet-gekoppelde kanalen

Als u een niet-afhankelijk kanaal wilt maken, roept u een van de Channel.CreateUnbounded overbelastingen aan:

var channel = Channel.CreateUnbounded<T>();

Wanneer u een niet-gebonden kanaal maakt, kan het kanaal standaard door een willekeurig aantal lezers en schrijvers tegelijk worden gebruikt. U kunt ook niet-standaardgedrag opgeven bij het maken van een niet-gebonden kanaal door een UnboundedChannelOptions exemplaar op te geven. De capaciteit van het kanaal is niet gebonden en alle schrijfbewerkingen worden synchroon uitgevoerd. Zie Niet-gebonden aanmaakpatronen voor meer voorbeelden.

Gebonden kanalen

Als u een gebonden kanaal wilt maken, roept u een van de Channel.CreateBounded overbelastingen aan:

var channel = Channel.CreateBounded<T>(7);

Met de voorgaande code maakt u een kanaal met een maximale capaciteit van 7 items. Wanneer u een gebonden kanaal maakt, is het kanaal gebonden aan een maximale capaciteit. Wanneer de limiet is bereikt, is het standaardgedrag dat het kanaal asynchroon de producent blokkeert totdat er ruimte beschikbaar is. U kunt dit gedrag configureren door een optie op te geven wanneer u het kanaal maakt. Gebonden kanalen kunnen worden gemaakt met elke capaciteitswaarde die groter is dan nul. Zie Voor andere voorbeelden patronen voor het maken van gebonden items.

Gedrag van de volledige modus

Wanneer u een gebonden kanaal gebruikt, kunt u het gedrag opgeven waarop het kanaal zich houdt wanneer de geconfigureerde afhankelijkheid wordt bereikt. De volgende tabel bevat het gedrag van de volledige modus voor elke BoundedChannelFullMode waarde:

Weergegeven als Gedrag
BoundedChannelFullMode.Wait Dit is de standaardwaarde. Oproepen om te WriteAsync wachten tot er ruimte beschikbaar is om de schrijfbewerking te voltooien. Oproepen om onmiddellijk terug te TryWrite keren false .
BoundedChannelFullMode.DropNewest Hiermee verwijdert en negeert u het nieuwste item in het kanaal om ruimte te maken voor het item dat wordt geschreven.
BoundedChannelFullMode.DropOldest Hiermee verwijdert en negeert u het oudste item in het kanaal om ruimte te maken voor het item dat wordt geschreven.
BoundedChannelFullMode.DropWrite Hiermee wordt het item weggeschreven.

Belangrijk

Wanneer een Channel<TWrite,TRead>.Writer produceert sneller dan een Channel<TWrite,TRead>.Reader kan verbruiken, ervaart de schrijver van het kanaal rugdruk.

Producer-API's

De functionaliteit van de producent wordt weergegeven op de Channel<TWrite,TRead>.Writer. De producer-API's en het verwachte gedrag worden in de volgende tabel beschreven:

API Verwacht gedrag
ChannelWriter<T>.Complete Markeert het kanaal als voltooid, wat betekent dat er geen items meer naar het kanaal worden geschreven.
ChannelWriter<T>.TryComplete Pogingen om het kanaal als voltooid te markeren, wat betekent dat er geen gegevens meer naar het kanaal worden geschreven.
ChannelWriter<T>.TryWrite Probeert het opgegeven item naar het kanaal te schrijven. Wanneer het wordt gebruikt met een niet-gebonden kanaal, wordt dit altijd geretourneerd true , tenzij de schrijversignalen van het kanaal zijn voltooid met ofwel ChannelWriter<T>.Complete, of ChannelWriter<T>.TryComplete.
ChannelWriter<T>.WaitToWriteAsync Retourneert een ValueTask<TResult> bewerking die wordt voltooid wanneer er ruimte beschikbaar is om een item te schrijven.
ChannelWriter<T>.WriteAsync Asynchroon schrijft een item naar het kanaal.

Api's voor consumenten

De consumentenfunctionaliteit wordt weergegeven op de Channel<TWrite,TRead>.Reader. De consumenten-API's en het verwachte gedrag worden in de volgende tabel beschreven:

API Verwacht gedrag
ChannelReader<T>.ReadAllAsync Hiermee maakt u een IAsyncEnumerable<T> bestand waarmee alle gegevens uit het kanaal kunnen worden gelezen.
ChannelReader<T>.ReadAsync Asynchroon leest een item uit het kanaal.
ChannelReader<T>.TryPeek Probeert een item van het kanaal te bekijken.
ChannelReader<T>.TryRead Probeert een item uit het kanaal te lezen.
ChannelReader<T>.WaitToReadAsync Retourneert een ValueTask<TResult> bewerking die wordt voltooid wanneer gegevens beschikbaar zijn om te lezen.

Algemene gebruikspatronen

Er zijn verschillende gebruikspatronen voor kanalen. De API is ontworpen om eenvoudig, consistent en zo flexibel mogelijk te zijn. Alle asynchrone methoden retourneren een ValueTask (of ValueTask<bool>) die een lichtgewicht asynchrone bewerking vertegenwoordigt die kan voorkomen dat de bewerking synchroon en mogelijk zelfs asynchroon wordt uitgevoerd. Bovendien is de API ontworpen om te kunnen worden composeerbaar, omdat de maker van een kanaal beloftes doet over het beoogde gebruik. Wanneer een kanaal met bepaalde parameters wordt gemaakt, kan de interne implementatie efficiënter werken met de kennis van deze beloften.

Patronen maken

Stel dat u een producent-/consumentenoplossing maakt voor een global position system (GPS). U wilt de coördinaten van een apparaat na verloop van tijd bijhouden. Een voorbeeldcoördinaatobject kan er als volgt uitzien:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

Niet-gebonden aanmaakpatronen

Een veelvoorkomend gebruikspatroon is het maken van een standaard niet-gebonden kanaal:

var channel = Channel.CreateUnbounded<Coordinates>();

Maar stel dat u een niet-gebonden kanaal wilt maken met meerdere producenten en consumenten:

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

In dit geval zijn alle schrijfbewerkingen synchroon, zelfs de WriteAsync. Dit komt doordat een niet-gebonden kanaal altijd direct beschikbare ruimte heeft voor een schrijfbewerking. Als AllowSynchronousContinuations dit is ingesteld true, kunnen de schrijfbewerkingen echter wel worden uitgevoerd die zijn gekoppeld aan een lezer door hun vervolgbewerkingen uit te voeren. Dit heeft geen invloed op de synchroniteit van de bewerking.

Patronen voor gebonden creatie

Bij gebonden kanalen moet de configureerbaarheid van het kanaal bekend zijn bij de consument om ervoor te zorgen dat het juiste verbruik wordt gegarandeerd. Dat wil zeggen dat de consument moet weten welk gedrag het kanaal vertoont wanneer de geconfigureerde afhankelijkheid wordt bereikt. Laten we enkele veelvoorkomende patronen voor het maken van grenzen verkennen.

De eenvoudigste manier om een gebonden kanaal te maken, is door een capaciteit op te geven:

var channel = Channel.CreateBounded<Coordinates>(1);

Met de voorgaande code wordt een gebonden kanaal gemaakt met een maximale capaciteit van 1. Andere opties zijn beschikbaar, sommige opties zijn hetzelfde als een niet-gebonden kanaal, terwijl andere specifiek zijn voor niet-gebonden kanalen:

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

In de voorgaande code wordt het kanaal gemaakt als een gebonden kanaal dat beperkt is tot 1000 items, met één schrijver maar veel lezers. Het gedrag van de volledige modus wordt gedefinieerd als DropWrite, wat betekent dat het item wordt weggeschreven als het kanaal vol is.

Als u items wilt bekijken die worden verwijderd wanneer u gebonden kanalen gebruikt, registreert u een itemDropped callback:

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

Wanneer het kanaal vol is en er een nieuw item wordt toegevoegd, wordt de itemDropped callback aangeroepen. In dit voorbeeld schrijft de opgegeven callback het item naar de console, maar u kunt alle andere acties ondernemen die u wilt uitvoeren.

Producentenpatronen

Stel dat de producent in dit scenario nieuwe coördinaten naar het kanaal schrijft. De producent kan dit doen door het volgende aan te roepen TryWrite:

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

De voorgaande producentcode:

  • Accepteert de Channel<Coordinates>.Writer (ChannelWriter<Coordinates>) als argument, samen met het begin Coordinates.
  • Definieert een voorwaardelijke while lus die probeert de coördinaten te verplaatsen met behulp van TryWrite.

Een alternatieve producent kan de WriteAsync methode gebruiken:

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

Nogmaals, de Channel<Coordinates>.Writer wordt gebruikt binnen een while lus. Maar deze keer wordt de WriteAsync methode aangeroepen. De methode wordt pas voortgezet nadat de coördinaten zijn geschreven. Wanneer de while lus wordt afgesloten, wordt een aanroep uitgevoerd Complete , die aangeeft dat er geen gegevens meer naar het kanaal worden geschreven.

Een ander producentpatroon is om de WaitToWriteAsync methode te gebruiken, rekening houdend met de volgende code:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

Als onderdeel van de voorwaarde whilewordt het resultaat van de WaitToWriteAsync aanroep gebruikt om te bepalen of de lus moet worden voortgezet.

Consumentenpatronen

Er zijn verschillende algemene kanaalconsumerpatronen. Wanneer een kanaal nooit eindigt, wat betekent dat het voor onbepaalde tijd gegevens produceert, kan de consument een while (true) lus gebruiken en gegevens lezen zodra het beschikbaar is:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

Notitie

Met deze code wordt een uitzondering gegenereerd als het kanaal wordt gesloten.

Een alternatieve consument kan dit probleem vermijden door een geneste while-lus te gebruiken, zoals wordt weergegeven in de volgende code:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

In de voorgaande code wacht de consument op het lezen van gegevens. Zodra de gegevens beschikbaar zijn, probeert de consument deze te lezen. Deze lussen blijven evalueren totdat de producent van het kanaalsignalen dat deze geen gegevens meer bevat, moet worden gelezen. Met dat gezegd, wanneer een producent bekend is dat het een eindig aantal items heeft dat het produceert en de consument signalen voltooiing, kan de consument semantiek gebruiken await foreach om de items te herhalen:

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

De voorgaande code gebruikt de ReadAllAsync methode om alle coördinaten van het kanaal te lezen.

Zie ook