Freigeben über


Bibliothek „System.Threading.Channels“

Der System.Threading.Channels-Namespace bietet eine Reihe von Synchronisierungsdatenstrukturen für den asynchronen Datenaustausch zwischen Producer und Consumer. Die Bibliothek zielt auf .NET, .NET Standard und .NET Framework ab und funktioniert für alle .NET-Implementierungen.

Diese Bibliothek ist im 📦 NuGet-Paket "System.Threading.Channels " verfügbar. Wenn Sie jedoch .NET Core 3.0 oder höher verwenden, wird das Paket als Teil des freigegebenen Frameworks enthalten.

Konzeptionelles Producer/Consumer-Programmiermodell

Kanäle sind eine Implementierung des konzeptuellen Producer/Consumer-Programmiermodells. Bei diesem Programmiermodell erzeugen Producer asynchron Daten, und Consumer nutzen diese Daten asynchron. Anders gesagt: Dieses Modell leitet Daten von einer Partei an eine andere über eine First-In-First-Out-Warteschlange („FIFO“) weiter. Stellen Sie sich Kanäle als einen anderen allgemeinen generischen Sammlungstyp vor, z. B. ein List<T>. Der Hauptunterschied besteht darin, dass mit dieser Sammlung die Synchronisierung verwaltet wird und über die Factoryerstellungsoptionen verschiedene Verbrauchsmodelle bereitgestellt werden. Diese Optionen steuern das Verhalten der Kanäle, z. B.:

  • Wie viele Elemente sie speichern dürfen und was passiert, wenn dieser Grenzwert erreicht ist.
  • Unabhängig davon, ob auf den Kanal von mehreren Produzenten oder mehreren Verbrauchern gleichzeitig zugegriffen wird.

Grundlegende Nutzung

Im folgenden Beispiel wird die grundlegende Verwendung eines Kanals veranschaulicht, in dem ein Produzent Elemente schreibt und ein Verbraucher sie liest:

static async Task BasicUsageAsync()
{
    Channel<int> channel = Channel.CreateUnbounded<int>();

    Task producer = ProduceAsync(channel.Writer);
    Task consumer = ConsumeAsync(channel.Reader);

    await Task.WhenAll(producer, consumer);

    static async Task ProduceAsync(ChannelWriter<int> writer)
    {
        for (int i = 0; i < 5; i++)
        {
            await writer.WriteAsync(i);
        }

        writer.Complete();
    }

    static async Task ConsumeAsync(ChannelReader<int> reader)
    {
        await foreach (int item in reader.ReadAllAsync())
        {
            Console.WriteLine($"Received: {item}");
        }
    }
}

Begrenzungsstrategien

Je nachdem, wie ein Channel<T> erstellt wird, verhalten sich Reader und Writer unterschiedlich.

Rufen Sie Channel.CreateBounded auf, um einen Kanal zu erstellen, der eine maximale Kapazität angibt. Rufen Sie zum Erstellen eines Kanals, der von einer beliebigen Anzahl von Readern und Writern gleichzeitig verwendet werden kann, Channel.CreateUnbounded auf. Jede Begrenzungsstrategie macht verschiedene benutzerdefinierte Optionen verfügbar, entweder BoundedChannelOptions oder UnboundedChannelOptions.

Hinweis

Unabhängig von der Begrenzungsstrategie löst ein Kanal immer einen ChannelClosedException Zeitpunkt aus, wenn er nach dem Schließen verwendet wird.

Unbegrenzte Kanäle

Rufen Sie zum Erstellen eines unbegrenzten Kanals eine der Channel.CreateUnbounded-Überladungen auf:

var channel = Channel.CreateUnbounded<T>();

Wenn Sie einen unbegrenzten Kanal erstellen, kann der Kanal standardmäßig von einer beliebigen Anzahl von Readern und Writern gleichzeitig verwendet werden. Alternativ können Sie beim Erstellen eines unbegrenzten Kanals ein nicht standardmäßiges Verhalten angeben, indem Sie eine UnboundedChannelOptions-Instanz bereitstellen. Die Kapazität des Kanals ist unbegrenzt, und alle Schreibvorgänge werden synchron ausgeführt. Weitere Beispiele finden Sie unter Unbegrenzte Erstellungsmodelle.

Begrenzte Kanäle

Rufen Sie zum Erstellen eines begrenzten Kanals eine der Channel.CreateBounded-Überladungen auf:

var channel = Channel.CreateBounded<T>(7);

