System.Threading.Channels könyvtár

A System.Threading.Channels névtér szinkronizálási adatstruktúrák készletét biztosítja az adatok termelők és fogyasztók közötti aszinkron továbbításához. A kódtár a .NET, a .NET Standard és a .NET-keretrendszert célozza meg, és minden .NET-implementáción működik.

Ez a kódtár a 📦 System.Threading.Channels NuGet csomagban érhető el. Ha azonban .NET Core 3.0-s vagy újabb verziót használ, a csomag a megosztott keretrendszer részeként lesz elérhető.

Gyártói/fogyasztói elméleti programozási modell

A csatornák a gyártói/fogyasztói fogalmi programozási modell implementációi. Ebben a programozási modellben a gyártók aszinkron módon termelnek adatokat, és a fogyasztók aszinkron módon felhasználják ezeket az adatokat. Más szóval ez a modell egy "első be, első ki" (FIFO) soron keresztül továbbítja az adatokat az egyik féltől a másiknak. Gondoljon a csatornákra, mint bármely más általános gyűjteménytípusra, például egy List<T>. Az elsődleges különbség az, hogy ez a gyűjtemény kezeli a szinkronizálást, és különböző használati modelleket biztosít a gyári létrehozási lehetőségeken keresztül. Ezek a beállítások szabályozzák a csatornák viselkedését, például:

  • Hány elemet tárolhatnak, és mi történik, ha elérik ezt a korlátot.
  • Azt jelzi, hogy a csatornát egyszerre több gyártó vagy több felhasználó éri-e el.

Alapszintű használat

Az alábbi példa egy csatorna alapszintű használatát mutatja be, ahol a gyártó elemeket ír, és a fogyasztó felolvassa őket:

static async Task BasicUsageAsync()
{
    Channel<int> channel = Channel.CreateUnbounded<int>();

    Task producer = ProduceAsync(channel.Writer);
    Task consumer = ConsumeAsync(channel.Reader);

    await Task.WhenAll(producer, consumer);

    static async Task ProduceAsync(ChannelWriter<int> writer)
    {
        for (int i = 0; i < 5; i++)
        {
            await writer.WriteAsync(i);
        }

        writer.Complete();
    }

    static async Task ConsumeAsync(ChannelReader<int> reader)
    {
        await foreach (int item in reader.ReadAllAsync())
        {
            Console.WriteLine($"Received: {item}");
        }
    }
}

Határoló stratégiák

A Channel<T> létrehozásának módjától függően az olvasó és az író eltérően viselkedik.

A maximális kapacitást meghatározó csatorna létrehozásához hívja meg a következőt Channel.CreateBounded: . Ha egy tetszőleges számú olvasó és író által egyidejűleg használt csatornát szeretne létrehozni, hívja meg a következőt Channel.CreateUnbounded: . Minden egyes határoló stratégia különböző, a létrehozó által definiált lehetőségeket tesz elérhetővé, illetve BoundedChannelOptions vagy UnboundedChannelOptions.

Megjegyzés

A határoló stratégiától függetlenül, egy csatorna mindig ChannelClosedException dob, amikor lezárása után használják.

Kötetlen csatornák

Korlátlan csatorna létrehozásához hívja meg az Channel.CreateUnbounded egyik túlterhelési változatot.

var channel = Channel.CreateUnbounded<T>();

Ha kötetlen csatornát hoz létre, alapértelmezés szerint a csatornát egyszerre tetszőleges számú olvasó és író használhatja. Másik lehetőségként megadhatja a nem alapértelmezett viselkedést egy kötetlen csatorna létrehozásakor egy UnboundedChannelOptions példány megadásával. A csatorna kapacitása korlátlan, és az összes írás szinkron módon történik. További példákért tekintse meg a kötetlen létrehozási mintákat.

Határolókeretes csatornák

Határolt csatorna létrehozásához hívja meg az egyik Channel.CreateBounded túlterhelt függvényt:

var channel = Channel.CreateBounded<T>(7);

