Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Obor System.Threading.Channels názvů poskytuje sadu synchronizačních datových struktur pro předávání dat mezi producenty a příjemci asynchronně. Knihovna cílí na .NET Standard a funguje na všech implementacích .NET.
Tato knihovna je k dispozici v balíčku NuGet System.Threading.Channels . Pokud ale používáte .NET Core 3.0 nebo novější, balíček je součástí architektury.
Koncepční programovací model producenta/příjemce
Kanály představují implementaci konceptuálního programovacího modelu producenta/příjemce. V tomto programovacím modelu producenti asynchronně vytvářejí data a spotřebitelé asynchronně tato data spotřebovávají. Jinými slovy, tento model předává data mezi stranami prostřednictvím fronty typu FIFO (první dovnitř, první ven). Představte si kanály jako jakýkoli jiný běžný obecný typ kolekce, například List<T>. Hlavním rozdílem je, že tato kolekce spravuje synchronizaci a poskytuje různé modely spotřeby prostřednictvím možností vytváření továrny. Tyto možnosti řídí chování kanálů, například počet prvků, které mají povoleno ukládat, a co se stane, když dojde k dosažení tohoto limitu, nebo jestli je kanál současně přístupný více producenty nebo více spotřebitelů.
Strategie ohraničování
V závislosti na tom, jak se vytvoří Channel<T> , se její čtenář a spisovatel chovají odlišně.
Chcete-li vytvořit kanál, který určuje maximální kapacitu, zavolejte Channel.CreateBounded. Chcete-li vytvořit kanál, který je používán libovolným počtem čtenářů a zapisovačů souběžně, zavolejte Channel.CreateUnbounded. Každá strategie ohraničující nabízí různé možnosti definované tvůrcem, buď BoundedChannelOptions nebo UnboundedChannelOptions.
Poznámka:
Bez ohledu na ohraničující strategii kanál vždy vyvolá ChannelClosedException, když se použije po zavření.
Nevázané kanály
Pokud chcete vytvořit nevázaný kanál, zavolejte jedno z Channel.CreateUnbounded přetížení:
var channel = Channel.CreateUnbounded<T>();
Když ve výchozím nastavení vytvoříte nevázaný kanál, může kanál používat libovolný počet čtenářů a zapisovačů současně. Alternativně můžete určit nestandardní chování při vytváření nevázaného kanálu poskytnutím UnboundedChannelOptions instance. Kapacita kanálu je nevázaná a všechny zápisy se provádějí synchronně. Další příklady najdete v tématu Nevázané vzory vytváření.
Ohraničené kanály
Pokud chcete vytvořit ohraničený kanál, zavolejte jedno z Channel.CreateBounded přetížení:
var channel = Channel.CreateBounded<T>(7);
Předchozí kód vytvoří kanál, který má maximální kapacitu 7 položek. Když vytvoříte ohraničený kanál, je kanál vázán na maximální kapacitu. Po dosažení hranice je výchozím chováním, že kanál asynchronně blokuje producenta tak, že dokud nebude k dispozici volné místo. Toto chování můžete nakonfigurovat zadáním možnosti při vytváření kanálu. Vázané kanály je možné vytvořit s libovolnou hodnotou kapacity větší než nula. Další příklady naleznete v tématu Ohraničené vzory vytváření.
Chování plného režimu
Při použití vázaného kanálu můžete určit chování, které kanál dodržuje při dosažení nakonfigurované vazby. Následující tabulka uvádí chování celého režimu pro každou BoundedChannelFullMode hodnotu:
| Hodnota | Chování |
|---|---|
| BoundedChannelFullMode.Wait | Tato hodnota je výchozí. Volání na WriteAsync čekají, až bude dostupné místo, aby mohla být operace zápisu dokončena. Volání na TryWrite se false okamžitě vracejí. |
| BoundedChannelFullMode.DropNewest | Odebere a ignoruje nejnovější položku v kanálu, aby se uvolnilo místo pro zapisování položky. |
| BoundedChannelFullMode.DropOldest | Odebere a ignoruje nejstarší položku v kanálu, abyste uvolnili místo pro zápis nové položky. |
| BoundedChannelFullMode.DropWrite | Zahodí zapisovanou položku. |
Důležité
Kdykoli Channel<TWrite,TRead>.Writer produkuje rychleji, než Channel<TWrite,TRead>.Reader může spotřebovávat, zapisovač kanálu zaznamená zpětný tlak.
Rozhraní API pro producenty
Funkcionalita producenta je dostupná na Channel<TWrite,TRead>.Writer. Rozhraní API producenta a očekávané chování jsou podrobně popsány v následující tabulce:
| API | Očekávané chování |
|---|---|
| ChannelWriter<T>.Complete | Označí kanál jako dokončený, což znamená, že do něj nejsou zapsány žádné další položky. |
| ChannelWriter<T>.TryComplete | Pokusí se kanál označit jako dokončený, což znamená, že se do něj nezapisují žádná další data. |
| ChannelWriter<T>.TryWrite | Pokusí se napsat zadanou položku do kanálu. Pokud se používá s nevázaným kanálem, vždy se vrátí true, pokud zapisovač kanálu nesignalizuje dokončení buď ChannelWriter<T>.Complete nebo ChannelWriter<T>.TryComplete. |
| ChannelWriter<T>.WaitToWriteAsync | ValueTask<TResult> Vrátí hodnotu, která se dokončí, když je k dispozici mezera pro zápis položky. |
| ChannelWriter<T>.WriteAsync | Asynchronně zapíše položku do kanálu. |
Rozhraní API pro spotřebitele
Funkce pro uživatele je zpřístupněna na Channel<TWrite,TRead>.Reader. Rozhraní API pro spotřebitele a očekávané chování jsou podrobně popsány v následující tabulce:
| API | Očekávané chování |
|---|---|
| ChannelReader<T>.ReadAllAsync | Vytvoří IAsyncEnumerable<T>, které umožňuje načítat všechna data z kanálu. |
| ChannelReader<T>.ReadAsync | Asynchronně čte položku z kanálu. |
| ChannelReader<T>.TryPeek | Pokusí se nahlédnout na položku z kanálu. |
| ChannelReader<T>.TryRead | Pokusí se přečíst položku z kanálu. |
| ChannelReader<T>.WaitToReadAsync | ValueTask<TResult> Vrátí hodnotu, která se dokončí, když jsou data k dispozici ke čtení. |
Běžné vzory použití
Pro kanály existuje několik vzorů použití. Rozhraní API je navržené tak, aby bylo co nejjednodušší, konzistentní a co nejflexibilnější. Všechny asynchronní metody vrací ValueTask (nebo ValueTask<bool>) jednoduchou asynchronní operaci, která se může vyhnout přidělování, pokud se operace dokončí synchronně a potenciálně i asynchronně. Kromě toho je rozhraní API navrženo tak, aby bylo modulární, přičemž tvůrce kanálu dává záruky ohledně jeho zamýšleného použití. Při vytváření kanálu s určitými parametry může interní implementace pracovat efektivněji s vědomím těchto příslibů.
Vzory tvorby
Představte si, že vytváříte řešení producenta/spotřebitele pro globální systém umístění (GPS). Chcete sledovat souřadnice zařízení v průběhu času. Ukázkový objekt souřadnic může vypadat takto:
/// <summary>
/// A representation of a device's coordinates,
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
Guid DeviceId,
double Latitude,
double Longitude);
Nevázané vzory vytváření
Jedním z běžných vzorů použití je vytvoření výchozího nevázaného kanálu:
var channel = Channel.CreateUnbounded<Coordinates>();
Pojďme si ale představit, že chcete vytvořit nevázaný kanál s více producenty a spotřebiteli:
var channel = Channel.CreateUnbounded<Coordinates>(
new UnboundedChannelOptions
{
SingleWriter = false,
SingleReader = false,
AllowSynchronousContinuations = true
});
V tomto případě jsou všechny zápisy synchronní, dokonce i WriteAsync. Je to proto, že nevázaný kanál má vždy k dispozici prostor pro zápis okamžitě. Ovšem s AllowSynchronousContinuations nastaveným na true mohou zápisy končit tím, že dělají práci spojenou se čtenářem vykonáním jejich pokračování. To nemá vliv na synchronicity operace.
Ohraničené vzory vytváření
U ohraničených kanálů by měla být konfigurovatelnost kanálu známa příjemci, aby se zajistila správná spotřeba. To znamená, že uživatel by měl vědět, jaké chování kanál vykazuje při dosažení nakonfigurované meze. Pojďme se podívat na některé běžné vázané vzory vytváření.
Nejjednodušší způsob, jak vytvořit ohraničený kanál, je zadat kapacitu:
var channel = Channel.CreateBounded<Coordinates>(1);
Předchozí kód vytvoří ohraničený kanál s maximální kapacitou 1. K dispozici jsou další možnosti, některé možnosti jsou stejné jako nevázaný kanál, zatímco jiné jsou specifické pro nevázané kanály:
var channel = Channel.CreateBounded<Coordinates>(
new BoundedChannelOptions(1_000)
{
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.DropWrite
});
V předchozím kódu se kanál vytvoří jako ohraničený kanál, který je omezen na 1 000 položek, s jedním spisovatelem, ale mnoha čtenáři. Jeho chování v plném režimu je definováno jako DropWrite, což znamená, že zahodí položku, která je zapsána, pokud je kanál plný.
Pokud chcete sledovat položky, které se zahodí při použití ohraničených kanálů, zaregistrujte itemDropped zpětné volání:
var channel = Channel.CreateBounded(
new BoundedChannelOptions(10)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest
},
static void (Coordinates dropped) =>
Console.WriteLine($"Coordinates dropped: {dropped}"));
Pokaždé, když je kanál plný a přidá se nová položka, itemDropped vyvolá se zpětné volání. V tomto příkladu zapíše poskytnutá callback funkce položku do konzole, ale můžete provést jakoukoli jinou akci, kterou chcete.
Vzory výrobce
Představte si, že producent v tomto scénáři píše do kanálu nové souřadnice. Producent to může provést voláním TryWrite:
static void ProduceWithWhileAndTryWrite(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
}
}
Předchozí kód producenta:
-
Channel<Coordinates>.Writerpřijme (ChannelWriter<Coordinates>) jako argument spolu s počátečnímCoordinates. - Definuje podmíněnou
whilesmyčku, která se pokusí přesunout souřadnice pomocíTryWrite.
Alternativní producent může použít metodu WriteAsync :
static async ValueTask ProduceWithWhileWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
await writer.WriteAsync(
item: coordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
});
}
writer.Complete();
}
Opět se Channel<Coordinates>.Writer používá ve smyčce while . Tentokrát je však volána metoda WriteAsync. Metoda pokračuje až po napsání souřadnic. Když smyčka while skončí, provede se volání Complete , které signalizuje, že do kanálu nejsou zapsána žádná další data.
Dalším vzorem producenta je použití metody WaitToWriteAsync. Prohlédněte si následující kód:
static async ValueTask ProduceWithWaitToWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
await writer.WaitToWriteAsync())
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
await Task.Delay(TimeSpan.FromMilliseconds(10));
}
writer.Complete();
}
V rámci podmíněného while se výsledek WaitToWriteAsync volání používá k určení, zda má smyčka pokračovat.
Vzory spotřebitelů
Existuje několik běžných vzorů příjemců kanálu. Když kanál nikdy nekončí, znamená to, že vytváří data po neomezenou dobu, uživatel může použít smyčku while (true) a číst data, jakmile bude k dispozici:
static async ValueTask ConsumeWithWhileAsync(
ChannelReader<Coordinates> reader)
{
while (true)
{
// May throw ChannelClosedException if
// the parent channel's writer signals complete.
Coordinates coordinates = await reader.ReadAsync();
Console.WriteLine(coordinates);
}
}
Poznámka:
Tento kód vyvolá výjimku, pokud je kanál zavřený.
Alternativnímu uživateli by se tento problém mohl vyhnout použitím vnořené smyčky while, jak je znázorněno v následujícím kódu:
static async ValueTask ConsumeWithNestedWhileAsync(
ChannelReader<Coordinates> reader)
{
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out Coordinates coordinates))
{
Console.WriteLine(coordinates);
}
}
}
V předchozím kódu příjemce čeká na čtení dat. Jakmile jsou data k dispozici, uživatel se je pokusí přečíst. Tyto smyčky se nadále vyhodnocují, dokud výrobce kanálu nesignalizuje, že už nemá data k přečtení. S tím řečeno, když je známo, že výrobce má konečný počet položek, které vytváří, a signalizuje dokončení, může spotřebitel použít sémantiku await foreach k iteraci položek.
static async ValueTask ConsumeWithAwaitForeachAsync(
ChannelReader<Coordinates> reader)
{
await foreach (Coordinates coordinates in reader.ReadAllAsync())
{
Console.WriteLine(coordinates);
}
}
Předchozí kód používá metodu ReadAllAsync ke čtení všech souřadnic z kanálu.