Mit dem obigen Code wird ein Kanal mit einer maximalen Kapazität von 7 Elementen erstellt. Wenn Sie einen begrenzten Kanal erstellen, ist der Kanal auf eine maximale Kapazität begrenzt. Wenn die Grenze erreicht ist, verhält sich der Kanal standardmäßig so, dass er den Producer asynchron blockiert, bis Speicherplatz verfügbar wird. Sie können dieses Verhalten konfigurieren, indem Sie beim Erstellen des Kanals eine Option angeben. Begrenzte Kanäle können mit einem beliebigen Kapazitätswert größer als null erstellt werden. Andere Beispiele finden Sie unter Begrenzte Erstellmuster.

Vollmodusverhalten

Wenn Sie einen begrenzten Kanal verwenden, können Sie das Verhalten des Kanals bei Erreichen der konfigurierten Grenze angeben. In der folgenden Tabelle sind die Verhaltensweisen im Vollmodus für jeden BoundedChannelFullMode-Wert aufgeführt:

Wert Verhalten
BoundedChannelFullMode.Wait Dies ist der Standardwert. Aufrufe an WriteAsync warten darauf, dass Platz verfügbar ist, um den Schreibvorgang abzuschließen. Aufrufe an TryWrite geben sofort false zurück.
BoundedChannelFullMode.DropNewest Entfernt das neueste Element im Kanal und ignoriert es, um Platz für das Element zu schaffen, das geschrieben werden soll.
BoundedChannelFullMode.DropOldest Entfernt und ignoriert das älteste Element im Kanal, um Platz für das zu schreibende Element zu schaffen.
BoundedChannelFullMode.DropWrite Löscht das Element, das geschrieben wird.

Wichtig

Wenn ein Channel<TWrite,TRead>.Writer Elemente schneller erzeugt, als ein Channel<TWrite,TRead>.Reader sie nutzen kann, entsteht beim Writer des Kanals ein Rückstau.

Producer-APIs

Die Producerfunktionalität wird auf dem Channel<TWrite,TRead>.Writer verfügbar gemacht. Die Producer-APIs und das erwartete Verhalten sind in der folgenden Tabelle aufgeführt:

Programmierschnittstelle (API) Erwartetes Verhalten
ChannelWriter<T>.Complete Markiert den Kanal als abgeschlossen, d. h. es werden keine weiteren Elemente geschrieben.
ChannelWriter<T>.TryComplete Versuche, den Kanal als abgeschlossen zu markieren, bedeuten, dass keine weiteren Daten geschrieben werden.
ChannelWriter<T>.TryWrite Versucht, das angegebene Element an den Kanal zu schreiben. Bei Verwendung mit einem unbegrenzten Kanal wird damit immer true zurückgegeben, es sei denn, der Writer des Kanals signalisiert mit ChannelWriter<T>.Complete oder ChannelWriter<T>.TryComplete den Abschluss.
ChannelWriter<T>.WaitToWriteAsync Gibt ein ValueTask<TResult> zurück, das abgeschlossen wird, sobald Platz zum Schreiben eines Elements verfügbar ist.
ChannelWriter<T>.WriteAsync Schreibt asynchron ein Element in den Kanal.

Consumer-APIs

Die Consumerfunktionalität wird auf dem Channel<TWrite,TRead>.Reader verfügbar gemacht. Die Consumer-APIs und das erwartete Verhalten sind in der folgenden Tabelle aufgeführt:

Programmierschnittstelle (API) Erwartetes Verhalten
ChannelReader<T>.ReadAllAsync Erstellt ein IAsyncEnumerable<T>, das das Lesen aller Daten aus dem Kanal ermöglicht.
ChannelReader<T>.ReadAsync Liest asynchron ein Element aus dem Kanal.
ChannelReader<T>.TryPeek Versucht, ein Element aus dem Kanal einzusehen.
ChannelReader<T>.TryRead Versucht, ein Element aus dem Kanal zu lesen.
ChannelReader<T>.WaitToReadAsync Gibt eine ValueTask<TResult> zurück, die abgeschlossen wird, wenn Daten zum Lesen verfügbar sind.

Gängige Verwendungsmuster

Es gibt mehrere Verwendungsmuster für Kanäle:

Die API ist so konzipiert, dass sie einfach, konsistent und so flexibel wie möglich ist. Alle asynchronen Methoden geben einen ValueTask (oder ValueTask<bool>) zurück, der einen einfachen asynchronen Vorgang darstellt, mit dem eine Zuordnung vermieden wird, wenn der Vorgang synchron und potenziell sogar asynchron durchgeführt wird. Darüber hinaus ist die API so konzipiert, dass sie zusammengesetzt werden kann, indem der Creator eines Kanals Zusagen hinsichtlich der beabsichtigten Nutzung macht. Wenn ein Kanal mit bestimmten Parametern erstellt wird, kann die interne Implementierung effizienter arbeiten, indem sie diese Versprechen kennt.

