System.Threading.Channels 程式庫
System.Threading.Channels 命名空間提供一組同步資料結構,用來在生產者和消費者之間以非同步方式傳遞資料。 該程式庫鎖定 .NET Standard 且可在所有 .NET 實作中使用。
此程式庫可於 System.Threading.Channels NuGet 套件中使用。 不過,如果您使用 .NET Core 3.0 或更新版本,套件會包含在架構中。
產生者/取用者概念性程式設計模型
通道是產生者/取用者概念性程式設計模型的實作項目。 在這項程式設計模型中,產生者會以非同步方式產生資料,取用者也會以非同步方式取用該資料。 換句話說,此模型會透過先進先出 (“FIFO”) 佇列,將資料從一方傳遞至另一方。 您不妨將通道視為其他任何常見的泛型集合型別,例如 List<T>
。 其中的主要差異在於,這項集合會透過處理站建立選項,管理同步並提供各種使用模型。 這些選項可控制通道的行為,例如允許儲存的元素數量,以及觸達該上限的處理方式,或通道是否可由多位產生者或取用者同時存取。
繫結策略
根據 Channel<T>
的建立方式,其讀者和作者會有不同的行為。
若要建立指定容量上限的通道,請呼叫 Channel.CreateBounded。 若要建立由任意數量的讀者和作者同時使用的通道,請呼叫 Channel.CreateUnbounded。 每個繫結策略都會分別公開各種建立者定義的選項,例如 BoundedChannelOptions 或 UnboundedChannelOptions。
注意
無論繫結策略為何,通道都會在關閉後受到使用時一律擲回 ChannelClosedException。
未繫結的通道
若要建立未繫結的通道,請呼叫任意 Channel.CreateUnbounded 多載:
var channel = Channel.CreateUnbounded<T>();
建立未繫結的通道時,根據預設,該通道可由任意數量的讀者和作者同時使用。 或者,您可提供 UnboundedChannelOptions
執行個體,在建立未繫結通道時指定非預設行為。 該通道的容量未繫結,因此所有寫入都會同步執行。 如需其他範例,請參閱未繫結的建立模式。
繫結的通道
若要建立繫結的通道,請呼叫任意 Channel.CreateBounded 多載:
var channel = Channel.CreateBounded<T>(7);
上述程式碼會建立容量上限為 7
個項目的通道。 建立繫結的通道時,該通道會繫結至容量上限。 觸達繫結時,系統的預設行為是讓通道以非同步方式封鎖產生者,直到出現可用空間為止。 您可以在建立通道時指定選項來設定此行為。 繫結的通道可使用大於零的任何容量值建立。 如需其他範例,請參閱繫結的建立模式。
完整模式行為
使用繫結的通道時,您可指定觸達設定的繫結時,通道要遵守的行為。 下表列出每個 BoundedChannelFullMode 值的完整模式行為:
值 | 行為 |
---|---|
BoundedChannelFullMode.Wait | 這是預設值。 對 WriteAsync 的呼叫會等候空間變為可用以完成寫入作業。 對 TryWrite 的呼叫會立即傳回 false 。 |
BoundedChannelFullMode.DropNewest | 移除並忽略通道中最新的項目,以騰出空間給要寫入的項目。 |
BoundedChannelFullMode.DropOldest | 移除並忽略通道中最舊的項目,以騰出空間給要寫入的項目。 |
BoundedChannelFullMode.DropWrite | 捨棄正在寫入的項目。 |
重要
只要 Channel<TWrite,TRead>.Writer 產生的速度超過 Channel<TWrite,TRead>.Reader 使用的速度,通道的作者就會經歷背壓。
產生者 API
產生者功能會在 Channel<TWrite,TRead>.Writer 上公開。 下表詳述產生者 API 和預期行為:
API | 預期的行為 |
---|---|
ChannelWriter<T>.Complete | 將通道標示為完成,表示不會再有任何項目寫入其中。 |
ChannelWriter<T>.TryComplete | 嘗試將通道標示為完成,代表不會再有任何資料寫入其中。 |
ChannelWriter<T>.TryWrite | 嘗試將指定項目寫入通道。 與未繫結通道搭配使用時,這會一律傳回 true ,除非通道作者用 ChannelWriter<T>.Complete 或 ChannelWriter<T>.TryComplete 表示完成。 |
ChannelWriter<T>.WaitToWriteAsync | 傳回空間可供寫入項目時將會完成的 ValueTask<TResult>。 |
ChannelWriter<T>.WriteAsync | 以非同步方式將項目寫入至通道。 |
取用者 API
取用者功能會在 Channel<TWrite,TRead>.Reader 上公開。 下表詳述取用者 API 和預期行為:
API | 預期的行為 |
---|---|
ChannelReader<T>.ReadAllAsync | 建立可讓您讀取通道中所有資料的 IAsyncEnumerable<T>。 |
ChannelReader<T>.ReadAsync | 以非同步方式從通道讀取項目。 |
ChannelReader<T>.TryPeek | 嘗試從通道瞄核項目。 |
ChannelReader<T>.TryRead | 嘗試從通道讀取項目。 |
ChannelReader<T>.WaitToReadAsync | 傳回資料可供讀取時將會完成的 ValueTask<TResult>。 |
常見使用模式
通道的使用模式有很多。 API 的設計初衷,就是盡可能簡單、一致並具備彈性。 所有非同步方法都會傳回代表輕量型非同步作業的 ValueTask
(或 ValueTask<bool>
),這些作業可在作業同步完成 (或甚至非同步完成時) 避免配置。 此外,API 也採用可組合設計,供通道建立者承諾預定的使用方式。 用特定參數建立通道時,內部實作因了解這些承諾,因此能更有效率地運作。
建立模式
假設您要為全球定位系統 (GPS) 建立生產者/取用者解決方案。 您想在一段時間內追蹤某個裝置的座標。 座標物件的範例可能如下所示:
/// <summary>
/// A representation of a device's coordinates,
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
Guid DeviceId,
double Latitude,
double Longitude);
未繫結的建立模式
一個常見的使用模式,是建立預設未繫結通道:
var channel = Channel.CreateUnbounded<Coordinates>();
不過,與其如此,不如假設您想建立有多位生產者和取用者的未繫結通道:
var channel = Channel.CreateUnbounded<Coordinates>(
new UnboundedChannelOptions
{
SingleWriter = false,
SingleReader = false,
AllowSynchronousContinuations = true
});
若是如此,所有寫入都會同步,WriteAsync
也不例外。 這是因為未繫結的通道隨時有空間可立即有效寫入。 不過,AllowSynchronousContinuations
設為 true
時,寫入最後可能會執行其接續項目,來處理與讀者有關的作業。 這不會影響作業的同步性。
繫結的建立模式
使用繫結的通道時,應將通道的設定功能告知取用者,確保使用方式正確。 換言之,取用者應了解觸達設定的繫結時通道會有的行為。 現在讓我們來探討幾個常見的繫結建立模式。
建立繫結的通道最簡單的方式,就是指定容量:
var channel = Channel.CreateBounded<Coordinates>(1);
上述程式碼建立了容量上限為 1
的繫結通道。 另外還有其他選項可供使用,有些選項與未繫結通道相同,有些則僅供未繫結通道使用:
var channel = Channel.CreateBounded<Coordinates>(
new BoundedChannelOptions(1_000)
{
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.DropWrite
});
在上述程式碼中,通道建立為上限 1,000 個項目的繫結通道,可具備單一作者和多位讀者。 其完整模式行為定義為 DropWrite
,意即通道已滿後會卸除正在寫入的項目。
若要觀察使用繫結的通道時卸除的項目,請註冊 itemDropped
回撥:
var channel = Channel.CreateBounded(
new BoundedChannelOptions(10)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest
},
static void (Coordinates dropped) =>
Console.WriteLine($"Coordinates dropped: {dropped}"));
若在通道已滿後新增項目,則系統會叫用 itemDropped
回撥。 在此範例中,所提供的回撥將項目寫入主控台,您仍可採取其他希望的動作。
產生者模式
假設此情節中的產生者正在為通道寫入新座標。 產生者可透過呼叫 TryWrite 來執行動作:
static void ProduceWithWhileAndTryWrite(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
}
}
上述產生者程式碼:
- 接受
Channel<Coordinates>.Writer
(ChannelWriter<Coordinates>
) 為引數,並同時接受初始Coordinates
。 - 定義有條件的
while
迴圈,其試圖用TryWrite
移動座標。
其他產生者可能使用 WriteAsync
方法:
static async ValueTask ProduceWithWhileWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
await writer.WriteAsync(
item: coordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
});
}
writer.Complete();
}
Channel<Coordinates>.Writer
再次用於 while
迴圈, 但是這一次系統呼叫了 WriteAsync 方法。 只有在寫入座標後,這項方法才會繼續。 while
迴圈結束時,系統呼叫了 Complete,表示不會再有資料寫入通道。
另一個產生者模式是使用 WaitToWriteAsync 方法。請考量下列程式碼:
static async ValueTask ProduceWithWaitToWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
await writer.WaitToWriteAsync())
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
await Task.Delay(TimeSpan.FromMilliseconds(10));
}
writer.Complete();
}
作為有條件的 while
的一部分,系統用 WaitToWriteAsync
呼叫的結果來判斷是否要繼續迴圈。
取用者模式
常見的通道取用者模式有很多。 如果通道不會結束,意即該通道會無限產生資料,則取用者可使用 while (true)
迴圈,並在資料可用時讀取之:
static async ValueTask ConsumeWithWhileAsync(
ChannelReader<Coordinates> reader)
{
while (true)
{
// May throw ChannelClosedException if
// the parent channel's writer signals complete.
Coordinates coordinates = await reader.ReadAsync();
Console.WriteLine(coordinates);
}
}
注意
如果通道關閉,則此程式碼會擲回例外狀況。
另一位取用者可在迴圈期間使用巢狀避免此疑慮,如以下程式碼所示:
static async ValueTask ConsumeWithNestedWhileAsync(
ChannelReader<Coordinates> reader)
{
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out Coordinates coordinates))
{
Console.WriteLine(coordinates);
}
}
}
在上述程式碼中,取用者等候以讀取資料。 資料可用後,取用者便試圖讀取。 這些迴圈會繼續評估,直到通道產生者表示其不再需要讀取資料為止。 雖是如此,如果產生者已知具備要產生的有限項目數量卻表示完成,則取用者可用 await foreach
語意來逐一查看項目:
static async ValueTask ConsumeWithAwaitForeachAsync(
ChannelReader<Coordinates> reader)
{
await foreach (Coordinates coordinates in reader.ReadAllAsync())
{
Console.WriteLine(coordinates);
}
}
上述程式碼使用 ReadAllAsync 方法讀取所有來自通道的座標。