System.Threading.Channels 라이브러리

System.Threading.Channels 네임스페이스는 생산자와 소비자 간에 데이터를 비동기적으로 전달하기 위한 일련의 동기화 데이터 구조를 제공합니다. 라이브러리는 .NET Standard를 대상으로 하며 모든 .NET 구현에서 작동합니다.

이 라이브러리는 System.Threading.Channels NuGet 패키지에서 사용할 수 있습니다. 그러나 .NET Core 3.0 이상을 사용하는 경우 패키지는 프레임워크의 일부로 포함됩니다.

생산자/소비자 개념적 프로그래밍 모델

채널은 생산자/소비자 개념적 프로그래밍 모델을 구현한 것입니다. 이 프로그래밍 모델에서 생산자는 비동기적으로 데이터를 생성하고 소비자는 해당 데이터를 비동기적으로 소비합니다. 즉, 이 모델은 한 당사자에서 다른 당사자에게 데이터를 전달합니다. List<T>와 같은 다른 제네릭적인 제네릭 컬렉션 형식과 마찬가지로 채널을 생각해 보세요. 가장 큰 차이점은 이 컬렉션이 동기화를 관리하고 팩터리 만들기 옵션을 통해 다양한 사용량 모델을 제공한다는 것입니다. 이러한 옵션은 저장할 수 있는 요소 수, 해당 제한에 도달할 경우 발생하는 상황, 여러 생산자 또는 여러 소비자가 동시에 채널에 액세스하는지 여부 등 채널의 동작을 제어합니다.

경계 전략

Channel<T>가 만들어지는 방식에 따라 읽기 권한자와 기록자가 다르게 동작합니다.

최대 용량을 지정하는 채널을 만들려면 Channel.CreateBounded를 호출합니다. 여러 읽기 권한자와 기록자가 동시에 사용하는 채널을 만들려면 Channel.CreateUnbounded를 호출합니다. 각 경계 전략은 각각 BoundedChannelOptions 또는 UnboundedChannelOptions 등 다양한 작성자 정의 옵션을 노출합니다.

참고 항목

경계 전략에 관계없이 채널은 닫힌 후 사용될 때 항상 ChannelClosedException을 throw합니다.

무제한 채널

제한되지 않은 채널을 만들려면 Channel.CreateUnbounded 오버로드 중 하나를 호출합니다.

var channel = Channel.CreateUnbounded<T>();

제한되지 않은 채널을 만들면 기본적으로 무제한의 읽기 권한자와 작성자가 동시에 채널을 사용할 수 있습니다. 또는 UnboundedChannelOptions 인스턴스를 제공하여 무제한 채널을 만들 때 네이티브가 아닌 동작을 지정할 수 있습니다. 채널의 용량에는 제한이 없으며 모든 쓰기는 동기식으로 수행됩니다. 더 많은 예를 보려면 무제한 만들기 패턴을 참조하세요.

제한된 채널

제한된 채널을 만들려면 Channel.CreateBounded 오버로드 중 하나를 호출합니다.

var channel = Channel.CreateBounded<T>(7);

앞의 코드는 최대 7개 항목 용량을 갖는 채널을 만듭니다. 제한된 채널을 만들면 채널이 최대 용량으로 제한됩니다. 경계에 도달하면 기본 동작은 공간을 사용할 수 있을 때까지 채널이 생산자를 비동기적으로 차단하는 것입니다. 채널을 만들 때 옵션을 지정하여 이 동작을 구성할 수 있습니다. 제한된 채널은 0보다 큰 용량 값으로 만들어질 수 있습니다. 다른 예는 제한된 만들기 패턴을 참조하세요.

전체 모드 동작

제한된 채널을 사용할 때 구성된 경계에 도달했을 때 채널이 준수하는 동작을 지정할 수 있습니다. 다음 표에는 각 BoundedChannelFullMode 값에 대한 전체 모드 동작이 나열되어 있습니다.

