英語で読む

次の方法で共有


System.Threading.Channels ライブラリ

System.Threading.Channels 名前空間は、プロデューサーとコンシューマーの間で非同期的にデータを渡すための一連の同期データ構造を提供します。 このライブラリは、.NET Standard を対象としており、すべての .NET 実装で動作します。

このライブラリは、System.Threading.Channels NuGet パッケージで入手できます。 ただし、.NET Core 3.0 以降を使用している場合、このパッケージはフレームワークの一部として含まれます。

プロデューサー/コンシューマーの概念プログラミング モデル

チャネルは、プロデューサー/コンシューマーの概念プログラミング モデルの実装です。 このプログラミング モデルでは、プロデューサーはデータを非同期に生成し、コンシューマーはそのデータを非同期に使用します。 言い換えると、このモデルでは、先入れ先出し ("FIFO") キューを介して、データがあるパーティーから別のパーティーに渡されます。 チャネルを、List<T> などの他の一般的な汎用コレクション型だと考えてみてください。 主な違いは、このコレクションでは同期が管理され、ファクトリ作成オプションを使用してさまざまな消費モデルが提供されます。 これらのオプションによってチャネルの動作が制御されます。たとえば、チャネルが格納を許可される要素の数とその制限に達したときの動作や、複数のプロデューサーまたは複数のコンシューマーが同時にチャネルにアクセスするかどうかなどがあります。

境界戦略

Channel<T> の作成方法に応じて、そのリーダーおよびライターの動作は異なります。

最大容量を指定するチャネルを作成するには、Channel.CreateBounded を呼び出します。 任意の数のリーダーおよびライターが同時に使用するチャネルを作成するには、Channel.CreateUnbounded を呼び出します。 各境界戦略では、作成者が定義したさまざまなオプションが、それぞれ BoundedChannelOptions または UnboundedChannelOptions のいずれかで公開されます。

注意

境界戦略に関係なく、チャネルを閉じた後に使用すると、ChannelClosedException がスローされます。

制限のないチャネル

制限のないチャネルを作成するには、いずれかの Channel.CreateUnbounded オーバーロードを呼び出します。

var channel = Channel.CreateUnbounded<T>();

制限のないチャネルを作成すると、既定では、任意の数のリーダーおよびライターが同時にチャネルを使用できます。 または、制限のないチャネルを作成するときに UnboundedChannelOptions インスタンスを指定して、既定以外の動作を指定することもできます。 チャネルの容量は無制限であり、すべての書き込みが同時に実行されます。 その他の例については、「制限なしの作成パターン」を参照してください。

制限付きチャネル

制限付きチャネルを作成するには、いずれかの Channel.CreateBounded オーバーロードを呼び出します。

var channel = Channel.CreateBounded<T>(7);

上記のコードでは、最大容量が 7 項目のチャネルを作成しています。 制限付きチャネルを作成すると、チャネルは最大容量に制限されます。 境界に達すると、既定の動作として、領域が使用できるようになるまでプロデューサーはチャネルによって非同期にブロックされます。 この動作を構成するには、チャネルの作成時にオプションを指定します。 制限付きチャネルは、0 より大きい容量値を指定して作成することができます。 その他の例については、「制限付きの作成パターン」を参照してください。

フル モード動作

制限付きチャネルを使用する場合、構成された境界に達したときにチャネルが従う動作を指定できます。 次の表に、各 BoundedChannelFullMode 値のフル モード動作を示します。

動作
BoundedChannelFullMode.Wait これが既定値です。 WriteAsync の呼び出しは、書き込み操作を完了するために、使用できる領域が空くまで待機します。 TryWrite の呼び出しは、即時に false を返します。
BoundedChannelFullMode.DropNewest 書き込まれる項目用の領域を確保するために、チャネル内の最新の項目を削除して無視します。
BoundedChannelFullMode.DropOldest 書き込まれる項目用の領域を確保するために、チャネル内の最も古い項目を削除して無視します。
BoundedChannelFullMode.DropWrite 書き込まれる項目を削除します。

重要

Channel<TWrite,TRead>.Writer による生成速度が Channel<TWrite,TRead>.Reader による消費速度よりも速い場合、チャネルのライターでバック プレッシャが発生します。

プロデューサー API

プロデューサー機能は、Channel<TWrite,TRead>.Writer に公開されています。 プロデューサー API と予期される動作の詳細を次の表に示します。

