Teilen über


Bibliothek „System.Threading.Channels“

Der System.Threading.Channels-Namespace bietet eine Reihe von Synchronisierungsdatenstrukturen für den asynchronen Datenaustausch zwischen Producer und Consumer. Die Bibliothek ist für .NET Standard bestimmt und funktioniert für alle .NET-Implementierungen.

Diese Bibliothek ist im NuGet-Paket System.Threading.Channels verfügbar. Wenn Sie jedoch .NET Core 3.0 oder höher verwenden, ist das Paket als Teil des Frameworks enthalten.

Konzeptionelles Producer/Consumer-Programmiermodell

Kanäle sind eine Implementierung des konzeptuellen Producer/Consumer-Programmiermodells. Bei diesem Programmiermodell erzeugen Producer asynchron Daten, und Consumer nutzen diese Daten asynchron. Anders gesagt: Dieses Modell leitet Daten von einer Partei an eine andere über eine First-In-First-Out-Warteschlange („FIFO“) weiter. Sie können sich Kanäle wie jeden anderen allgemeinen generischen Sammlungstyp vorstellen, z. B. List<T>. Der Hauptunterschied besteht darin, dass mit dieser Sammlung die Synchronisierung verwaltet wird und über die Factoryerstellungsoptionen verschiedene Verbrauchsmodelle bereitgestellt werden. Diese Optionen steuern das Verhalten der Kanäle, z. B. die Anzahl der Elemente, die sie speichern dürfen, und was passiert, wenn dieser Grenzwert erreicht wird, oder ob der Kanal von mehreren Producern oder mehreren Consumern gleichzeitig aufgerufen wird.

Begrenzungsstrategien

Je nachdem, wie ein Channel<T> erstellt wird, verhalten sich Reader und Writer unterschiedlich.

Rufen Sie Channel.CreateBounded auf, um einen Kanal zu erstellen, der eine maximale Kapazität angibt. Rufen Sie zum Erstellen eines Kanals, der von einer beliebigen Anzahl von Readern und Writern gleichzeitig verwendet werden kann, Channel.CreateUnbounded auf. Jede Begrenzungsstrategie macht verschiedene creatordefinierte Optionen verfügbar, entweder BoundedChannelOptions oder UnboundedChannelOptions.

Hinweis

Unabhängig von der Begrenzungsstrategie löst ein Kanal immer eine ChannelClosedException aus, wenn er nach dem Schließen verwendet wird.

Unbegrenzte Kanäle

Rufen Sie zum Erstellen eines unbegrenzten Kanals eine der Channel.CreateUnbounded-Überladungen auf:

var channel = Channel.CreateUnbounded<T>();

Wenn Sie einen unbegrenzten Kanal erstellen, kann der Kanal standardmäßig von einer beliebigen Anzahl von Readern und Writern gleichzeitig verwendet werden. Alternativ können Sie beim Erstellen eines unbegrenzten Kanals ein nicht standardmäßiges Verhalten angeben, indem Sie eine UnboundedChannelOptions-Instanz bereitstellen. Die Kapazität des Kanals ist unbegrenzt, und alle Schreibvorgänge werden synchron ausgeführt. Weitere Beispiele finden Sie unter Muster zum Erstellen eines unbegrenzten Kanals.

Begrenzte Kanäle

Rufen Sie zum Erstellen eines begrenzten Kanals eine der Channel.CreateBounded-Überladungen auf:

var channel = Channel.CreateBounded<T>(7);

Mit dem obigen Code wird ein Kanal mit einer maximalen Kapazität von 7 Elementen erstellt. Wenn Sie einen begrenzten Kanal erstellen, ist der Kanal auf eine maximale Kapazität begrenzt. Wenn die Grenze erreicht ist, verhält sich der Kanal standardmäßig so, dass er den Producer asynchron blockiert, bis Speicherplatz verfügbar wird. Sie können dieses Verhalten konfigurieren, indem Sie beim Erstellen des Kanals eine Option angeben. Begrenzte Kanäle können mit einem beliebigen Kapazitätswert größer als null erstellt werden. Andere Beispiele finden Sie unter Muster zum Erstellen eines begrenzten Kanals.

Vollmodusverhalten

Wenn Sie einen begrenzten Kanal verwenden, können Sie das Verhalten des Kanals bei Erreichen der konfigurierten Grenze angeben. In der folgenden Tabelle sind die Verhaltensweisen im Vollmodus für jeden BoundedChannelFullMode-Wert aufgeführt:

Wert Verhalten
BoundedChannelFullMode.Wait Dies ist der Standardwert. Aufrufe an WriteAsync warten darauf, dass Platz verfügbar ist, um den Schreibvorgang abzuschließen. Aufrufe an TryWrite geben sofort false zurück.
BoundedChannelFullMode.DropNewest Entfernt und ignoriert das neueste Element im Kanal, um Platz für das zu schreibende Element zu schaffen.
BoundedChannelFullMode.DropOldest Entfernt und ignoriert das älteste Element im Kanal, um Platz für das zu schreibende Element zu schaffen.
BoundedChannelFullMode.DropWrite Löscht das Element, das geschrieben wird.

Wichtig

Wenn ein Channel<TWrite,TRead>.Writer Elemente schneller erzeugt, als ein Channel<TWrite,TRead>.Reader sie nutzen kann, entsteht beim Writer des Kanals ein Rückstau.

Producer-APIs

Die Producerfunktionalität wird auf dem Channel<TWrite,TRead>.Writer verfügbar gemacht. Die Producer-APIs und das erwartete Verhalten sind in der folgenden Tabelle aufgeführt:

API Erwartetes Verhalten
ChannelWriter<T>.Complete Markiert den Kanal als abgeschlossen, d. h. es werden keine weiteren Elemente geschrieben.
ChannelWriter<T>.TryComplete Versuche, den Kanal als abgeschlossen zu markieren, bedeuten, dass keine weiteren Daten geschrieben werden.
ChannelWriter<T>.TryWrite Versucht, das angegebene Element an den Kanal zu schreiben. Bei Verwendung mit einem unbegrenzten Kanal wird damit immer true zurückgegeben, es sei denn, der Writer des Kanals signalisiert mit ChannelWriter<T>.Complete oder ChannelWriter<T>.TryComplete den Abschluss.
ChannelWriter<T>.WaitToWriteAsync Gibt eine ValueTask<TResult> zurück, die abgeschlossen wird, wenn Speicherplatz zum Schreiben eines Elements verfügbar ist.
ChannelWriter<T>.WriteAsync Schreibt asynchron ein Element in den Kanal.

Consumer-APIs

Die Consumerfunktionalität wird auf dem Channel<TWrite,TRead>.Reader verfügbar gemacht. Die Consumer-APIs und das erwartete Verhalten sind in der folgenden Tabelle aufgeführt:

API Erwartetes Verhalten
ChannelReader<T>.ReadAllAsync Es erstellt eine IAsyncEnumerable<T>, die das Lesen aller Daten aus dem Kanal ermöglicht.
ChannelReader<T>.ReadAsync Liest asynchron ein Element aus dem Kanal.
ChannelReader<T>.TryPeek Versucht, ein Element aus dem Kanal einzusehen.
ChannelReader<T>.TryRead Versucht, ein Element aus dem Kanal zu lesen.
ChannelReader<T>.WaitToReadAsync Gibt eine ValueTask<TResult> zurück, die abgeschlossen wird, wenn Daten zum Lesen verfügbar sind.

Gängige Verwendungsmuster

Es gibt mehrere Nutzungsmuster für Kanäle. Die API ist so konzipiert, dass sie einfach, konsistent und so flexibel wie möglich ist. Alle asynchronen Methoden geben einen ValueTask (oder ValueTask<bool>) zurück, der einen einfachen asynchronen Vorgang darstellt, mit dem eine Zuordnung vermieden wird, wenn der Vorgang synchron und potenziell sogar asynchron durchgeführt wird. Darüber hinaus ist die API so konzipiert, dass sie zusammengesetzt werden kann, indem der Creator eines Kanals Zusagen hinsichtlich der beabsichtigten Nutzung macht. Wenn ein Kanal mit bestimmten Parametern erstellt wird, kann die interne Implementierung effizienter mit diesen Zusagen arbeiten.

Erstellungsmuster

Stellen Sie sich vor, Sie erstellen eine Producer/Consumer-Lösung für ein globales Positionssystem (GPS). Sie möchten die Koordinaten eines Geräts im zeitlichen Verlauf nachverfolgen. Ein Beispiel für ein Koordinatenobjekt könnte wie folgt aussehen:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

Muster zum Erstellen eines unbegrenzten Kanals

Ein gängiges Nutzungsmuster besteht darin, einen unbegrenzten Standardkanal zu erstellen:

var channel = Channel.CreateUnbounded<Coordinates>();

Stellen Sie sich stattdessen vor, dass Sie einen unbegrenzten Kanal mit mehreren Producern und Consumern erstellen:

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

