학습
System.Threading.Channels 라이브러리
System.Threading.Channels 네임스페이스는 생산자와 소비자 간에 데이터를 비동기적으로 전달하기 위한 일련의 동기화 데이터 구조를 제공합니다. 라이브러리는 .NET Standard를 대상으로 하며 모든 .NET 구현에서 작동합니다.
이 라이브러리는 System.Threading.Channels NuGet 패키지에서 사용할 수 있습니다. 그러나 .NET Core 3.0 이상을 사용하는 경우 패키지는 프레임워크의 일부로 포함됩니다.
채널은 생산자/소비자 개념적 프로그래밍 모델을 구현한 것입니다. 이 프로그래밍 모델에서 생산자는 비동기적으로 데이터를 생성하고 소비자는 해당 데이터를 비동기적으로 소비합니다. 즉, 이 모델은 FIFO(선입선출) 큐를 통해 한 당사자에서 다른 당사자로 데이터를 전달합니다. List<T>
와 같은 다른 제네릭적인 제네릭 컬렉션 형식과 마찬가지로 채널을 생각해 보세요. 가장 큰 차이점은 이 컬렉션이 동기화를 관리하고 팩터리 만들기 옵션을 통해 다양한 사용량 모델을 제공한다는 것입니다. 이러한 옵션은 저장할 수 있는 요소 수, 해당 제한에 도달할 경우 발생하는 상황, 여러 생산자 또는 여러 소비자가 동시에 채널에 액세스하는지 여부 등 채널의 동작을 제어합니다.
Channel<T>
가 만들어지는 방식에 따라 읽기 권한자와 기록자가 다르게 동작합니다.
최대 용량을 지정하는 채널을 만들려면 Channel.CreateBounded를 호출합니다. 여러 읽기 권한자와 기록자가 동시에 사용하는 채널을 만들려면 Channel.CreateUnbounded를 호출합니다. 각 경계 전략은 각각 BoundedChannelOptions 또는 UnboundedChannelOptions 등 다양한 작성자 정의 옵션을 노출합니다.
참고
경계 전략에 관계없이 채널은 닫힌 후 사용될 때 항상 ChannelClosedException을 throw합니다.
제한되지 않은 채널을 만들려면 Channel.CreateUnbounded 오버로드 중 하나를 호출합니다.
var channel = Channel.CreateUnbounded<T>();
제한되지 않은 채널을 만들면 기본적으로 무제한의 읽기 권한자와 작성자가 동시에 채널을 사용할 수 있습니다. 또는 UnboundedChannelOptions
인스턴스를 제공하여 무제한 채널을 만들 때 네이티브가 아닌 동작을 지정할 수 있습니다. 채널의 용량에는 제한이 없으며 모든 쓰기는 동기식으로 수행됩니다. 더 많은 예를 보려면 무제한 만들기 패턴을 참조하세요.
제한된 채널을 만들려면 Channel.CreateBounded 오버로드 중 하나를 호출합니다.
var channel = Channel.CreateBounded<T>(7);
앞의 코드는 최대 7
개 항목 용량을 갖는 채널을 만듭니다. 제한된 채널을 만들면 채널이 최대 용량으로 제한됩니다. 경계에 도달하면 기본 동작은 공간을 사용할 수 있을 때까지 채널이 생산자를 비동기적으로 차단하는 것입니다. 채널을 만들 때 옵션을 지정하여 이 동작을 구성할 수 있습니다. 제한된 채널은 0보다 큰 용량 값으로 만들어질 수 있습니다. 다른 예는 제한된 만들기 패턴을 참조하세요.
제한된 채널을 사용할 때 구성된 경계에 도달했을 때 채널이 준수하는 동작을 지정할 수 있습니다. 다음 표에는 각 BoundedChannelFullMode 값에 대한 전체 모드 동작이 나열되어 있습니다.
값 | 동작 |
---|---|
BoundedChannelFullMode.Wait | 기본값입니다. WriteAsync 에 대한 호출은 쓰기 작업을 완료하기 위해 사용 가능한 공간이 생길 때까지 기다립니다. TryWrite 를 호출하면 즉시 false 가 반환됩니다. |
BoundedChannelFullMode.DropNewest | 항목을 쓸 공간을 확보하기 위해 채널의 가장 새로운 항목을 제거하고 무시합니다. |
BoundedChannelFullMode.DropOldest | 항목을 쓸 공간을 확보하기 위해 채널의 가장 오래된 항목을 제거하고 무시합니다. |
BoundedChannelFullMode.DropWrite | 쓰고 있는 항목을 삭제합니다. |
중요
Channel<TWrite,TRead>.Reader가 소비할 수 있는 것보다 Channel<TWrite,TRead>.Writer가 빠르게 생성될 때마다 채널 작성자는 반대 방향의 압력을 받게 됩니다.
생산자 기능은 Channel<TWrite,TRead>.Writer에 노출됩니다. 생산자 API 및 예상 동작은 다음 표에 자세히 설명되어 있습니다.
API | 예상되는 동작 |
---|---|
ChannelWriter<T>.Complete | 채널을 완료된 것으로 표시합니다. 즉, 더 이상 항목이 기록되지 않음을 의미합니다. |
ChannelWriter<T>.TryComplete | 채널을 완료된 것으로 표시하려고 시도합니다. 즉, 더 이상 데이터가 기록되지 않음을 의미합니다. |
ChannelWriter<T>.TryWrite | 지정된 항목을 채널에 쓰려고 합니다. 제한되지 않은 채널과 함께 사용하면 채널 작성자가 ChannelWriter<T>.Complete 또는 ChannelWriter<T>.TryComplete로 완료 신호를 보내지 않는 한 항상 true 를 반환합니다. |
ChannelWriter<T>.WaitToWriteAsync | 항목을 쓸 수 있는 공간이 생기면 완료되는 ValueTask<TResult>를 반환합니다. |
ChannelWriter<T>.WriteAsync | 채널에 항목을 비동기적으로 씁니다. |
소비자 기능은 Channel<TWrite,TRead>.Reader에 노출됩니다. 소비자 API 및 예상 동작은 다음 표에 자세히 설명되어 있습니다.
API | 예상되는 동작 |
---|---|
ChannelReader<T>.ReadAllAsync | 채널의 모든 데이터를 읽을 수 있도록 IAsyncEnumerable<T>을 만듭니다. |
ChannelReader<T>.ReadAsync | 채널에서 항목을 비동기적으로 읽습니다. |
ChannelReader<T>.TryPeek | 채널의 항목을 엿보려고 시도합니다. |
ChannelReader<T>.TryRead | 채널에서 항목을 읽으려고 합니다. |
ChannelReader<T>.WaitToReadAsync | 데이터를 읽을 수 있게 되면 완료되는 ValueTask<TResult>를 반환합니다. |
채널에는 여러 가지 사용 패턴이 있습니다. API는 최대한 단순하고 일관되며 유연하게 설계되었습니다. 모든 비동기 메서드는 작업이 동기적으로 완료되고 잠재적으로 심지어 비동기적으로 완료되는 경우 할당을 피할 수 있는 경량 비동기 작업을 나타내는 ValueTask
(또는 ValueTask<bool>
)를 반환합니다. 또한 API는 채널 작성자가 의도한 사용량에 대한 약속을 한다는 측면에서 구성 가능하도록 설계되었습니다. 특정 매개 변수를 사용하여 채널을 만들면 이러한 프라미스를 알고 내부 구현이 보다 효율적으로 작동할 수 있습니다.
GPS(Global Position System)용 생산자/소비자 솔루션을 만들고 있다고 가정해 보겠습니다. 시간이 지남에 따라 디바이스의 좌표를 추적하려고 합니다. 샘플 좌표 개체는 다음과 같습니다.
/// <summary>
/// A representation of a device's coordinates,
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
Guid DeviceId,
double Latitude,
double Longitude);
일반적인 사용 패턴 중 하나는 기본 제한 없는 채널을 만드는 것입니다.
var channel = Channel.CreateUnbounded<Coordinates>();
하지만 여러 생산자 및 소비자와 함께 제한 없는 채널을 만들고자 하는 경우를 생각해 보겠습니다.
var channel = Channel.CreateUnbounded<Coordinates>(
new UnboundedChannelOptions
{
SingleWriter = false,
SingleReader = false,
AllowSynchronousContinuations = true
});
이 경우 모든 쓰기는 동기식이며 WriteAsync
도 마찬가지입니다. 이는 제한 없는 채널에는 항상 즉시 효과적으로 쓰기에 사용할 수 있는 공간이 있기 때문입니다. 그러나 AllowSynchronousContinuations
를 true
로 설정하면 쓰기가 연속 작업을 실행하여 읽기 권한자와 관련된 작업을 수행하게 될 수 있습니다. 이는 작업의 동시성에 영향을 주지 않습니다.
제한된 채널을 사용하는 경우 적절한 사용량을 보장하려면 사용자에게 채널의 구성 가능성을 알려야 합니다. 즉, 소비자는 구성된 경계에 도달했을 때 채널이 나타내는 동작을 알아야 합니다. 일반적인 제한된 만들기 패턴 중 일부를 살펴보겠습니다.
제한된 채널을 만드는 가장 간단한 방법은 용량을 지정하는 것입니다.
var channel = Channel.CreateBounded<Coordinates>(1);
앞의 코드는 최대 용량이 1
인 제한된 채널을 만듭니다. 다른 옵션도 사용할 수 있습니다. 일부 옵션은 제한 없는 채널과 동일하지만 다른 옵션은 제한 없는 채널에만 적용됩니다.
var channel = Channel.CreateBounded<Coordinates>(
new BoundedChannelOptions(1_000)
{
SingleWriter = true,
SingleReader = false,
AllowSynchronousContinuations = false,
FullMode = BoundedChannelFullMode.DropWrite
});
앞의 코드에서 채널은 작성자가 한 명이지만 읽기 권한자가 많은 1,000개 항목으로 제한되는 제한된 채널로 만들어집니다. 전체 모드 동작은 DropWrite
로 정의됩니다. 즉, 채널이 가득 차면 기록 중인 항목을 삭제한다는 의미입니다.
제한된 채널을 사용할 때 삭제되는 항목을 관찰하려면 itemDropped
콜백을 등록합니다.
var channel = Channel.CreateBounded(
new BoundedChannelOptions(10)
{
AllowSynchronousContinuations = true,
FullMode = BoundedChannelFullMode.DropOldest
},
static void (Coordinates dropped) =>
Console.WriteLine($"Coordinates dropped: {dropped}"));
채널이 가득 차고 새 항목이 추가될 때마다 itemDropped
콜백이 호출됩니다. 이 예에서 제공된 콜백은 항목을 콘솔에 기록하지만 원하는 다른 작업을 자유롭게 수행할 수 있습니다.
이 시나리오의 생산자가 채널에 새 좌표를 쓰고 있다고 상상해 보세요. 생산자는 TryWrite를 호출하여 이 작업을 수행할 수 있습니다.
static void ProduceWithWhileAndTryWrite(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
}
}
이전 생산자 코드:
- 초기
Coordinates
와 함께Channel<Coordinates>.Writer
(ChannelWriter<Coordinates>
)을 인수로 허용합니다. TryWrite
를 사용하여 좌표 이동을 시도하는 조건부while
루프를 정의합니다.
대체 생산자는 WriteAsync
메서드를 사용할 수 있습니다.
static async ValueTask ProduceWithWhileWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 })
{
await writer.WriteAsync(
item: coordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
});
}
writer.Complete();
}
다시 말하지만, Channel<Coordinates>.Writer
는 while
루프 내에서 사용됩니다. 하지만 이번에는 WriteAsync 메서드가 호출됩니다. 이 방법은 좌표가 작성된 후에만 계속됩니다. while
루프가 종료되면 Complete가 호출되어 더 이상 채널에 데이터가 기록되지 않는다는 신호를 보냅니다.
또 다른 생산자 패턴은 WaitToWriteAsync 메서드를 사용하는 것입니다. 다음 코드를 고려합니다.
static async ValueTask ProduceWithWaitToWriteAsync(
ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
await writer.WaitToWriteAsync())
{
var tempCoordinates = coordinates with
{
Latitude = coordinates.Latitude + .5,
Longitude = coordinates.Longitude + 1
};
if (writer.TryWrite(item: tempCoordinates))
{
coordinates = tempCoordinates;
}
await Task.Delay(TimeSpan.FromMilliseconds(10));
}
writer.Complete();
}
조건부 while
의 일부로 WaitToWriteAsync
호출의 결과는 루프를 계속할지 여부를 결정하는 데 사용됩니다.
몇 가지 일반적인 채널 소비자 패턴이 있습니다. 채널이 끝나지 않는 경우, 즉 데이터를 무기한 생성하는 경우 소비자는 while (true)
루프를 사용하고 데이터가 사용 가능해지면 읽을 수 있습니다.
static async ValueTask ConsumeWithWhileAsync(
ChannelReader<Coordinates> reader)
{
while (true)
{
// May throw ChannelClosedException if
// the parent channel's writer signals complete.
Coordinates coordinates = await reader.ReadAsync();
Console.WriteLine(coordinates);
}
}
참고
이 코드는 채널이 닫히면 예외를 throw합니다.
대체 소비자는 다음 코드와 같이 중첩된 while 루프를 사용하여 이러한 문제를 피할 수 있습니다.
static async ValueTask ConsumeWithNestedWhileAsync(
ChannelReader<Coordinates> reader)
{
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out Coordinates coordinates))
{
Console.WriteLine(coordinates);
}
}
}
이전 코드에서 소비자는 데이터 읽기를 기다립니다. 데이터를 사용할 수 있게 되면 소비자는 데이터를 읽으려고 시도합니다. 이러한 루프는 채널 생성자가 더 이상 읽을 데이터가 없다는 신호를 보낼 때까지 계속 평가합니다. 즉, 생산자가 생산하는 항목의 수가 한정되어 있고 완료 신호를 보내는 경우 소비자는 await foreach
의미 체계를 사용하여 항목을 반복할 수 있습니다.
static async ValueTask ConsumeWithAwaitForeachAsync(
ChannelReader<Coordinates> reader)
{
await foreach (Coordinates coordinates in reader.ReadAllAsync())
{
Console.WriteLine(coordinates);
}
}
앞의 코드는 ReadAllAsync 메서드를 사용하여 채널에서 모든 좌표를 읽습니다.
.NET 피드백
.NET은(는) 오픈 소스 프로젝트입니다. 다음 링크를 선택하여 피드백을 제공해 주세요.