동작
BoundedChannelFullMode.Wait 기본값입니다. WriteAsync에 대한 호출은 쓰기 작업을 완료하기 위해 사용 가능한 공간이 생길 때까지 기다립니다. TryWrite를 호출하면 즉시 false가 반환됩니다.
BoundedChannelFullMode.DropNewest 항목을 쓸 공간을 확보하기 위해 채널의 가장 새로운 항목을 제거하고 무시합니다.
BoundedChannelFullMode.DropOldest 항목을 쓸 공간을 확보하기 위해 채널의 가장 오래된 항목을 제거하고 무시합니다.
BoundedChannelFullMode.DropWrite 쓰고 있는 항목을 삭제합니다.

Important

Channel<TWrite,TRead>.Reader가 소비할 수 있는 것보다 Channel<TWrite,TRead>.Writer가 빠르게 생성될 때마다 채널 작성자는 반대 방향의 압력을 받게 됩니다.

생산자 API

생산자 기능은 Channel<TWrite,TRead>.Writer에 노출됩니다. 생산자 API 및 예상 동작은 다음 표에 자세히 설명되어 있습니다.

API 예상되는 동작
ChannelWriter<T>.Complete 채널을 완료된 것으로 표시합니다. 즉, 더 이상 항목이 기록되지 않음을 의미합니다.
ChannelWriter<T>.TryComplete 채널을 완료된 것으로 표시하려고 시도합니다. 즉, 더 이상 데이터가 기록되지 않음을 의미합니다.
ChannelWriter<T>.TryWrite 지정된 항목을 채널에 쓰려고 합니다. 제한되지 않은 채널과 함께 사용하면 채널 작성자가 ChannelWriter<T>.Complete 또는 ChannelWriter<T>.TryComplete로 완료 신호를 보내지 않는 한 항상 true를 반환합니다.
ChannelWriter<T>.WaitToWriteAsync 항목을 쓸 수 있는 공간이 생기면 완료되는 ValueTask<TResult>를 반환합니다.
ChannelWriter<T>.WriteAsync 채널에 항목을 비동기적으로 씁니다.

소비자 API

소비자 기능은 Channel<TWrite,TRead>.Reader에 노출됩니다. 소비자 API 및 예상 동작은 다음 표에 자세히 설명되어 있습니다.

API 예상되는 동작
ChannelReader<T>.ReadAllAsync 채널의 모든 데이터를 읽을 수 있도록 IAsyncEnumerable<T>을 만듭니다.
ChannelReader<T>.ReadAsync 채널에서 항목을 비동기적으로 읽습니다.
ChannelReader<T>.TryPeek 채널의 항목을 엿보려고 시도합니다.
ChannelReader<T>.TryRead 채널에서 항목을 읽으려고 합니다.
ChannelReader<T>.WaitToReadAsync 데이터를 읽을 수 있게 되면 완료되는 ValueTask<TResult>를 반환합니다.

일반적인 사용 패턴

채널에는 여러 가지 사용 패턴이 있습니다. API는 최대한 단순하고 일관되며 유연하게 설계되었습니다. 모든 비동기 메서드는 작업이 동기적으로 완료되고 잠재적으로 심지어 비동기적으로 완료되는 경우 할당을 피할 수 있는 경량 비동기 작업을 나타내는 ValueTask(또는 ValueTask<bool>)를 반환합니다. 또한 API는 채널 작성자가 의도한 사용량에 대한 약속을 한다는 측면에서 구성 가능하도록 설계되었습니다. 특정 매개 변수를 사용하여 채널을 만들면 이러한 프라미스를 알고 내부 구현이 보다 효율적으로 작동할 수 있습니다.

만들기 패턴

GPS(Global Position System)용 생산자/소비자 솔루션을 만들고 있다고 가정해 보겠습니다. 시간이 지남에 따라 디바이스의 좌표를 추적하려고 합니다. 샘플 좌표 개체는 다음과 같습니다.

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

제한 없는 만들기 패턴

일반적인 사용 패턴 중 하나는 기본 제한 없는 채널을 만드는 것입니다.

var channel = Channel.CreateUnbounded<Coordinates>();

하지만 여러 생산자 및 소비자와 함께 제한 없는 채널을 만들고자 하는 경우를 생각해 보겠습니다.

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