Az előző kód létrehoz egy csatornát, amely maximális kapacitással rendelkezik 7 elemek számára. Határolókeretes csatorna létrehozásakor a csatorna maximális kapacitáshoz van kötve. Amikor elérjük a határt, az alapértelmezett viselkedés az, hogy a csatorna aszinkron módon blokkolja a termelőt, amíg nem szabadul fel hely. Ezt a viselkedést úgy konfigurálhatja, hogy megad egy lehetőséget a csatorna létrehozásakor. A határolt csatornák nullánál nagyobb kapacitásértékkel hozhatók létre. További példákért lásd a határolt létrehozási mintákat.

Teljes módú viselkedés

Határolókeretes csatorna használata esetén megadhatja, hogy a csatorna milyen viselkedést követ a konfigurált kötés elérésekor. Az alábbi táblázat az egyes BoundedChannelFullMode értékek teljes módú viselkedését sorolja fel:

Érték Működés
BoundedChannelFullMode.Wait Ez az alapértelmezett érték. Az WriteAsync függvényhívások megvárják, hogy hely rendelkezésre álljon az írási művelet befejezéséhez. A(z) TryWrite hívások azonnal visszatérnek false.
BoundedChannelFullMode.DropNewest Eltávolítja a csatorna legújabb elemét, hogy helyet adjon az újonnan írandó elemnek.
BoundedChannelFullMode.DropOldest Eltávolítja és figyelmen kívül hagyja a csatorna legrégebbi elemét, hogy helyet biztosítson az éppen megírt elemnek.
BoundedChannelFullMode.DropWrite Elveti az írás alatt lévő elemet.

Fontos

Amikor egy Channel<TWrite,TRead>.Writer gyorsabban termel, mint ahogy egy Channel<TWrite,TRead>.Reader fel tudja fogyasztani, a csatorna írója hátrányos nyomással találkozik.

Gyártói API-k

A gyártó funkció a Channel<TWrite,TRead>.Writer felületén kerül bemutatásra. A gyártó API-jait és a várt viselkedést az alábbi táblázatban találja:

API Várható viselkedés
ChannelWriter<T>.Complete Befejezettként jelöli meg a csatornát, ami azt jelenti, hogy a program nem ír bele több elemet.
ChannelWriter<T>.TryComplete A csatorna megjelölésére tett kísérletek befejezettként jelennek meg, ami azt jelenti, hogy a rendszer nem ír több adatot.
ChannelWriter<T>.TryWrite Megkísérli írni a megadott elemet a csatornába. Ha kötetlen csatornával használják, az mindig true-t ad vissza, hacsak a csatorna író nem jelzi a befejezést a következőkkel: ChannelWriter<T>.Complete vagy ChannelWriter<T>.TryComplete.
ChannelWriter<T>.WaitToWriteAsync Olyan ValueTask<TResult> értéket ad vissza, amely akkor fejeződik be, amikor rendelkezésre áll hely egy elem írásához.
ChannelWriter<T>.WriteAsync Aszinkron módon ír egy elemet a csatornába.

Fogyasztói API-k

A fogyasztói funkciók meg vannak jelenítve a Channel<TWrite,TRead>.Reader. A fogyasztói API-k és a várt viselkedés az alábbi táblázatban található:

API Várható viselkedés
ChannelReader<T>.ReadAllAsync Létrehoz egy olyan alkalmazást IAsyncEnumerable<T> , amely lehetővé teszi az összes adat beolvasását a csatornából.
ChannelReader<T>.ReadAsync Aszinkron módon olvas be egy elemet a csatornából.
ChannelReader<T>.TryPeek Megkísérli megtekinteni egy elemet a csatornából.
ChannelReader<T>.TryRead Megpróbál beolvasni egy elemet a csatornából.
ChannelReader<T>.WaitToReadAsync Olyan értéket ValueTask<TResult> ad vissza, amely akkor fejeződik be, ha az adatok olvashatók.

Gyakori használati minták

A csatornákhoz több használati minta is létezik:

Az API-t úgy tervezték, hogy egyszerű, konzisztens és lehető legrugalmasabb legyen. Az összes aszinkron metódus olyan ValueTask (vagy ValueTask<bool>) aszinkron műveletet ad vissza, amely egy egyszerűsített aszinkron műveletet jelöl, amely elkerülheti az allokálást, ha a művelet szinkron módon és akár aszinkron módon is befejeződik. Emellett az API-t úgy tervezték, hogy összeállítható legyen, mert a csatorna létrehozója ígéreteket tesz a kívánt használatról. Ha egy csatornát bizonyos paraméterekkel hoznak létre, a belső implementáció hatékonyabban működhet ezeknek az ígéreteknek az ismeretében.

Létrehozási minták

Tegyük fel, hogy gyártói/fogyasztói megoldást hoz létre egy globális pozíciórendszerhez (GPS). Nyomon szeretné követni egy eszköz koordinátáit az idő függvényében. A mintakoordináta-objektumok a következőképpen nézhetnek ki:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

Kötetlen létrehozási minták

Az egyik gyakori használati minta egy alapértelmezett kötetlen csatorna létrehozása:

var channel = Channel.CreateUnbounded<Coordinates>();

Tegyük fel azonban, hogy több gyártóval és fogyasztóval szeretne kötetlen csatornát létrehozni. Adja meg a SingleWriter = false és SingleReader = false értékeket a csatorna beállításaiban.

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

Ebben az esetben minden írás szinkron, még a WriteAsync is. Ez a viselkedés azért fordul elő, mert a kötetlen csatornák mindig azonnal rendelkezésre állnak az íráshoz. Ezzel a AllowSynchronousContinuations beállítással azonban előfordulhat, hogy true az írásműveletek folytatások végrehajtásával olyan munkát végeznek, amely egy olvasóhoz van társítva. Ez a beállítás nem befolyásolja a művelet szinkronizálását.

Határolt létrehozási minták

Határolókeretes csatornák esetén a csatorna konfigurálhatóságát a fogyasztónak ismernie kell a megfelelő fogyasztás biztosítása érdekében. Vagyis a fogyasztónak tudnia kell, hogy a csatorna milyen viselkedést mutat a konfigurált kötés elérésekor. Az alábbi példák a közös határolt létrehozási mintákat mutatják be.

A határolt csatornák létrehozásának legegyszerűbb módja egy kapacitás megadása. Az alábbi kód egy határolt csatornát hoz létre, amelynek maximális kapacitása 1.

var channel = Channel.CreateBounded<Coordinates>(1);

Más beállítások is elérhetők. Egyes lehetőségek megegyeznek a kötetlen csatornákkal, míg mások a határolt csatornákra vonatkoznak. A következő kódban a csatorna egy 1000 elemre korlátozott, határolt csatornaként jön létre, egyetlen íróval, de sok olvasóval. A teljes üzemmód viselkedése a DropWritekövetkezőképpen van definiálva, ami azt jelenti, hogy a csatorna megtelése esetén a megírt elemet elveti.

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

Ha a határolt csatornák használatakor elvetett elemeket szeretné megfigyelni, regisztráljon egy visszahívást itemDropped :

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

Amikor megtelik a csatorna, és új elemet adnak hozzá, a rendszer meghívja a itemDropped visszahívási függvényt. Ebben a példában a megadott visszahívás írja ki az elemet a konzolra, de szabadon választhat bármilyen más műveletet is.

Gyártói minták

Tegyük fel, hogy ebben a forgatókönyvben a gyártó új koordinátákat ír a csatornának. A gyártó ezt a következő hívással TryWriteteheti meg:

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

Az előző gyártókód:

  • A Channel<Coordinates>.Writer (ChannelWriter<Coordinates>) argumentumot elfogadja, a kezdeti Coordinates-vel együtt.
  • Olyan feltételes while hurkot definiál, amely a koordinátákat a következővel TryWritepróbálja áthelyezni: .

Egy alternatív gyártó a következő módszert használhatja WriteAsync :

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

Ismét a Channel<Coordinates>.Writer használatos egy while ciklusban. Ezúttal azonban a WriteAsync metódust hívják meg. A metódus csak a koordináták megírása után folytatódik. Amikor a while hurokból kilépés történik, meghívásra kerül Complete, ezzel jelezve, hogy már nincs több adat a csatornára írva.

Egy másik gyártói minta a WaitToWriteAsync módszer használata, vegye figyelembe a következő kódot:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