In diesem Fall sind alle Schreibvorgänge synchron, auch die WriteAsync. Der Grund dafür ist, dass ein unbegrenzter Kanal immer Platz zum sofortigen Ausführen eines Schreibvorgangs hat. Wenn AllowSynchronousContinuations jedoch auf true festgelegt ist, können die Schreibvorgänge dazu führen, dass die mit einem Reader verknüpften Arbeiten ausgeführt werden, indem deren Fortsetzung ausgeführt wird. Dies wirkt sich nicht auf die Synchronisierung des Vorgangs aus.

Muster zum Erstellen eines begrenzten Kanals

Mit begrenzten Kanälen sollte dem Consumer zur Gewährleistung einer ordnungsgemäßen Nutzung die Konfigurierbarkeit des Kanals bekannt sein. Das heißt, der Consumer muss wissen, wie sich der Kanal verhält, wenn die konfigurierte Grenze erreicht ist. Im Folgenden finden Sie einige gängige Muster zum Erstellen eines begrenzten Kanals.

Die einfachste Möglichkeit zum Erstellen eines begrenzten Kanals besteht darin, eine Kapazität anzugeben:

var channel = Channel.CreateBounded<Coordinates>(1);

Mit dem vorstehenden Code wird ein begrenzter Kanal mit einer maximalen Kapazität von 1 erstellt. Es stehen weitere Optionen zur Verfügung, von denen einige dieselben sind wie bei einem unbegrenzten Kanal, während andere speziell für unbegrenzte Kanäle vorgesehen sind:

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

Im vorstehenden Code wird der Kanal als begrenzter Kanal erstellt, der auf 1.000 Elemente begrenzt ist, mit einem einzigen Writer, aber vielen Readern. Das Vollmodusverhalten wird als DropWrite definiert, was bedeutet, dass das geschriebene Element gelöscht wird, wenn der Kanal voll ist.

Wenn Sie einen begrenzten Kanal verwenden, um Elemente zu beobachten, die gelöscht werden, registrieren Sie einen itemDropped-Rückruf:

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

Wenn der Kanal voll ist und ein neues Element hinzugefügt wird, wird der itemDropped-Rückruf aufgerufen. In diesem Beispiel wird das Element mit dem bereitgestellten Rückruf in die Konsole geschrieben. Sie können jedoch auch eine andere Aktion ausführen.

Producermuster

Stellen Sie sich vor, der Producer in diesem Szenario schreibt neue Koordinaten in den Kanal. Der Producer kann dies tun, indem er TryWriteaufruft:

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

Der vorstehende Producercode:

  • Akzeptiert Channel<Coordinates>.Writer (ChannelWriter<Coordinates>) als Argument zusammen mit den ursprünglichen Coordinates.
  • Definiert eine bedingte while-Schleife, mit der die Koordinaten mithilfe von TryWrite verschoben werden.

Ein anderer Producer kann beispielsweise auch die WriteAsync-Methode verwenden:

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

Auch hier werden die Channel<Coordinates>.Writer in einer while-Schleife verwendet. Dieses Mal wird jedoch die WriteAsync-Methode aufgerufen. Die Methode wird erst fortgesetzt, nachdem die Koordinaten geschrieben wurden. Wenn die while-Schleife beendet wird, wird Complete aufgerufen und damit signalisiert, dass keine weiteren Daten in den Kanal geschrieben werden.

Ein weiteres Producermuster besteht darin, die WaitToWriteAsync-Methode zu verwenden. Verwenden Sie dazu folgenden Code:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

Im Rahmen der bedingten while-Schleife wird das Ergebnis des WaitToWriteAsync-Aufrufs verwendet, um festzustellen, ob die Schleife fortgesetzt werden soll

Consumermuster

Es gibt mehrere gängige Kanalconsumermuster. Wenn ein Kanal nie endet, was bedeutet, dass er unendlich Daten erzeugt, kann der Consumer eine while (true)-Schleife verwenden und Daten lesen, sobald er verfügbar wird:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

Hinweis

Dieser Code löst eine Ausnahme aus, wenn der Kanal geschlossen wird.

Ein anderer Consumer kann dieses Problem vermeiden, indem er wie im folgenden Code eine geschachtelte WHILE-Schleife verwendet:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

Im obigen Code wartet der Consumer mit dem Lesen von Daten. Sobald die Daten verfügbar sind, versucht der Consumer, sie zu lesen. Diese Schleifen werden so lange ausgewertet, bis der Producer des Kanals signalisiert, dass er keine zu lesenden Daten mehr hat. Wenn also bekannt ist, dass ein Producer eine endliche Anzahl von Elementen erzeugt und signalisiert, dass er fertig ist, kann der Consumer mithilfe von await foreach-Semantik die Elemente durchlaufen:

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

Im vorherigen Code wird die ReadAllAsync-Methode verwendet, um alle Koordinaten aus dem Kanal zu lesen.

Weitere Informationen