Knihovna System.Threading.Channels
Obor System.Threading.Channels názvů poskytuje sadu synchronizačních datových struktur pro předávání dat mezi producenty a příjemci asynchronně. Knihovna cílí na .NET Standard a funguje na všech implementacích .NET.
Tato knihovna je k dispozici v balíčku NuGet System.Threading.Channels . Pokud ale používáte .NET Core 3.0 nebo novější, balíček je součástí architektury.
Koncepční programovací model producenta/příjemce
Kanály představují implementaci konceptuálního programovacího modelu producenta/příjemce. V tomto programovacím modelu producenti asynchronně vytvářejí data a spotřebitelé asynchronně tato data spotřebovávají. Jinými slovy, tento model předává data z jedné strany do jiné prostřednictvím fronty FIFO (first-in first-out). Zkuste si představit kanály jako jakýkoli jiný běžný obecný typ kolekce, například List<T>
. Hlavním rozdílem je, že tato kolekce spravuje synchronizaci a poskytuje různé modely spotřeby prostřednictvím možností vytváření továrny. Tyto možnosti řídí chování kanálů, například počet prvků, které mají povoleno ukládat, a co se stane, když dojde k dosažení tohoto limitu, nebo jestli je kanál současně přístupný více producenty nebo více spotřebitelů.
Strategie ohraničování
V závislosti na tom, jak se vytvoří Channel<T>
, se její čtenář a spisovatel chovají odlišně.
Chcete-li vytvořit kanál, který určuje maximální kapacitu, zavolejte Channel.CreateBounded. Chcete-li vytvořit kanál, který je používán libovolným počtem čtenářů a zapisovačů souběžně, zavolejte Channel.CreateUnbounded. Každá strategie ohraničující zpřístupňuje různé možnosti definované tvůrcem, a to buď BoundedChannelOptions nebo UnboundedChannelOptions v uvedeném pořadí.
Poznámka:
Bez ohledu na ohraničující strategii kanál vždy vyvolá ChannelClosedException , když se použije po zavření.
Nevázané kanály
Pokud chcete vytvořit nevázaný kanál, zavolejte jedno z Channel.CreateUnbounded přetížení:
var channel = Channel.CreateUnbounded<T>();
Když ve výchozím nastavení vytvoříte nevázaný kanál, může kanál používat libovolný počet čtenářů a zapisovačů současně. Alternativně můžete určit nestandardní chování při vytváření nevázaného kanálu poskytnutím UnboundedChannelOptions
instance. Kapacita kanálu je nevázaná a všechny zápisy se provádějí synchronně. Další příklady najdete v tématu Nevázané vzory vytváření.
Ohraničené kanály
Pokud chcete vytvořit ohraničený kanál, zavolejte jedno z Channel.CreateBounded přetížení:
var channel = Channel.CreateBounded<T>(7);
Předchozí kód vytvoří kanál, který má maximální kapacitu 7
položek. Když vytvoříte ohraničený kanál, je kanál vázán na maximální kapacitu. Po dosažení vazby je výchozím chováním, že kanál asynchronně blokuje producenta, dokud nebude k dispozici mezera. Toto chování můžete nakonfigurovat zadáním možnosti při vytváření kanálu. Vázané kanály je možné vytvořit s libovolnou hodnotou kapacity větší než nula. Další příklady naleznete v tématu Ohraničené vzory vytváření.
Chování režimu v plném režimu
Při použití vázaného kanálu můžete určit chování, které kanál dodržuje při dosažení nakonfigurované vazby. Následující tabulka uvádí chování celého režimu pro každou BoundedChannelFullMode hodnotu:
Hodnota | Chování |
---|---|
BoundedChannelFullMode.Wait | Tato hodnota je výchozí. Volání, která WriteAsync mají čekat, až bude k dispozici místo pro dokončení operace zápisu. Volání, která se mají TryWrite okamžitě vrátit false . |
BoundedChannelFullMode.DropNewest | Odebere a ignoruje nejnovější položku v kanálu, aby se uvolnilo místo pro zapisování položky. |
BoundedChannelFullMode.DropOldest | Odebere a ignoruje nejstarší položku v kanálu, aby byla místo pro zápis položky. |
BoundedChannelFullMode.DropWrite | Zahodí položku, která se zapisuje. |
Důležité
Pokaždé, Channel<TWrite,TRead>.Writer když je produkce rychlejší, než Channel<TWrite,TRead>.Reader může spotřebovávat, zapisovač kanálu zaznamená zpětný tlak.
Rozhraní API pro producenty
Funkce producenta je vystavena Channel<TWrite,TRead>.Writerna . Rozhraní API producenta a očekávané chování jsou podrobně popsány v následující tabulce:
rozhraní API | Očekávané chování |
---|---|
ChannelWriter<T>.Complete | Označí kanál jako dokončený, což znamená, že do něj nejsou zapsány žádné další položky. |
ChannelWriter<T>.TryComplete | Pokusí se kanál označit jako dokončený, což znamená, že se do něj nezapisují žádná další data. |
ChannelWriter<T>.TryWrite | Pokusí se napsat zadanou položku do kanálu. Pokud se používá s nevázaným kanálem, vrátí se to vždy true , pokud zapisovač kanálu signalizuje dokončení buď ChannelWriter<T>.Complete, nebo ChannelWriter<T>.TryComplete. |
ChannelWriter<T>.WaitToWriteAsync | ValueTask<TResult> Vrátí hodnotu, která se dokončí, když je k dispozici mezera pro zápis položky. |
ChannelWriter<T>.WriteAsync | Asynchronně zapíše položku do kanálu. |
Rozhraní API příjemců
Funkce spotřebitele je zpřístupněna na Channel<TWrite,TRead>.Readerzařízení . Rozhraní API pro spotřebitele a očekávané chování jsou podrobně popsány v následující tabulce:
rozhraní API | Očekávané chování |
---|---|
ChannelReader<T>.ReadAllAsync | Vytvoří, IAsyncEnumerable<T> která umožňuje čtení všech dat z kanálu. |
ChannelReader<T>.ReadAsync | Asynchronně čte položku z kanálu. |
ChannelReader<T>.TryPeek | Pokusí se nahlédnout na položku z kanálu. |
ChannelReader<T>.TryRead | Pokusí se přečíst položku z kanálu. |
ChannelReader<T>.WaitToReadAsync | ValueTask<TResult> Vrátí hodnotu, která se dokončí, když jsou data k dispozici ke čtení. |
Běžné vzory použití
Pro kanály existuje několik vzorů použití. Rozhraní API je navržené tak, aby bylo co nejjednodušší, konzistentní a co nejflexibilnější. Všechny asynchronní metody vrací ValueTask
(nebo ValueTask<bool>
) jednoduchou asynchronní operaci, která se může vyhnout přidělování, pokud se operace dokončí synchronně a potenciálně i asynchronně. Kromě toho je rozhraní API navržené tak, aby bylo možné sestavit v tom, že tvůrce kanálu slíbí o zamýšleném použití. Při vytváření kanálu s určitými parametry může interní implementace pracovat efektivněji s vědomím těchto příslibů.
Vzory vytváření
Představte si, že vytváříte řešení producenta/spotřebitele pro globální systém umístění (GPS). Chcete sledovat souřadnice zařízení v průběhu času. Ukázkový objekt souřadnic může vypadat takto:
/// <summary>
/// A representation of a device's coordinates,
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
Guid DeviceId,
double Latitude,
double Longitude);
Nevázané vzory vytváření
Jedním z běžných vzorů použití je vytvoření výchozího nevázaného kanálu:
var channel = Channel.CreateUnbounded<Coordinates>();
Pojďme si ale představit, že chcete vytvořit nevázaný kanál s více producenty a spotřebiteli:
var channel = Channel.CreateUnbounded<Coordinates>(
new UnboundedChannelOptions
{
SingleWriter = false,
SingleReader = false,
AllowSynchronousContinuations = true
});
V tomto případě jsou všechny zápisy synchronní, dokonce i WriteAsync
. Je to proto, že nevázaný kanál má vždy k dispozici prostor pro zápis okamžitě. Pokud AllowSynchronousContinuations
je však nastavena hodnota true
, zápisy mohou skončit dělat práci přidruženou ke čtenáři spuštěním jejich pokračování. To nemá vliv na synchronicity operace.
Ohraničené vzory vytváření
U ohraničených kanálů by měla být konfigurovatelnost kanálu známa příjemci, aby se zajistila správná spotřeba. To znamená, že příjemce by měl vědět, jaké chování kanál vykazuje při dosažení nakonfigurované vazby. Pojďme se podívat na některé běžné vázané vzory vytváření.
Nejjednodušší způsob, jak vytvořit ohraničený kanál, je zadat kapacitu:
var channel = Channel.CreateBounded<Coordinates>(1);
Předchozí kód vytvoří ohraničený kanál s maximální kapacitou 1
. K dispozici jsou další možnosti, některé možnosti jsou stejné jako nevázaný kanál, zatímco jiné jsou specifické pro nevázané kanály:
var channel = Channel.CreateBounded<Coordinates>(
new BoundedChannelOptions(1_000)
{
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.DropWrite
});
V předchozím kódu se kanál vytvoří jako ohraničený kanál, který je omezen na 1 000 položek, s jedním spisovatelem, ale mnoha čtenáři. Jeho chování v plném režimu je definováno jako DropWrite
, což znamená, že zahodí položku, která je zapsána, pokud je kanál plný.
Pokud chcete sledovat položky, které se zahodí při použití ohraničených kanálů, zaregistrujte itemDropped
zpětné volání:
var channel = Channel.CreateBounded(
new BoundedChannelOptions(10)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest
},
static void (Coordinates dropped) =>
Console.WriteLine($"Coordinates dropped: {dropped}"));
Pokaždé, když je kanál plný a přidá se nová položka, itemDropped
vyvolá se zpětné volání. V tomto příkladu zapíše poskytnutá zpětná volání položku do konzoly, ale můžete provést jakoukoli jinou akci, kterou chcete.
Vzory producenta
Představte si, že producent v tomto scénáři píše do kanálu nové souřadnice. Producent to může provést voláním TryWrite:
static void ProduceWithWhileAndTryWrite(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
}
}
Předchozí kód producenta:
Channel<Coordinates>.Writer
Přijme (ChannelWriter<Coordinates>
) jako argument spolu s počátečnímCoordinates
argumentem .- Definuje podmíněnou
while
smyčku, která se pokusí přesunout souřadnice pomocíTryWrite
.
Alternativní producent může použít metodu WriteAsync
:
static async ValueTask ProduceWithWhileWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
await writer.WriteAsync(
item: coordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
});
}
writer.Complete();
}
Opět se Channel<Coordinates>.Writer
používá ve smyčce while
. WriteAsync Tentokrát je však volána metoda. Metoda bude pokračovat až po napsání souřadnic. Když smyčka while
skončí, provede se volání Complete , které signalizuje, že do kanálu nejsou zapsána žádná další data.
Dalším vzorem producenta WaitToWriteAsync je použití metody, zvažte následující kód:
static async ValueTask ProduceWithWaitToWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
await writer.WaitToWriteAsync())
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
await Task.Delay(TimeSpan.FromMilliseconds(10));
}
writer.Complete();
}
V rámci podmíněného volání while
se výsledek WaitToWriteAsync
volání používá k určení, zda se má smyčka pokračovat.
Vzory spotřebitelů
Existuje několik běžných vzorů příjemců kanálu. Když kanál nikdy nekončí, znamená to, že vytváří data po neomezenou dobu, uživatel může použít smyčku while (true)
a číst data, jakmile bude k dispozici:
static async ValueTask ConsumeWithWhileAsync(
ChannelReader<Coordinates> reader)
{
while (true)
{
// May throw ChannelClosedException if
// the parent channel's writer signals complete.
Coordinates coordinates = await reader.ReadAsync();
Console.WriteLine(coordinates);
}
}
Poznámka:
Tento kód vyvolá výjimku, pokud je kanál zavřený.
Alternativnímu příjemci by se tento problém mohl vyhnout použitím vnořené smyčky while, jak je znázorněno v následujícím kódu:
static async ValueTask ConsumeWithNestedWhileAsync(
ChannelReader<Coordinates> reader)
{
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out Coordinates coordinates))
{
Console.WriteLine(coordinates);
}
}
}
V předchozím kódu příjemce čeká na čtení dat. Jakmile jsou data k dispozici, uživatel se je pokusí přečíst. Tyto smyčky se nadále vyhodnocují, dokud producent kanálu signály, že už nemá data ke čtení. Když je výrobce známý, že má konečný počet položek, které vytváří, a signalizuje dokončení, může spotřebitel použít await foreach
sémantiku k iteraci položek:
static async ValueTask ConsumeWithAwaitForeachAsync(
ChannelReader<Coordinates> reader)
{
await foreach (Coordinates coordinates in reader.ReadAllAsync())
{
Console.WriteLine(coordinates);
}
}
Předchozí kód používá metodu ReadAllAsync ke čtení všech souřadnic z kanálu.