API 正しい動作
ChannelWriter<T>.Complete 完了としてチャネルをマークします。これ以上の項目は書き込まれないという意味です。
ChannelWriter<T>.TryComplete 完了としてチャネルをマークしようとします。これ以上のデータは書き込まれないという意味です。
ChannelWriter<T>.TryWrite 指定した項目のチャネルへの書き込みを試行します。 制限のないチャネルでこれを使用すると、チャネルのライターが ChannelWriter<T>.Complete または ChannelWriter<T>.TryComplete のいずれかで完了を通知しない限り、常に true が返されます。
ChannelWriter<T>.WaitToWriteAsync 項目を書き込むための空き領域が利用可能になると完了する ValueTask<TResult> を返します。
ChannelWriter<T>.WriteAsync 項目をチャネルに非同期的に書き込みます。

コンシューマー API

コンシューマー機能は、Channel<TWrite,TRead>.Reader で公開されています。 コンシューマー API と予期される動作の詳細を次の表示示します。

API 正しい動作
ChannelReader<T>.ReadAllAsync チャネルからのすべてのデータを読み取るための IAsyncEnumerable<T> を作成します。
ChannelReader<T>.ReadAsync チャネルから非同期的に項目を読み取ります。
ChannelReader<T>.TryPeek チャネルから項目をプレビューしようと試みます。
ChannelReader<T>.TryRead チャネルから項目を読み取ろうと試みます。
ChannelReader<T>.WaitToReadAsync データの読み取りを実行できるようになると完了する ValueTask<TResult> を返します。

一般的な使用パターン

チャネルには、いくつかの使用パターンがあります。 API は、シンプルで一貫性があり、最大限の柔軟性が得られるように設計されています。 すべての非同期メソッドは、ValueTask (または ValueTask<bool>) を返します。これは、操作が同期的に完了し、かつ非同期的にも完了する可能性がある場合、割り当てを回避できる軽量の非同期操作を表します。 さらに、この API は、チャネルの作成者が使用目的について約束するという点で、構成可能になるように設計されています。 特定のパラメーターを使用してチャネルを作成すると、内部の実装は、これらの約束を認識してより効率的に動作できます。

作成パターン

全地球測位システム (GPS) のプロデューサー/コンシューマー ソリューションを作成しているとしましょう。 時間と共にデバイスの座標を追跡する必要があります。 サンプルの座標オブジェクトは次のようになります。

/// <summary>
/// A representation of a device's coordinates, 
/// which includes latitude and longitude.
/// </summary>
/// <param name="DeviceId">A unique device identifier.</param>
/// <param name="Latitude">The latitude of the device.</param>
/// <param name="Longitude">The longitude of the device.</param>
public readonly record struct Coordinates(
    Guid DeviceId,
    double Latitude,
    double Longitude);

制限なしの作成パターン

一般的な使用パターンの 1 つは、既定の制限のないチャネルを作成することです。

var channel = Channel.CreateUnbounded<Coordinates>();

しかし、代わりに、複数のプロデューサーおよびコンシューマーを含む制限のないチャネルを作成するとしましょう。

var channel = Channel.CreateUnbounded<Coordinates>(
    new UnboundedChannelOptions
    {
        SingleWriter = false,
        SingleReader = false,
        AllowSynchronousContinuations = true
    });

この場合、WriteAsync であっても、すべての書き込みは同期されます。 これは、制限のないチャネルには、効果的にすぐに書き込むための空き容量が常にあるためです。 ただし、AllowSynchronousContinuationstrue に設定すると、書き込みは、継続を実行することによって、リーダーに関連付けられた作業を行うことになる場合があります。 これは、操作の同期に影響しません。

制限付きの作成パターン

制限付きチャネルを使用する場合、適切な消費を確保するために、チャネルの構成可能性をコンシューマーに知らせる必要があります。 つまり、コンシューマーは、構成された境界に達したときにチャネルがどのような動作を示すかを認識している必要があります。 制限付きの一般的な作成パターンをいくつか調べてみましょう。

境界付きチャネルを作成する最も簡単な方法は、容量を指定することです。

var channel = Channel.CreateBounded<Coordinates>(1);

上記のコードでは、最大容量が 1 の制限付きチャネルを作成しています。 その他のオプションを使用できます。一部のオプションは、制限なしのチャネルと同じですが、それ以外は制限付きチャネルに固有です。

var channel = Channel.CreateBounded<Coordinates>(
    new BoundedChannelOptions(1_000)
    {
        SingleWriter = true,
        SingleReader = false,
        AllowSynchronousContinuations = false,
        FullMode = BoundedChannelFullMode.DropWrite
    });

上記のコードでは、1,000 項目に制限され、ライターは 1 人、リーダーは複数の制限付きチャネルとしてチャネルを作成しています。 フル モード動作は DropWrite として定義されています。これは、チャネルがいっぱいの場合に、書き込まれる項目は削除されることを意味します。