A feltételes while részeként a WaitToWriteAsync hívás eredménye határozza meg, hogy folytatódik-e a ciklus.

Fogyasztói minták

Számos gyakori csatorna-fogyasztói minta létezik. Ha egy csatorna soha nem fejeződik be, vagyis határozatlan ideig állítja elő az adatokat, a fogyasztó használhat egy hurkot while (true) , és beolvassa az adatokat, amint elérhetővé válik:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

Megjegyzés

Ez a kód kivételt eredményez, ha a csatorna bezárult.

Egy alternatív fogyasztó egy beágyazott while ciklus használatával elkerülheti ezt a problémát, amint az a következő kódban látható:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

Az előző kódban a fogyasztó megvárja az adatok olvasását. Miután az adatok elérhetővé válnak, a fogyasztó megpróbálja elolvasni. Ezek a hurkok mindaddig kiértékelésre kerülnek, amíg a csatorna gyártója nem jelzi, hogy már nem rendelkezik beolvasandó adatokkal. Ha egy termelőről ismert, hogy véges számú tételt állít elő és jelzi a befejezést, a fogyasztó await foreach szemantikával iterálhatja a tételeket.

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

Az előző kód a ReadAllAsync metódus használatával olvassa be a csatorna összes koordinátáját.

Több gyártó és fogyasztó

A csatornák több egyidejű gyártót és fogyasztót támogatnak. Ennek engedélyezéséhez hozzon létre egy csatornát, amely a csatorna beállításainál tartalmazza a SingleWriter = false és SingleReader = false elemeket. Akkor a feladatokat úgy oszthatja szét, hogy több termelői feladatra ír, és több fogyasztói feladatra olvashat.

static async Task UseMultipleProducersAndConsumersAsync()
{
    Channel<Coordinates> channel = Channel.CreateUnbounded<Coordinates>(
        new UnboundedChannelOptions
        {
            SingleWriter = false,
            SingleReader = false
        });

    // Start three concurrent producer tasks.
    Task[] producerTasks = Enumerable.Range(0, 3)
        .Select(id => ProduceAsync(id, channel))
        .ToArray();

    // Start two concurrent consumer tasks.
    Task[] consumerTasks = Enumerable.Range(0, 2)
        .Select(_ => ConsumeAsync(channel))
        .ToArray();

    // Wait for all producers to finish, then mark the channel as complete.
    await Task.WhenAll(producerTasks);
    channel.Writer.Complete();

    // Wait for all consumers to finish.
    await Task.WhenAll(consumerTasks);

    static async Task ProduceAsync(int id, Channel<Coordinates> channel)
    {
        Coordinates coordinates = new(
            DeviceId: Guid.NewGuid(),
            Latitude: -90 + (id * 30),
            Longitude: -180 + (id * 60));

        while (coordinates is { Latitude: < 90, Longitude: < 180 })
        {
            coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + 0.5,
                Longitude = coordinates.Longitude + 1
            };
                
            await channel.Writer.WriteAsync(coordinates);
        }
    }

    static async Task ConsumeAsync(Channel<Coordinates> channel)
    {
        await foreach (Coordinates coordinates in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine(coordinates);
        }
    }
}

Az előző kód:

  • Létrehoz egy kötetlen csatornát, amely explicit módon támogatja több egyidejű írót és olvasót.
  • Három párhuzamos készítői feladatot indít el, amelyek mindegyike egy sor koordinátát ír egyedi eszközazonosítóval.
  • Két egyidejű fogyasztói feladatot indít el, mindegyik ugyanazt a csatornát használja ReadAllAsync.
  • Megvárja, amíg minden gyártó befejezi a munkáját, majd meghívja Complete-t, hogy jelezzük, nincs több adat a csatornára írva.
  • Megvárja, amíg az összes fogyasztó befejezi a csatorna fennmaradó adatainak kiürítését.

Jótanács

Több gyártó esetén csak akkor hívja meg a channel.Writer.Complete()-t, ha minden gyártó befejezte az írást. Ez azt jelzi, hogy nem írnak több adatot, ami lehetővé teszi a ReadAllAsync() számára a befejezést az összes fennmaradó elem felhasználása után.

Lásd még