Erzeugungsmuster

Stellen Sie sich vor, Sie erstellen eine Producer/Consumer-Lösung für ein globales Positionssystem (GPS). Sie möchten die Koordinaten eines Geräts im zeitlichen Verlauf nachverfolgen. Ein Beispiel für ein Koordinatenobjekt könnte wie folgt aussehen:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

Unbegrenzte Erstellungsmuster

Ein gängiges Verwendungsmuster besteht darin, einen standardmäßigen ungebundenen Kanal zu erstellen:

var channel = Channel.CreateUnbounded<Coordinates>();

Stellen Sie sich aber vor, Sie möchten einen ungebundenen Kanal mit mehreren Produzenten und Verbrauchern erstellen. Festlegen SingleWriter = false und SingleReader = false in den Kanaloptionen:

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

In diesem Fall sind alle Schreibvorgänge synchron, selbst WriteAsync. Dieses Verhalten tritt auf, da ein ungebundener Kanal immer sofort Platz für einen Schreibzugriff hat. Durch das Festlegen von AllowSynchronousContinuations auf true können die Schreibvorgänge jedoch Arbeiten übernehmen, die normalerweise einem Leser zugeordnet sind, indem sie deren Fortsetzungen ausführen. Diese Einstellung wirkt sich nicht auf die Synchronisierung des Vorgangs aus.

Begrenzte Erstellungsmuster

Bei gebundenen Kanälen sollte die Konfigurierbarkeit des Kanals für den Verbraucher bekannt sein, um einen ordnungsgemäßen Verbrauch zu gewährleisten. Das heißt, der Consumer muss wissen, wie sich der Kanal verhält, wenn die konfigurierte Grenze erreicht ist. Die folgenden Beispiele zeigen einige der allgemeinen gebundenen Erstellungsmuster.

Die einfachste Möglichkeit zum Erstellen eines gebundenen Kanals besteht darin, eine Kapazität anzugeben. Der folgende Code erstellt einen gebundenen Kanal mit einer maximalen Kapazität von 1.

var channel = Channel.CreateBounded<Coordinates>(1);

Es stehen weitere Optionen zur Verfügung. Einige Optionen sind identisch mit einem ungebundenen Kanal, während andere für gebundene Kanäle spezifisch sind. Im folgenden Code wird der Kanal als gebundener Kanal erstellt, der auf 1.000 Elemente beschränkt ist, mit einem einzigen Writer, aber vielen Lesern. Das Vollmodusverhalten wird als DropWrite definiert, was bedeutet, dass das geschriebene Element gelöscht wird, wenn der Kanal voll ist.

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

Um Elemente zu beobachten, die bei der Verwendung von begrenzten Kanälen verloren gehen, registrieren Sie einen itemDropped-Rückruf:

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

Wenn der Kanal voll ist und ein neues Element hinzugefügt wird, wird der itemDropped-Rückruf aufgerufen. In diesem Beispiel wird das Element mit dem bereitgestellten Rückruf in die Konsole geschrieben. Sie können jedoch auch eine andere Aktion ausführen.

Produzentenmuster

Stellen Sie sich vor, dass der Produzent in diesem Szenario neue Koordinaten in den Kanal schreibt. Der Producer kann dies tun, indem er TryWriteaufruft:

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

Der vorangegangene Produzentencode:

  • Akzeptiert Channel<Coordinates>.Writer (ChannelWriter<Coordinates>) als Argument zusammen mit den ursprünglichen Coordinates.
  • Definiert eine bedingte while-Schleife, mit der die Koordinaten mithilfe von TryWrite verschoben werden.

Ein alternativer Produzent könnte die WriteAsync-Methode verwenden:

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

Auch hier werden die Channel<Coordinates>.Writer in einer while-Schleife verwendet. Dieses Mal wird jedoch die WriteAsync-Methode aufgerufen. Die Methode wird erst fortgesetzt, nachdem die Koordinaten geschrieben wurden. Wenn die while-Schleife beendet wird, wird Complete aufgerufen und damit signalisiert, dass keine weiteren Daten in den Kanal geschrieben werden.

Ein weiteres Producermuster besteht darin, die WaitToWriteAsync-Methode zu verwenden. Verwenden Sie dazu folgenden Code:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