이 경우 모든 쓰기는 동기식이며 WriteAsync도 마찬가지입니다. 이는 제한 없는 채널에는 항상 즉시 효과적으로 쓰기에 사용할 수 있는 공간이 있기 때문입니다. 그러나 AllowSynchronousContinuationstrue로 설정하면 쓰기가 연속 작업을 실행하여 읽기 권한자와 관련된 작업을 수행하게 될 수 있습니다. 이는 작업의 동시성에 영향을 주지 않습니다.

제한된 만들기 패턴

제한된 채널을 사용하는 경우 적절한 사용량을 보장하려면 사용자에게 채널의 구성 가능성을 알려야 합니다. 즉, 소비자는 구성된 경계에 도달했을 때 채널이 나타내는 동작을 알아야 합니다. 일반적인 제한된 만들기 패턴 중 일부를 살펴보겠습니다.

제한된 채널을 만드는 가장 간단한 방법은 용량을 지정하는 것입니다.

var channel = Channel.CreateBounded<Coordinates>(1);

앞의 코드는 최대 용량이 1인 제한된 채널을 만듭니다. 다른 옵션도 사용할 수 있습니다. 일부 옵션은 제한 없는 채널과 동일하지만 다른 옵션은 제한 없는 채널에만 적용됩니다.

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

앞의 코드에서 채널은 작성자가 한 명이지만 읽기 권한자가 많은 1,000개 항목으로 제한되는 제한된 채널로 만들어집니다. 전체 모드 동작은 DropWrite로 정의됩니다. 즉, 채널이 가득 차면 기록 중인 항목을 삭제한다는 의미입니다.

제한된 채널을 사용할 때 삭제되는 항목을 관찰하려면 itemDropped 콜백을 등록합니다.

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

채널이 가득 차고 새 항목이 추가될 때마다 itemDropped 콜백이 호출됩니다. 이 예에서 제공된 콜백은 항목을 콘솔에 기록하지만 원하는 다른 작업을 자유롭게 수행할 수 있습니다.

생산자 패턴

이 시나리오의 생산자가 채널에 새 좌표를 쓰고 있다고 상상해 보세요. 생산자는 TryWrite를 호출하여 이 작업을 수행할 수 있습니다.

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

이전 생산자 코드:

  • 초기 Coordinates와 함께 Channel<Coordinates>.Writer(ChannelWriter<Coordinates>)을 인수로 허용합니다.
  • TryWrite를 사용하여 좌표 이동을 시도하는 조건부 while 루프를 정의합니다.

대체 생산자는 WriteAsync 메서드를 사용할 수 있습니다.

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

다시 말하지만, Channel<Coordinates>.Writerwhile 루프 내에서 사용됩니다. 하지만 이번에는 WriteAsync 메서드가 호출됩니다. 이 방법은 좌표가 작성된 후에만 계속됩니다. while 루프가 종료되면 Complete가 호출되어 더 이상 채널에 데이터가 기록되지 않는다는 신호를 보냅니다.

또 다른 생산자 패턴은 WaitToWriteAsync 메서드를 사용하는 것입니다. 다음 코드를 고려합니다.

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

조건부 while의 일부로 WaitToWriteAsync 호출의 결과는 루프를 계속할지 여부를 결정하는 데 사용됩니다.

소비자 패턴

몇 가지 일반적인 채널 소비자 패턴이 있습니다. 채널이 끝나지 않는 경우, 즉 데이터를 무기한 생성하는 경우 소비자는 while (true) 루프를 사용하고 데이터가 사용 가능해지면 읽을 수 있습니다.

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

참고 항목

이 코드는 채널이 닫히면 예외를 throw합니다.

대체 소비자는 다음 코드와 같이 중첩된 while 루프를 사용하여 이러한 문제를 피할 수 있습니다.

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

이전 코드에서 소비자는 데이터 읽기를 기다립니다. 데이터를 사용할 수 있게 되면 소비자는 데이터를 읽으려고 시도합니다. 이러한 루프는 채널 생성자가 더 이상 읽을 데이터가 없다는 신호를 보낼 때까지 계속 평가합니다. 즉, 생산자가 생산하는 항목의 수가 한정되어 있고 완료 신호를 보내는 경우 소비자는 await foreach 의미 체계를 사용하여 항목을 반복할 수 있습니다.

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

앞의 코드는 ReadAllAsync 메서드를 사용하여 채널에서 모든 좌표를 읽습니다.

참고 항목