Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Пространство System.Threading.Channels имен предоставляет набор структур данных синхронизации для асинхронной передачи данных между производителями и потребителями. Библиотека предназначена для .NET Standard и работает на всех реализациях .NET.
Эта библиотека доступна в пакете System.Threading.Channels NuGet. Однако если вы используете .NET Core 3.0 или более поздней версии, пакет входит в состав платформы.
Концептуальная модель программирования "производитель/потребитель"
Каналы — это реализация концептуальной модели программирования производителя или потребителя. В этой модели программирования производители асинхронно создают данные, а потребители асинхронно используют эти данные. Другими словами, эта модель передает данные из одной стороны в другую через очередь по принципу первое поступление — первое обслуживание (FIFO). Думайте о каналах как о любом другом универсальном типе коллекции, например List<T>. Основное различие заключается в том, что эта коллекция управляет синхронизацией и предоставляет различные модели потребления через опции создания фабрики. Эти параметры управляют поведением каналов, таких как количество элементов, которые они могут хранить, и что происходит, если достигнуто это ограничение, или же доступ к каналу осуществляется несколькими производителями или несколькими потребителями одновременно.
Ограничивающие стратегии
В зависимости от способа создания Channel<T>, его считыватель и записывающее устройство ведут себя иначе.
Чтобы создать канал, указывающий максимальную емкость, вызовите Channel.CreateBounded. Чтобы создать канал, который используется любым количеством читателей и писателей одновременно, вызовите Channel.CreateUnbounded. Каждая ограничивающая стратегия предоставляет различные параметры, определенные создателем, или BoundedChannelOptions, либо UnboundedChannelOptions соответственно.
Примечание.
Независимо от стратегии ограничения, канал всегда выбрасывает ChannelClosedException исключение, когда канал используется после закрытия.
Несвязанные каналы
Чтобы создать несвязанный канал, вызовите одну из Channel.CreateUnbounded перегрузок:
var channel = Channel.CreateUnbounded<T>();
При создании необвязанного канала по умолчанию канал может использоваться любым количеством читателей и писателей одновременно. Кроме того, при создании неограниченного канала можно указать поведение не по умолчанию, предоставив UnboundedChannelOptions экземпляр. Емкость канала не связана, и все операции записи выполняются синхронно. Дополнительные примеры см. в разделе "Неподвязанные шаблоны создания".
Ограниченные каналы
Чтобы создать ограниченный канал, вызовите одну из Channel.CreateBounded перегрузок:
var channel = Channel.CreateBounded<T>(7);
Предыдущий код создает канал с максимальной емкостью 7 элементов. При создании ограниченного канала он ограничен максимальной емкостью. Когда граница достигнута, поведение по умолчанию заключается в том, что канал асинхронно блокирует продюсера, пока пространство не станет доступным. Это поведение можно настроить, указав параметр при создании канала. Ограниченные каналы можно создать с любым значением емкости, превышающим нулю. Другие примеры см. в разделе "Ограничивающие шаблоны создания".
Поведение полного режима
При использовании ограниченного канала можно указать, как канал будет себя вести при достижении установленного предела. В следующей таблице перечислено поведение в полном режиме для каждого значения BoundedChannelFullMode.
| Значение | Поведение |
|---|---|
| BoundedChannelFullMode.Wait | Это значение по умолчанию. Вызов WriteAsync ожидает доступность пространства для завершения операции записи. Вызовы TryWrite возвращаются false немедленно. |
| BoundedChannelFullMode.DropNewest | Удаляет и игнорирует самый новый элемент в канале, чтобы освободить место для записываемого элемента. |
| BoundedChannelFullMode.DropOldest | Удаляет и игнорирует самый старый элемент в канале, чтобы освободить место для записываемого элемента. |
| BoundedChannelFullMode.DropWrite | Удаляет записываемый элемент. |
Внимание
Когда Channel<TWrite,TRead>.Writer производит быстрее, чем Channel<TWrite,TRead>.Reader может потребить, отправитель каналов испытывает обратное давление.
API-интерфейсы производителя
Функциональность продюсера доступна на Channel<TWrite,TRead>.Writer. API-интерфейсы производителя и ожидаемое поведение подробно описаны в следующей таблице:
| API (Интерфейс программирования приложений) | Ожидаемое поведение |
|---|---|
| ChannelWriter<T>.Complete | Помечает канал как завершенный, то есть больше элементов не записываются в него. |
| ChannelWriter<T>.TryComplete | Пытается пометить канал как завершенный, то есть больше данных не записывается в него. |
| ChannelWriter<T>.TryWrite | Предпринимает попытку записать указанный элемент в канал. При использовании с неограниченным каналом это всегда возвращает true, если только писатель канала не сигнализирует о завершении с помощью либо ChannelWriter<T>.Complete, либо ChannelWriter<T>.TryComplete. |
| ChannelWriter<T>.WaitToWriteAsync | Возвращает значение ValueTask<TResult> , которое завершается, когда пространство доступно для записи элемента. |
| ChannelWriter<T>.WriteAsync | Асинхронно записывает элемент в канал. |
Потребительские API
Функциональность потребителя предоставляется в объекте Channel<TWrite,TRead>.Reader. API-интерфейсы потребителей и ожидаемое поведение подробно описаны в следующей таблице:
| API (Интерфейс программирования приложений) | Ожидаемое поведение |
|---|---|
| ChannelReader<T>.ReadAllAsync | Создает интерфейс IAsyncEnumerable<T>, позволяющий считывать все данные из канала. |
| ChannelReader<T>.ReadAsync | Асинхронно считывает элемент из канала. |
| ChannelReader<T>.TryPeek | Пытается просмотреть элемент из канала. |
| ChannelReader<T>.TryRead | Пытается прочитать элемент из канала. |
| ChannelReader<T>.WaitToReadAsync | Возвращает ValueTask<TResult>, который завершается, когда данные становятся доступными для чтения. |
Общие варианты использования
Существует несколько шаблонов использования для каналов. API разработан так, чтобы он был простым, согласованным и как можно более гибким. Все асинхронные методы возвращают ( ValueTask или ValueTask<bool>), представляющие упрощенную асинхронную операцию, которая может избежать выделения, если операция завершается синхронно и потенциально даже асинхронно. Кроме того, API предназначен для создания, в том, что создатель канала дает обещания о его предполагаемом использовании. При создании канала с определенными параметрами внутренняя реализация может работать более эффективно, зная эти обещания.
Шаблоны создания
Представьте, что вы создаете решение производителя или потребителя для глобальной системы позиций (GPS). Вы хотите отслеживать координаты устройства с течением времени. Пример объекта координат может выглядеть следующим образом:
/// <summary>
/// A representation of a device's coordinates,
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
Guid DeviceId,
double Latitude,
double Longitude);
Неограниченные шаблоны создания
Одним из распространенных шаблонов использования является создание канала с неограниченной пропускной способностью по умолчанию.
var channel = Channel.CreateUnbounded<Coordinates>();
Но вместо этого предположим, что вы хотите создать несвязанный канал с несколькими производителями и потребителями:
var channel = Channel.CreateUnbounded<Coordinates>(
new UnboundedChannelOptions
{
SingleWriter = false,
SingleReader = false,
AllowSynchronousContinuations = true
});
В этом случае все записи синхронны, даже WriteAsync. Это связано с тем, что канал с неограниченной ёмкостью всегда имеет доступное место для записи, что позволяет производить её фактически немедленно. Однако, при AllowSynchronousContinuations настройке true, записи в конечном итоге могут выполнять работу, связанную с читателем, продолжая свои операции. Это не влияет на синхронность операции.
Ограниченные шаблоны создания
С ограниченными каналами настройка канала должна быть известна потребителю, чтобы обеспечить надлежащее потребление. То есть потребитель должен знать, какое поведение канал проявляет при достижении установленной границы. Давайте рассмотрим некоторые распространенные шаблоны создания с ограниченными границами.
Самый простой способ создания привязанного канала — указать емкость:
var channel = Channel.CreateBounded<Coordinates>(1);
Предыдущий код создает ограниченный канал с максимальной емкостью 1. Доступны другие варианты, некоторые параметры совпадают с несвязанным каналом, а другие относятся к несвязанным каналам:
var channel = Channel.CreateBounded<Coordinates>(
new BoundedChannelOptions(1_000)
{
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.DropWrite
});
В приведенном выше коде создается ограниченный канал, ограниченный 1000 элементами, с одним писателем, но множественными читателями. Его поведение в полном режиме определяется как DropWrite, что означает, что он удаляет элемент, записанный, если канал заполнен.
Чтобы наблюдать за элементами, которые удаляются при использовании привязанных каналов, зарегистрируйте обратный itemDropped вызов:
var channel = Channel.CreateBounded(
new BoundedChannelOptions(10)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest
},
static void (Coordinates dropped) =>
Console.WriteLine($"Coordinates dropped: {dropped}"));
При каждом заполнении канала и добавлении нового элемента вызывается обратный вызов itemDropped. В этом примере предоставленный обратный вызов записывает элемент в консоль, но вы можете выполнить любое другое действие.
Шаблоны производителя
Представьте, что продюсер в этом сценарии записывает новые координаты в канал. Производитель может сделать это, вызвав:TryWrite
static void ProduceWithWhileAndTryWrite(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
}
}
Предыдущий код производителя:
- Принимает
Channel<Coordinates>.Writer(ChannelWriter<Coordinates>) в качестве аргумента вместе с начальнымCoordinates. - Определяет условный
whileцикл, который пытается переместить координаты с помощьюTryWrite.
Альтернативный производитель может использовать метод WriteAsync:
static async ValueTask ProduceWithWhileWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
await writer.WriteAsync(
item: coordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
});
}
writer.Complete();
}
Опять же, используется Channel<Coordinates>.Writer в цикле while . На этот раз вызывается метод WriteAsync. Метод продолжается только после записи координат. Когда цикл while завершается, вызывается Complete, что сигнализирует о том, что больше данных не записывается в канал.
Другой шаблон производителя — использовать WaitToWriteAsync метод, рассмотрим следующий код:
static async ValueTask ProduceWithWaitToWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
await writer.WaitToWriteAsync())
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
await Task.Delay(TimeSpan.FromMilliseconds(10));
}
writer.Complete();
}
В рамках условного whileвызова используется результат WaitToWriteAsync вызова, чтобы определить, следует ли продолжать цикл.
Шаблоны потребителей
Существует несколько общих шаблонов потребителей каналов. Если канал никогда не заканчивается, это означает, что он создает данные на неопределенный срок, потребитель может использовать while (true) цикл и читать данные по мере его доступности:
static async ValueTask ConsumeWithWhileAsync(
ChannelReader<Coordinates> reader)
{
while (true)
{
// May throw ChannelClosedException if
// the parent channel's writer signals complete.
Coordinates coordinates = await reader.ReadAsync();
Console.WriteLine(coordinates);
}
}
Примечание.
Этот код создает исключение, если канал закрыт.
Альтернативный пользователь может избежать этой проблемы, используя вложенный цикл while, как показано в следующем коде.
static async ValueTask ConsumeWithNestedWhileAsync(
ChannelReader<Coordinates> reader)
{
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out Coordinates coordinates))
{
Console.WriteLine(coordinates);
}
}
}
В приведенном выше коде потребитель ждет возможности прочитать данные. После того как данные будут доступны, потребитель пытается прочитать его. Эти циклы продолжают оцениваться до тех пор, пока производитель канала не сигнализирует о том, что данные больше не будут считываться. При этом, когда производитель, как известно, имеет ограниченное количество элементов, которые он создает, и он сигнализирует о завершении, потребитель может использовать await foreach семантику для итерации элементов:
static async ValueTask ConsumeWithAwaitForeachAsync(
ChannelReader<Coordinates> reader)
{
await foreach (Coordinates coordinates in reader.ReadAllAsync())
{
Console.WriteLine(coordinates);
}
}
Данный код использует метод ReadAllAsync для чтения всех координат из канала.