次の方法で共有


Orleans ストリーミングのクイック スタート

このガイドでは、 Orleans ストリームを設定して使用する簡単な方法について説明します。 ストリーミング機能の詳細については、このドキュメントの他の部分を参照してください。

必要な構成

このガイドでは、グレイン メッセージングを使用してストリーム データをサブスクライバーに送信するメモリベースのストリームを使用します。 インメモリ ストレージ プロバイダーを使用して、サブスクリプションの一覧を格納します。 ストリーミングとストレージにメモリベースのメカニズムを使用することは、運用環境ではなく、ローカルの開発とテストのみを目的としています。

siloISiloBuilderであるサイロで、AddMemoryStreamsを呼び出します。

silo.AddMemoryStreams("StreamProvider")
    .AddMemoryGrainStorage("PubSubStore");

clientIClientBuilderであるクラスター クライアントで、AddMemoryStreamsを呼び出します。

client.AddMemoryStreams("StreamProvider");

このガイドでは、グレイン メッセージングを使用してストリーム データをサブスクライバーに送信する単純なメッセージ ベースのストリームを使用します。 インメモリ ストレージ プロバイダーを使用して、サブスクリプションの一覧を格納します。これは、実際の運用アプリケーションにとって賢明な選択ではありません。

hostBuilderISiloHostBuilderであるサイロで、AddSimpleMessageStreamProviderを呼び出します。

hostBuilder.AddSimpleMessageStreamProvider("SMSProvider")
           .AddMemoryGrainStorage("PubSubStore");

clientBuilderIClientBuilderであるクラスター クライアントで、AddSimpleMessageStreamProviderを呼び出します。

clientBuilder.AddSimpleMessageStreamProvider("SMSProvider");

既定では、Simple Message Stream 経由で渡されるメッセージは不変と見なされ、他のグレインへの参照によって渡される可能性があります。 この動作を無効にするには、SMS プロバイダーを構成して SimpleMessageStreamProviderOptions.OptimizeForImmutableDataをオフにします。

siloBuilder
    .AddSimpleMessageStreamProvider(
        "SMSProvider",
        options => options.OptimizeForImmutableData = false);

ストリームを作成し、プロデューサーとして使用してデータを送信し、サブスクライバーとしてデータを受信できます。

イベントを生成する

ストリームのイベントを生成するのは比較的簡単です。 まず、前に構成で定義されているストリーム プロバイダー ("StreamProvider") にアクセスし、ストリームを選択してデータをプッシュします。

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("StreamProvider");
// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

ストリームのイベントを生成するのは比較的簡単です。 まず、前に構成で定義されているストリーム プロバイダー ("SMSProvider") にアクセスし、ストリームを選択してデータをプッシュします。

// Pick a GUID for a chat room grain and chat room stream
var guid = new Guid("some guid identifying the chat room");
// Get one of the providers which we defined in our config
var streamProvider = GetStreamProvider("SMSProvider");
// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

ご覧のように、ストリームには GUID と名前空間があります。 これにより、一意のストリームを簡単に識別できます。 たとえば、チャットルームの名前空間は「Rooms」で、GUIDは所有者RoomGrainのGUIDになることができます。

ここでは、既知のチャット ルームの GUID を使用します。 ストリームの OnNextAsync メソッドを使用して、データをストリームにプッシュします。 乱数を使用してタイマー内でこれを行いましょう。 ストリームには他のデータ型を使用することもできます。

RegisterTimer(_ =>
{
    return stream.OnNextAsync(Random.Shared.Next());
},
null,
TimeSpan.FromMilliseconds(1_000),
TimeSpan.FromMilliseconds(1_000));

ストリーミングデータを購読して受信する

データを受信する場合は、暗黙的サブスクリプションと明示的サブスクリプションを使用できます。詳細については、「 明示的サブスクリプションと暗黙的サブスクリプション」を参照してください。 この例では、より簡単な暗黙的なサブスクリプションを使用します。 グレイン型は、ストリームを暗黙的にサブスクライブする場合、属性 [ImplicitStreamSubscription(namespace)]を使用します。

ここでは、次のような ReceiverGrain を定義します。

[ImplicitStreamSubscription("RANDOMDATA")]
public class ReceiverGrain : Grain, IRandomReceiver

データが (タイマーの例のように) RANDOMDATA名前空間のストリームにプッシュされるたびに、ストリームと同じReceiverGrainを持つGuid型のグレインがメッセージを受信します。 現在、グレインのアクティブ化が存在しない場合でも、ランタイムは自動的に新しいグレインを作成し、メッセージを送信します。

これを機能させるには、データを受信するための OnNextAsync 方法を設定して、サブスクリプション プロセスを完了します。 これを行うには、 ReceiverGrainOnActivateAsyncで次のようなものを呼び出す必要があります。

// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("StreamProvider");

// Get the reference to a stream
var streamId = StreamId.Create("RANDOMDATA", guid);
var stream = streamProvider.GetStream<int>(streamId);

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });
// Create a GUID based on our GUID as a grain
var guid = this.GetPrimaryKey();

// Get one of the providers which we defined in config
var streamProvider = GetStreamProvider("SMSProvider");

// Get the reference to a stream
var stream = streamProvider.GetStream<int>(guid, "RANDOMDATA");

// Set our OnNext method to the lambda which simply prints the data.
// This doesn't make new subscriptions, because we are using implicit
// subscriptions via [ImplicitStreamSubscription].
await stream.SubscribeAsync<int>(
    async (data, token) =>
    {
        Console.WriteLine(data);
        await Task.CompletedTask;
    });

すべての設定が完了しました。 唯一の要件は、何かがプロデューサーグレインを生成するトリガーとなることです。 次に、タイマーを登録し、すべての関係者にランダムな整数の送信を開始します。

ここでも、このガイドでは多くの詳細をスキップし、概要のみを説明します。 このマニュアルの他の部分と Rx の他のリソースを読んで、利用可能な内容とそのしくみを十分に理解してください。

事後対応型プログラミングは、多くの問題を解決するための強力なアプローチです。 たとえば、サブスクライバーで LINQ を使用して数値をフィルター処理し、さまざまな興味深い操作を実行できます。

こちらも参照ください

Orleans ストリームプログラミング API