Im Rahmen der bedingten while-Schleife wird das Ergebnis des WaitToWriteAsync-Aufrufs verwendet, um festzustellen, ob die Schleife fortgesetzt werden soll

Verbrauchermuster

Es gibt mehrere gängige Kanalconsumermuster. Wenn ein Kanal nie endet, was bedeutet, dass er unendlich Daten erzeugt, kann der Consumer eine while (true)-Schleife verwenden und Daten lesen, sobald er verfügbar wird:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

Hinweis

Dieser Code löst eine Ausnahme aus, wenn der Kanal geschlossen ist.

Ein alternativer Verbraucher kann dieses Problem umgehen, indem er eine geschachtelte WHILE-Schleife wie im folgenden Code verwendet:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

Im vorhergehenden Code wartet der Consumer darauf, Daten zu lesen. Sobald die Daten verfügbar sind, versucht der Consumer, sie zu lesen. Diese Schleifen werden so lange ausgewertet, bis der Produzent des Kanals signalisiert, dass keine Daten mehr gelesen werden müssen. Wenn also bekannt ist, dass ein producer eine endliche Anzahl von Elementen erzeugt und signalisiert, dass er fertig ist, kann der consumer mithilfe von await foreach-Semantik über die Elemente iterieren:

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

Im vorherigen Code wird die ReadAllAsync-Methode verwendet, um alle Koordinaten aus dem Kanal zu lesen.

Mehrere Produzenten und Verbraucher

Kanäle unterstützen mehrere gleichzeitige Erzeuger und Verbraucher. Um dies zu aktivieren, erstellen Sie einen Kanal mit SingleWriter = false und SingleReader = false in den Kanaloptionen. Dann verteilen Sie das Schreiben auf mehrere Produzententasks und das Lesen auf mehrere Konsumententasks.

static async Task UseMultipleProducersAndConsumersAsync()
{
    Channel<Coordinates> channel = Channel.CreateUnbounded<Coordinates>(
        new UnboundedChannelOptions
        {
            SingleWriter = false,
            SingleReader = false
        });

    // Start three concurrent producer tasks.
    Task[] producerTasks = Enumerable.Range(0, 3)
        .Select(id => ProduceAsync(id, channel))
        .ToArray();

    // Start two concurrent consumer tasks.
    Task[] consumerTasks = Enumerable.Range(0, 2)
        .Select(_ => ConsumeAsync(channel))
        .ToArray();

    // Wait for all producers to finish, then mark the channel as complete.
    await Task.WhenAll(producerTasks);
    channel.Writer.Complete();

    // Wait for all consumers to finish.
    await Task.WhenAll(consumerTasks);

    static async Task ProduceAsync(int id, Channel<Coordinates> channel)
    {
        Coordinates coordinates = new(
            DeviceId: Guid.NewGuid(),
            Latitude: -90 + (id * 30),
            Longitude: -180 + (id * 60));

        while (coordinates is { Latitude: < 90, Longitude: < 180 })
        {
            coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + 0.5,
                Longitude = coordinates.Longitude + 1
            };
                
            await channel.Writer.WriteAsync(coordinates);
        }
    }

    static async Task ConsumeAsync(Channel<Coordinates> channel)
    {
        await foreach (Coordinates coordinates in channel.Reader.ReadAllAsync())
        {
            Console.WriteLine(coordinates);
        }
    }
}

Der vorherige Code:

  • Erstellt einen ungebundenen Kanal, der explizit mehrere gleichzeitige Autoren und Leser unterstützt.
  • Startet drei parallele Produzentenaufgaben, die jeweils eine Reihe von Koordinaten mit einer eindeutigen Gerätekennzeichnung schreiben.
  • Startet zwei gleichzeitige Consumeraufgaben, wobei jede Aufgabe mit ReadAllAsync aus demselben Kanal liest.
  • Wartet, bis alle Produzenten fertig sind, und ruft dann auf Complete , um zu signalisieren, dass keine weiteren Daten in den Kanal geschrieben werden.
  • Wartet, bis alle Konsumenten die verbleibenden Daten aus dem Kanal verarbeiten.

Tipp

Rufen Sie bei mehreren Produzenten channel.Writer.Complete() nur auf, nachdem alle Produzenten das Schreiben abgeschlossen haben. Dadurch wird signalisiert, dass keine weiteren Daten mehr geschrieben werden, damit ReadAllAsync() nach dem Verbrauch aller verbleibenden Elemente abgeschlossen werden kann.

Siehe auch