制限付きチャネルを使用する場合、削除される項目を監視するには、itemDropped コールバックを登録します。

var channel = Channel.CreateBounded(
    new BoundedChannelOptions(10)
    {
        AllowSynchronousContinuations = true,
        FullMode = BoundedChannelFullMode.DropOldest
    },
    static void (Coordinates dropped) =>
        Console.WriteLine($"Coordinates dropped: {dropped}"));

チャネルがいっぱいの場合に、新しい項目を追加するたびに、itemDropped コールバックが呼び出されます。 この例では、指定されたコールバッによって項目がコンソールに書き込まれますが、必要な他のアクションを自由に実行できます。

プロデューサー パターン

このシナリオのプロデューサーが新しい座標をチャネルに書き込んでいるとしましょう。 プロデューサーは、TryWrite を呼び出してこれを行うことができます。

static void ProduceWithWhileAndTryWrite(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }
    }
}

上記のプロデューサーのコード:

  • 最初の Coordinates と共に、Channel<Coordinates>.Writer (ChannelWriter<Coordinates>) を引数として受け入れます。
  • TryWrite を使用して座標の移動を試みる条件付き while ループを定義します。

別のプロデューサーが WriteAsync メソッドを使用する可能性があります。

static async ValueTask ProduceWithWhileWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 })
    {
        await writer.WriteAsync(
            item: coordinates = coordinates with
            {
                Latitude = coordinates.Latitude + .5,
                Longitude = coordinates.Longitude + 1
            });
    }

    writer.Complete();
}

この場合も、while ループ内で Channel<Coordinates>.Writer が使用されます。 しかし、今回は、WriteAsync メソッドが呼び出されます。 このメソッドは、座標が書き込まれた後にのみ続行されます。 while ループが終了すると、Complete が呼び出され、チャネルに書き込まれるデータはこれ以上ないことが通知されます。

別のプロデューサー パターンは、WaitToWriteAsync メソッドを使用することです。次のコードについて検討してみましょう。

static async ValueTask ProduceWithWaitToWriteAsync(
    ChannelWriter<Coordinates> writer, Coordinates coordinates)
{
    while (coordinates is { Latitude: < 90, Longitude: < 180 } &&
        await writer.WaitToWriteAsync())
    {
        var tempCoordinates = coordinates with
        {
            Latitude = coordinates.Latitude + .5,
            Longitude = coordinates.Longitude + 1
        };

        if (writer.TryWrite(item: tempCoordinates))
        {
            coordinates = tempCoordinates;
        }

        await Task.Delay(TimeSpan.FromMilliseconds(10));
    }

    writer.Complete();
}

条件付き while の一部として、WaitToWriteAsync 呼び出しの結果を使用して、ループを続行するかどうかを判断します。

コンシューマー パターン

一般的なチャネル コンシューマー パターンがいくつかあります。 チャネルが終了することがない場合、つまり永久にデータが生成される場合、コンシューマーは while (true) ループを使用し、データが使用可能になった時点で読み取ることができます。

static async ValueTask ConsumeWithWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (true)
    {
        // May throw ChannelClosedException if
        // the parent channel's writer signals complete.
        Coordinates coordinates = await reader.ReadAsync();
        Console.WriteLine(coordinates);
    }
}

注意

チャネルが閉じている場合、このコードでは例外がスローされます。

別のコンシューマーは、次のコードに示すように、入れ子になった while ループを使用して、この問題を回避する場合があります。

static async ValueTask ConsumeWithNestedWhileAsync(
    ChannelReader<Coordinates> reader)
{
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out Coordinates coordinates))
        {
            Console.WriteLine(coordinates);
        }
    }
}

上記のコードでは、コンシューマーはデータの読み取りを待ちます。 データが使用できるようになると、コンシューマーはその読み取りを試みます。 これらのループは、読み取るデータがなくなったことをチャネルのプロデューサーが通知するまで、評価を続行します。 そうは言っても、プロデューサーは生成する項目の数が限られていることがわかっていて、完了を通知すると、コンシューマーは await foreach セマンティクスを使用して項目を反復処理できます。

static async ValueTask ConsumeWithAwaitForeachAsync(
    ChannelReader<Coordinates> reader)
{
    await foreach (Coordinates coordinates in reader.ReadAllAsync())
    {
        Console.WriteLine(coordinates);
    }
}

上記のコードでは、ReadAllAsync メソッドを使用して、チャネルからすべての座標を読み取っています。

関連項目