Библиотека System.Threading.Channel
Пространство System.Threading.Channels имен предоставляет набор структур данных синхронизации для передачи данных между производителями и потребителями асинхронно. Библиотека предназначена для .NET Standard и работает на всех реализациях .NET.
Эта библиотека доступна в пакете NuGet System.Threading.Channel . Однако если вы используете .NET Core 3.0 или более поздней версии, пакет входит в состав платформы.
Модель программирования производителя и потребителя
Каналы — это реализация концептуальной модели программирования производителя или потребителя. В этой модели программирования производители асинхронно создают данные, а потребители асинхронно используют эти данные. Другими словами, эта модель передает данные из одной стороны в другую через очередь первого выхода (FIFO). Попробуйте подумать о каналах, как и любой другой распространенный универсальный тип коллекции, например List<T>
. Основное различие заключается в том, что эта коллекция управляет синхронизацией и предоставляет различные модели потребления с помощью параметров создания фабрики. Эти параметры управляют поведением каналов, таких как количество элементов, которые они могут хранить, и что происходит, если достигнуто это ограничение, или же доступ к каналу осуществляется несколькими производителями или несколькими потребителями одновременно.
Ограничивающие стратегии
В зависимости от того, как Channel<T>
создается объект, его читатель и средство записи ведут себя по-разному.
Чтобы создать канал, указывающий максимальную емкость, вызовите Channel.CreateBounded. Чтобы создать канал, который используется любым количеством читателей и писателей одновременно, вызовите Channel.CreateUnbounded. Каждая ограничивающая стратегия предоставляет различные параметры, определенные создателем, либо BoundedChannelOptions UnboundedChannelOptions соответственно.
Примечание.
Независимо от ограничивающей стратегии, канал всегда будет вызывать ChannelClosedException , когда он используется после закрытия.
Несвязанные каналы
Чтобы создать несвязанный канал, вызовите одну из Channel.CreateUnbounded перегрузок:
var channel = Channel.CreateUnbounded<T>();
При создании необвязанного канала по умолчанию канал может использоваться любым количеством читателей и писателей одновременно. Кроме того, при создании несвязанного канала можно указать несоотделенное поведение, предоставив UnboundedChannelOptions
экземпляр. Емкость канала не связана, и все операции записи выполняются синхронно. Дополнительные примеры см. в разделе "Неподвязанные шаблоны создания".
Ограниченные каналы
Чтобы создать ограниченный канал, вызовите одну из Channel.CreateBounded перегрузок:
var channel = Channel.CreateBounded<T>(7);
Предыдущий код создает канал с максимальной емкостью 7
элементов. При создании привязанного канала канал привязан к максимальной емкости. Когда граница достигнута, поведение по умолчанию заключается в том, что канал асинхронно блокирует продюсера, пока пространство не станет доступным. Это поведение можно настроить, указав параметр при создании канала. Ограниченные каналы можно создать с любым значением емкости, превышающим нулю. Другие примеры см. в разделе "Ограничивающие шаблоны создания".
Поведение полного режима
При использовании привязанного канала можно указать поведение, которое канал соответствует достижении настроенной привязки. В следующей таблице перечислены режимы полного режима для каждого BoundedChannelFullMode значения:
Значение | Поведение |
---|---|
BoundedChannelFullMode.Wait | Это значение по умолчанию. Вызовы для WriteAsync ожидания доступности пространства для завершения операции записи. Вызовы для немедленного TryWrite возврата false . |
BoundedChannelFullMode.DropNewest | Удаляет и не использует новый элемент в канале, чтобы освободить место для записываемого элемента. |
BoundedChannelFullMode.DropOldest | Удаляет и не использует наиболее старый элемент в канале, чтобы освободить место для записываемого элемента. |
BoundedChannelFullMode.DropWrite | Удаляет записываемый элемент. |
Внимание
Всякий Channel<TWrite,TRead>.Writer раз, когда производство быстрее, чем Channel<TWrite,TRead>.Reader может потреблять, писатель канала испытывает обратное давление.
API-интерфейсы производителя
Функциональные возможности производителя предоставляются на устройстве Channel<TWrite,TRead>.Writer. API-интерфейсы производителя и ожидаемое поведение подробно описаны в следующей таблице:
API | Ожидаемое поведение |
---|---|
ChannelWriter<T>.Complete | Помечает канал как завершенный, то есть больше элементов не записываются в него. |
ChannelWriter<T>.TryComplete | Пытается пометить канал как завершенный, то есть больше данных не записывается в него. |
ChannelWriter<T>.TryWrite | Предпринимает попытку записать указанный элемент в канал. При использовании с необвязанным каналом это всегда возвращается, если запись канала не сигнализирует true о завершении с помощью илиChannelWriter<T>.CompleteChannelWriter<T>.TryComplete. |
ChannelWriter<T>.WaitToWriteAsync | Возвращает значение ValueTask<TResult> , которое завершается, когда пространство доступно для записи элемента. |
ChannelWriter<T>.WriteAsync | Асинхронно записывает элемент в канал. |
Потребительские API
Функциональность потребителя предоставляется в объекте Channel<TWrite,TRead>.Reader. API-интерфейсы потребителей и ожидаемое поведение подробно описаны в следующей таблице:
API | Ожидаемое поведение |
---|---|
ChannelReader<T>.ReadAllAsync | Создает интерфейс IAsyncEnumerable<T>, позволяющий считывать все данные из канала. |
ChannelReader<T>.ReadAsync | Асинхронно считывает элемент из канала. |
ChannelReader<T>.TryPeek | Пытается просмотреть элемент из канала. |
ChannelReader<T>.TryRead | Пытается считать элемент из канала. |
ChannelReader<T>.WaitToReadAsync | ValueTask<TResult> Возвращает значение, которое завершается, когда данные доступны для чтения. |
Общие варианты использования
Существует несколько шаблонов использования для каналов. API разработан так, чтобы он был простым, согласованным и как можно более гибким. Все асинхронные методы возвращают ( ValueTask
или ValueTask<bool>
), представляющие упрощенную асинхронную операцию, которая может избежать выделения, если операция завершается синхронно и потенциально даже асинхронно. Кроме того, API предназначен для создания, в том, что создатель канала дает обещания о его предполагаемом использовании. При создании канала с определенными параметрами внутренняя реализация может работать более эффективно, зная эти обещания.
Шаблоны создания
Представьте, что вы создаете решение производителя или потребителя для глобальной системы позиций (GPS). Вы хотите отслеживать координаты устройства с течением времени. Пример объекта координат может выглядеть следующим образом:
/// <summary>
/// A representation of a device's coordinates,
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
Guid DeviceId,
double Latitude,
double Longitude);
Неподвязанные шаблоны создания
Одним из распространенных шаблонов использования является создание канала без подключения по умолчанию:
var channel = Channel.CreateUnbounded<Coordinates>();
Но вместо этого предположим, что вы хотите создать несвязанный канал с несколькими производителями и потребителями:
var channel = Channel.CreateUnbounded<Coordinates>(
new UnboundedChannelOptions
{
SingleWriter = false,
SingleReader = false,
AllowSynchronousContinuations = true
});
В этом случае все записи синхронны, даже WriteAsync
те. Это связано с тем, что несвязанный канал всегда имеет доступное место для записи немедленно. Однако при AllowSynchronousContinuations
настройке true
записи могут в конечном итоге выполнять работу, связанную с читателем, выполняя свои продолжения. Это не влияет на синхронность операции.
Ограниченные шаблоны создания
С ограниченными каналами настройка канала должна быть известна потребителю, чтобы обеспечить надлежащее потребление. То есть потребитель должен знать, какое поведение канал отображает при достижении настроенной привязки. Давайте рассмотрим некоторые распространенные шаблоны создания с ограниченными границами.
Самый простой способ создания привязанного канала — указать емкость:
var channel = Channel.CreateBounded<Coordinates>(1);
Предыдущий код создает ограниченный канал с максимальной емкостью 1
. Доступны другие варианты, некоторые параметры совпадают с несвязанным каналом, а другие относятся к несвязанным каналам:
var channel = Channel.CreateBounded<Coordinates>(
new BoundedChannelOptions(1_000)
{
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.DropWrite
});
В приведенном выше коде канал создается как ограниченный канал, который ограничен 1000 элементами с одним средством записи, но многие читатели. Его поведение в полном режиме определяется как DropWrite
, что означает, что он удаляет элемент, записанный, если канал заполнен.
Чтобы наблюдать за элементами, которые удаляются при использовании привязанных каналов, зарегистрируйте обратный itemDropped
вызов:
var channel = Channel.CreateBounded(
new BoundedChannelOptions(10)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest
},
static void (Coordinates dropped) =>
Console.WriteLine($"Coordinates dropped: {dropped}"));
При каждом заполнении канала и добавлении itemDropped
нового элемента вызывается обратный вызов. В этом примере предоставленный обратный вызов записывает элемент в консоль, но вы можете выполнить любое другое действие.
Шаблоны производителя
Представьте, что продюсер в этом сценарии записывает новые координаты в канал. Производитель может сделать это, вызвав:TryWrite
static void ProduceWithWhileAndTryWrite(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
}
}
Предыдущий код производителя:
- Принимает (
Channel<Coordinates>.Writer
ChannelWriter<Coordinates>
) в качестве аргумента, а также инициалCoordinates
. - Определяет условный
while
цикл, который пытается переместить координаты с помощьюTryWrite
.
Альтернативный WriteAsync
производитель может использовать метод:
static async ValueTask ProduceWithWhileWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
await writer.WriteAsync(
item: coordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
});
}
writer.Complete();
}
Опять же, используется Channel<Coordinates>.Writer
в цикле while
. Но на WriteAsync этот раз вызывается метод. Метод будет продолжаться только после записи координат. while
Когда цикл завершается, вызывается Complete вызов, который сигнализирует о том, что больше данных не записывается в канал.
Другой шаблон производителя — использовать WaitToWriteAsync метод, рассмотрим следующий код:
static async ValueTask ProduceWithWaitToWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
await writer.WaitToWriteAsync())
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
await Task.Delay(TimeSpan.FromMilliseconds(10));
}
writer.Complete();
}
В рамках условного while
вызова используется результат WaitToWriteAsync
вызова, чтобы определить, следует ли продолжать цикл.
Шаблоны потребителей
Существует несколько общих шаблонов потребителей каналов. Если канал никогда не заканчивается, это означает, что он создает данные на неопределенный срок, потребитель может использовать while (true)
цикл и читать данные по мере его доступности:
static async ValueTask ConsumeWithWhileAsync(
ChannelReader<Coordinates> reader)
{
while (true)
{
// May throw ChannelClosedException if
// the parent channel's writer signals complete.
Coordinates coordinates = await reader.ReadAsync();
Console.WriteLine(coordinates);
}
}
Примечание.
Этот код создает исключение, если канал закрыт.
Альтернативный потребитель может избежать этой проблемы с помощью вложенного цикла во время выполнения, как показано в следующем коде:
static async ValueTask ConsumeWithNestedWhileAsync(
ChannelReader<Coordinates> reader)
{
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out Coordinates coordinates))
{
Console.WriteLine(coordinates);
}
}
}
В приведенном выше коде потребитель ожидает чтения данных. После того как данные будут доступны, потребитель пытается прочитать его. Эти циклы продолжают оцениваться до тех пор, пока производитель канала не сигнализирует о том, что данные больше не будут считываться. При этом, когда производитель, как известно, имеет ограниченное количество элементов, которые он создает, и он сигнализирует о завершении, потребитель может использовать await foreach
семантику для итерации элементов:
static async ValueTask ConsumeWithAwaitForeachAsync(
ChannelReader<Coordinates> reader)
{
await foreach (Coordinates coordinates in reader.ReadAllAsync())
{
Console.WriteLine(coordinates);
}
}
Предыдущий ReadAllAsync код использует метод для чтения всех координат из канала.