分享方式:


System.Threading.Channels 程式庫

System.Threading.Channels 命名空間提供一組同步資料結構,用來在生產者和消費者之間以非同步方式傳遞資料。 該程式庫鎖定 .NET Standard 且可在所有 .NET 實作中使用。

此程式庫可於 System.Threading.Channels NuGet 套件中使用。 不過,如果您使用 .NET Core 3.0 或更新版本,套件會包含在架構中。

產生者/取用者概念性程式設計模型

通道是產生者/取用者概念性程式設計模型的實作項目。 在這項程式設計模型中,產生者會以非同步方式產生資料,取用者也會以非同步方式取用該資料。 換句話說,此模型會透過先進先出 (“FIFO”) 佇列,將資料從一方傳遞至另一方。 您不妨將通道視為其他任何常見的泛型集合型別,例如 List<T>。 其中的主要差異在於,這項集合會透過處理站建立選項,管理同步並提供各種使用模型。 這些選項可控制通道的行為,例如允許儲存的元素數量,以及觸達該上限的處理方式,或通道是否可由多位產生者或取用者同時存取。

繫結策略

根據 Channel<T> 的建立方式,其讀者和作者會有不同的行為。

若要建立指定容量上限的通道,請呼叫 Channel.CreateBounded。 若要建立由任意數量的讀者和作者同時使用的通道,請呼叫 Channel.CreateUnbounded。 每個繫結策略都會分別公開各種建立者定義的選項,例如 BoundedChannelOptionsUnboundedChannelOptions

注意

無論繫結策略為何,通道都會在關閉後受到使用時一律擲回 ChannelClosedException

未繫結的通道

若要建立未繫結的通道,請呼叫任意 Channel.CreateUnbounded 多載:

var channel = Channel.CreateUnbounded<T>();

建立未繫結的通道時,根據預設,該通道可由任意數量的讀者和作者同時使用。 或者,您可提供 UnboundedChannelOptions 執行個體,在建立未繫結通道時指定非預設行為。 該通道的容量未繫結,因此所有寫入都會同步執行。 如需其他範例,請參閱未繫結的建立模式

繫結的通道

若要建立繫結的通道,請呼叫任意 Channel.CreateBounded 多載:

var channel = Channel.CreateBounded<T>(7);

上述程式碼會建立容量上限為 7 個項目的通道。 建立繫結的通道時,該通道會繫結至容量上限。 觸達繫結時,系統的預設行為是讓通道以非同步方式封鎖產生者,直到出現可用空間為止。 您可以在建立通道時指定選項來設定此行為。 繫結的通道可使用大於零的任何容量值建立。 如需其他範例,請參閱繫結的建立模式

完整模式行為

使用繫結的通道時,您可指定觸達設定的繫結時,通道要遵守的行為。 下表列出每個 BoundedChannelFullMode 值的完整模式行為:

行為
BoundedChannelFullMode.Wait 這是預設值。 對 WriteAsync 的呼叫會等候空間變為可用以完成寫入作業。 對 TryWrite 的呼叫會立即傳回 false
BoundedChannelFullMode.DropNewest 移除並忽略通道中最新的項目,以騰出空間給要寫入的項目。
BoundedChannelFullMode.DropOldest 移除並忽略通道中最舊的項目,以騰出空間給要寫入的項目。
BoundedChannelFullMode.DropWrite 捨棄正在寫入的項目。

重要

只要 Channel<TWrite,TRead>.Writer 產生的速度超過 Channel<TWrite,TRead>.Reader 使用的速度,通道的作者就會經歷背壓。

產生者 API

產生者功能會在 Channel<TWrite,TRead>.Writer 上公開。 下表詳述產生者 API 和預期行為:

API 預期的行為
ChannelWriter<T>.Complete 將通道標示為完成,表示不會再有任何項目寫入其中。
ChannelWriter<T>.TryComplete 嘗試將通道標示為完成,代表不會再有任何資料寫入其中。
ChannelWriter<T>.TryWrite 嘗試將指定項目寫入通道。 與未繫結通道搭配使用時,這會一律傳回 true,除非通道作者用 ChannelWriter<T>.CompleteChannelWriter<T>.TryComplete 表示完成。
ChannelWriter<T>.WaitToWriteAsync 傳回空間可供寫入項目時將會完成的 ValueTask<TResult>
ChannelWriter<T>.WriteAsync 以非同步方式將項目寫入至通道。

取用者 API

取用者功能會在 Channel<TWrite,TRead>.Reader 上公開。 下表詳述取用者 API 和預期行為:

API 預期的行為
ChannelReader<T>.ReadAllAsync 建立可讓您讀取通道中所有資料的 IAsyncEnumerable<T>
ChannelReader<T>.ReadAsync 以非同步方式從通道讀取項目。
ChannelReader<T>.TryPeek 嘗試從通道瞄核項目。
ChannelReader<T>.TryRead 嘗試從通道讀取項目。
ChannelReader<T>.WaitToReadAsync 傳回資料可供讀取時將會完成的 ValueTask<TResult>

常見使用模式

通道的使用模式有很多。 API 的設計初衷,就是盡可能簡單、一致並具備彈性。 所有非同步方法都會傳回代表輕量型非同步作業的 ValueTask (或 ValueTask<bool>),這些作業可在作業同步完成 (或甚至非同步完成時) 避免配置。 此外,API 也採用可組合設計,供通道建立者承諾預定的使用方式。 用特定參數建立通道時,內部實作因了解這些承諾,因此能更有效率地運作。

建立模式

假設您要為全球定位系統 (GPS) 建立生產者/取用者解決方案。 您想在一段時間內追蹤某個裝置的座標。 座標物件的範例可能如下所示:

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

未繫結的建立模式

一個常見的使用模式,是建立預設未繫結通道:

var channel = Channel.CreateUnbounded<Coordinates>();

不過,與其如此,不如假設您想建立有多位生產者和取用者的未繫結通道:

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

若是如此,所有寫入都會同步,WriteAsync 也不例外。 這是因為未繫結的通道隨時有空間可立即有效寫入。 不過,AllowSynchronousContinuations 設為 true 時,寫入最後可能會執行其接續項目,來處理與讀者有關的作業。 這不會影響作業的同步性。

繫結的建立模式

使用繫結的通道時,應將通道的設定功能告知取用者,確保使用方式正確。 換言之,取用者應了解觸達設定的繫結時通道會有的行為。 現在讓我們來探討幾個常見的繫結建立模式。

建立繫結的通道最簡單的方式,就是指定容量:

var channel = Channel.CreateBounded<Coordinates>(1);

上述程式碼建立了容量上限為 1 的繫結通道。 另外還有其他選項可供使用,有些選項與未繫結通道相同,有些則僅供未繫結通道使用:

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

在上述程式碼中,通道建立為上限 1,000 個項目的繫結通道,可具備單一作者和多位讀者。 其完整模式行為定義為 DropWrite,意即通道已滿後會卸除正在寫入的項目。

若要觀察使用繫結的通道時卸除的項目,請註冊 itemDropped 回撥:

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

若在通道已滿後新增項目,則系統會叫用 itemDropped 回撥。 在此範例中,所提供的回撥將項目寫入主控台,您仍可採取其他希望的動作。

產生者模式

假設此情節中的產生者正在為通道寫入新座標。 產生者可透過呼叫 TryWrite 來執行動作:

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

上述產生者程式碼:

  • 接受 Channel<Coordinates>.Writer (ChannelWriter<Coordinates>) 為引數,並同時接受初始 Coordinates
  • 定義有條件的 while 迴圈,其試圖用 TryWrite 移動座標。

其他產生者可能使用 WriteAsync方法:

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

Channel<Coordinates>.Writer 再次用於 while 迴圈, 但是這一次系統呼叫了 WriteAsync 方法。 只有在寫入座標後,這項方法才會繼續。 while 迴圈結束時,系統呼叫了 Complete,表示不會再有資料寫入通道。

另一個產生者模式是使用 WaitToWriteAsync 方法。請考量下列程式碼:

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

作為有條件的 while 的一部分,系統用 WaitToWriteAsync 呼叫的結果來判斷是否要繼續迴圈。

取用者模式

常見的通道取用者模式有很多。 如果通道不會結束,意即該通道會無限產生資料,則取用者可使用 while (true) 迴圈,並在資料可用時讀取之:

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

注意

如果通道關閉,則此程式碼會擲回例外狀況。

另一位取用者可在迴圈期間使用巢狀避免此疑慮,如以下程式碼所示:

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

在上述程式碼中,取用者等候以讀取資料。 資料可用後,取用者便試圖讀取。 這些迴圈會繼續評估,直到通道產生者表示其不再需要讀取資料為止。 雖是如此,如果產生者已知具備要產生的有限項目數量卻表示完成,則取用者可用 await foreach 語意來逐一查看項目:

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

上述程式碼使用 ReadAllAsync 方法讀取所有來自通道的座標。

另請參閱