次の方法で共有


非同期エージェント

非同期エージェント ライブラリによるアクターベースのプログラミング

Michael Chu

サーバーからデスクトップやノート PC に至るまで、市場ではマルチコア プロセッサが当たり前のようになってきたため、コードの並列実行がかつてないほど重要になっています。この重要な分野に対処するため、Visual Studio 2010 では新しい並列ランタイムと新しい並列プログラミング モデルを用意し、C++ 開発者が利用できる新しい手段をいくつか導入しました。とはいえ、開発者には大きな課題が 1 つ残されています。それは、アプリケーションに適したプログラミング モデルを決めるという課題です。適切なモデルを採用すれば、そのモデルの基盤となる並列処理を最大限に活かすことができますが、プログラムの構造や実際の動作を考え直す必要が生じることもこともあります。

並列プログラミング モデルの中で現在幅広く普及しているモデルは、汎用の同時実行コンテナーと、ループ反復処理の並列化などのアルゴリズムを使用するものです。このような従来の手法は、マルチコア コンピューターを活用するようにアプリケーションをスケール変換するには効果的ですが、並列処理のパフォーマンスに影響を与える大きな要因の 1 つである、待機時間の影響の拡大には対処していません。並列化の技法によってコンピューター処理の速度が上昇し、このようなコンピューター処理が複数のコアに分散されるようになるにつれて、パフォーマンスの向上が実行速度の最も遅い部分の制約を受けることが、アムダールの法則 (http://ja.wikipedia.org/wiki/%E3%82%A2%E3%83%A0%E3%83%80%E3%83%BC%E3%83%AB%E3%81%AE%E6%B3%95%E5%89%87) によって示されます。多くの場合、ディスクやネットワークなどの I/O からデータを待機するのに費やされる時間の割合が増加します。

アクターベース プログラミング モデルは、こうした待機時間の問題を実に巧みに扱います。このモデルは、数百または数千の独立したプロセッサを搭載している高度な並列コンピューターのリソースを活用するために、1970 年代初頭に初めて導入されました。アクター モデルを支える基本的な考え方は、アプリケーションのコンポーネントをそれぞれ個別のアクターとして扱い、各アクターがメッセージの送信、受信、および処理を行うことによって、外部とやり取りできるようにすることです。

近年、マルチコア プロセッサが数多く使用されるようになったことから、並列処理を効率的に実行するために待機時間の影響を小さくする効果的な手法として、このアクター モデルが再浮上しています。Visual Studio 2010 では、Asynchronous Agents Library (AAL: 非同期エージェント ライブラリ) という、メッセージ受け渡しインターフェイスを備えた魅力的なアクターベース モデルを新たに導入します。このライブラリでは、エージェントがアクターの役割を果たします。AAL を使用すると、開発者は、データの流れを中心にしてアプリケーションを設計できます。データの流れを中心に設計すると、通常、データの待機時間を有効活用できます。

今回の記事では、AAL の概要を説明し、アプリケーションで AAL を利用する方法を例を挙げて示します。

同時実行ランタイム

Visual Studio 2010 と AAL における同時実行サポートの基盤となるのが新しい Concurrency Runtime (同時実行ランタイム) です。このランタイムは、Visual Studio 2010 の C ランタイム (CRT) に含まれています。同時実行ランタイムは、協調タスク スケジューラと、コンピューターの基盤となるリソースを詳細に把握するリソース マネージャーを提供します。そのため、マルチコア コンピューター全体に負荷分散する形式で複数のタスクを実行できます。

図 1 は、ネイティブ コードの同時実行に関する Visual Studio 2010 のサポートの概要を示しています。スケジューラは、タスクを実行する場所とタイミングを決定するメイン コンポーネントです。このコンポーネントは、リソース マネージャーが収集した情報を利用して、実行リソースを最適に活用します。アプリケーションとライブラリ自体は、主に AAL と Parallel Patterns Library (PPL: 並列パターン ライブラリ) という、スケジューラの上位に位置する 2 つのプログラミング モデルを通じて、同時実行ランタイムとやり取りしますが、ランタイム自体と直接やり取りすることもできます。

image: The Concurrency Runtime

図 1 同時実行ランタイム

PPL は、parallel_for コンストラクトと parallel_for_each コンストラクト、ランタイム対応ロック、同時実行のデータ構造 (キューやベクター) など、従来の並列化手法に比較的近いものを提供します。この記事のテーマではありませんが、PPL は AAL に導入されたすべての新しい手法と組み合わせて使用できる、強力な開発者向けツールです。PPL の詳細については、2009 年 2 月号の「Windows と C++」コラム (msdn.microsoft.com/magazine/dd434652) を参照してください。

一方、AAL は、従来の手法よりも高いレベルで、異なる観点からアプリケーションを並列化する機能を提供します。開発者は、処理対象のデータの観点からアプリケーションを捉える必要があり、データの処理を、並列に実行可能なコンポーネントやステージに分割する方法を検討する必要があります。

AAL には、メッセージ受け渡しフレームワークと非同期エージェントという、2 つのメイン コンポーネントがあります。

メッセージ受け渡しフレームワークには一連のメッセージ ブロックが用意されており、メッセージの受信、処理、および伝達を実行できます。複数のメッセージ ブロックを連鎖させることで、同時に実行できる作業のパイプラインを作成できます。

非同期エージェントは、メッセージを受信し、自身が管理する状態に基づいてローカル作業を実行し、メッセージを送信することで外部とやり取りするアクターです。

開発者は、これら 2 つのコンポーネントを使用して、並列処理を制御の流れではなく、データの流れの観点から考えることができ、並列処理のリソースをより効率的に利用して長い待機時間にも適切に対処できるようになります。

メッセージ受け渡しフレームワーク

AAL の 1 つ目の重要なコンポーネントがメッセージ受け渡しフレームワークです。これは、作業をパイプライン化するために、データフローのネットワークを開発するのに役立つコンストラクトのセットです。作業をパイプライン化することは、データフロー モデルに不可欠な要素です。パイプライン化することにより、作業が独立した複数のステージに分割され、データの用意ができればいつでも、ストリーミング データを並列処理できるようになります。あるステージのデータ処理が完了すると、そのステージから次のステージにデータを渡せるようになり、同時に最初のステージは処理対象の新しいデータを待機します。

たとえば、送信メッセージの書式を設定し、メッセージに不適切な内容が含まれているかどうか検閲する電子メール アプリケーションを考えてみましょう。このような処理のコードは次のようになります。

std::foreach(reader.begin(); reader.end(); 
  [](const string& word) { 
    auto w1 = censor(word); 
    auto w2 = format(w1); 
    writer.write_word(w2);
  });

電子メールの単語ごとに、アプリケーションは検閲対象単語の辞書にその単語が存在するかどうか確認し、存在していればその単語を置き換えます。続いて、一連のガイドラインに従って各単語の書式を設定します。

このようなシナリオには、本質的に並列可能な処理がたくさん含まれています。しかし、従来の並列処理手法ではこれに十分対応することができません。たとえば、簡単な方法として、検閲するテキストの文字列に parallel_for_each アルゴリズムを使用してから、文字列を書式設定するとします。

このソリューションの 1 つ目の主な課題は、反復子が作業を適切に分割できるように、ファイル全体を読み取る必要があることです。
ファイル全体を強制的に読み取ると、処理が I/O の制約を受け、並列化のメリットが少なくなる可能性があります。もちろん、単語の処理と入力の読み取りをオーバーラップさせるような、高度な反復子を使用することも考えられます。

従来の並列化手法の 2 つ目の主な課題は順序です。当然ながら、電子メール メッセージの場合、テキストの並列処理ではテキストの順序を維持しなければなりません。さもないと、支離滅裂なメッセージになってしまいます。parallel_for_each の手法を採用すると、テキストの順序を維持するために、同期とバッファーに関して大きなオーバーヘッドが発生します。AAL ではこのようなオーバーヘッドに自動的に対処されます。

メッセージをパイプラインで処理すると、並列化のメリットを活かしながら、この 2 つの課題を回避できます。では、単純なパイプラインを作成する例を考えてみましょう (図 2 参照)。この例では、アプリケーションの主要タスク (検閲と書式設定) は、2 つのステージとして分割されます。第 1 ステージでは、文字列を受け取り、検閲対象単語の辞書と照合します。検閲のブロックでは、単語が辞書にあれば、文字列を辞書に記載されている別の単語に置き換えます。辞書になければ、入力されたメッセージをそのまま出力します。同様に、第 2 ステージの書式設定ブロックでは、各単語を受け取りし、一定のスタイルに従って適切に書式設定します。

image: E-mail Processing Pipeline

図 2 電子メールを処理するパイプライン

この例では、データフロー手法からさまざまなメリットを得ることができます。まず、処理前にメッセージ全体を読み取る必要がなくなるため、メッセージに含まれる文字列の検閲ステージと書式設定ステージへのストリーミングをすぐに開始できます。次に、パイプライン処理により、ある文字列を書式設定ブロックで処理しながら、同時に次の文字列を検閲ブロックで処理することができます。最後に、元のテキストの順序で文字列を処理するため、追加で同期を実行する必要がありません。

メッセージ ブロック

メッセージ ブロックは、メッセージの受信、処理、格納、および伝達を行います。メッセージ ブロックは、ソース、ターゲット、および伝達子の 3 つの形式のいずれかになります。ソースにはメッセージを伝達する機能しかありませんが、ターゲットにはメッセージを受信、格納、および処理する機能があります。大部分のブロックは伝達子で、ソースとターゲットの両方の機能を備えています。つまり、伝達子には、メッセージの受信、格納、および処理を行う機能だけでなく、これらのメッセージの転送や送信の機能もあります。

AAL には、開発者が遭遇するユース ケースの大部分に対応する、一連のメッセージ ブロック プリミティブが用意されています。図 3 に、AAL に用意されているすべてのメッセージ ブロックの概要を示します。ただし、モデルは固定的ではないので、アプリケーションで特定の動作を備えたメッセージ ブロックが必要であれば、すべての定義済みブロックとやり取りできるカスタム ブロックを作成できます。各ブロックには、メッセージの処理、格納、および伝達に関して、それぞれ独特の特性があります。

図 3 AAL のメッセージ ブロック

メッセージ ブロック 用途
unbounded_buffer<Type> 数の制限なくメッセージを格納し、格納したメッセージをターゲットに伝達します。
overwrite_buffer<Type> 1 つのメッセージを格納し、新しいメッセージが伝達されるたびにこのメッセージを上書きします。このメッセージをターゲットにブロードキャストします。
single_assignment<Type> 1 つのメッセージを格納します。メッセージを書き込めるのは 1 回だけです。このメッセージをターゲットにブロードキャストします。
transformer<Input,Output> Input 型のメッセージを受け取り、ユーザーが指定した関数を実行してメッセージを Output 型に変換します。この変換したメッセージを、ターゲットに伝達します。
call<Type> メッセージを受け取り、そのメッセージのペイロードを引数としてユーザーが指定した関数を実行します。これが純粋なメッセージ ターゲットです。
timer<Type> ユーザーが定義した時間が経過すると、ターゲットにメッセージを伝達します。この処理は、繰り返して実行することも、一度だけ実行することもできます。このブロックが純粋なメッセージ ソースです。
choice<Type1,Type2,...> 複数の型の複数のソースからメッセージを受け取り、choice ブロックに伝達された最初のブロックからのメッセージだけを受け入れます。
join<Type> 複数のソースからメッセージを受け取り、受け取ったメッセージをまとめて 1 つのメッセージを出力します。各ソースの入力から、メッセージの準備が整うのを非同期に待機します。
multitype_join<Type1,Type2,...> 複数の型の複数のソースからメッセージを受け取り、受け取ったメッセージをまとめます。各ソースの入力から、メッセージの準備が整うのを非同期に待機します。

AAL に用意されているメッセージ ブロック プリミティブの主なメリットの 1 つは、これらのプリミティブが構成可能なことです。したがって、目的の動作に基づいてプリミティブを組み合わせることができます。たとえば、join ブロックの最後に transformer ブロックを接続すると、複数の入力をまとめるブロックを簡単に作成できます。join ブロックは、各ソースからのメッセージの取得に成功すると、メッセージを transformer ブロックに渡すことができ、transformer ブロックがメッセージ ペイロードを集計します。

また、join ブロックのソースとして繰り返し timer ブロックを接続することもできます。このように接続すると、メッセージを絞込み、timer ブロックからメッセージが送信されたときにだけメッセージを伝達するブロックを作成できます。この 2 つの構成可能なブロックを、図 4 に示します。

image: Composing Adder and Message Throttling Blocks from Primitives

図 4 プリミティブから組み立てた Adder ブロックと Message Throttling ブロック

メッセージ受け渡しパイプラインを作成する

では、先ほど説明したメッセージ ブロックのパイプラインを作成するコードについて見てみることにしましょう。パイプラインは、2 つの transformer ブロックに置き換えることができます (図 5 参照)。transformer ブロックの目的は、ある型のメッセージを受け取って、ユーザー定義関数をそのメッセージに対して実行することです。ユーザー定義関数では、メッセージのペイロードを変更したり、メッセージの型を完全に変更したりすることができます。たとえば、検閲ブロックでは文字列を含むメッセージを入力として受け取り、このメッセージを処理する必要があります。

image: A Message Block Pipeline

図 5 メッセージ ブロックのパイプライン

メッセージ ブロックを作成してメッセージ ブロックどうしを接続するコードを図 6 に示します。このコードでは、まず transformer メッセージ ブロックのインスタンスを 2 つ作成します。検閲 (censor) ブロックのコンストラクターの C++0x ラムダ パラメーターは、変換関数を定義しています。この関数は、メッセージに含まれている入力文字列を辞書と照合し、別の文字列に変更する必要があるかどうか確認します。結果の文字列が返されると、検閲ブロック内でメッセージにラップされ、ブロックの外部に伝達されます。書式設定 (format) の transformer ブロックでも同様の処理が行われますが、出力されるのは書式設定関数で変更された文字列です。

図 6 単純なメッセージ パイプライン

dictionary dict;

transformer<string, string> 
  censor([&dict](const string& s) -> string {

  string result = s;
  auto iter = dict.find(s);

  if (iter != dict.end()) {
    result =  iter->second;
  }

  return result;
});

transformer<string, string> 
  format([](const string& s) -> string {

  string result = s;
  for (string::size_type i = 0; i < s.size(); i++) {
    result[i] = (char)Format(s[i]);
  }

  return result;
});

censor.link_target(&format);

asend(&censor, "foo");
string newStr = receive(format);
printf("%s\n", newStr);

2 つのブロックのインスタンスを作成したら、次の行では censor ブロックの link_target メソッドを呼び出して、2 つのブロックを相互にリンクします。すべてのソース ブロックと伝達子ブロックには link_target メソッドがあり、ソースのメッセージの伝達先となるメッセージ ブロックを決定するのに使用します。

censor ブロックと format ブロックを相互にリンクすると、censor ブロックに伝達されたすべてのメッセージが変換関数で処理され、返されたメッセージが暗黙のうちに format ブロックに渡されて処理されます。メッセージ ブロックがソースまたは伝達子で、ターゲットが接続されていないと、ターゲットをリンクするかメッセージを取得するまで、メッセージをブロック固有の方法で格納できます。

コード例の最後の 3 行は、ブロック内にメッセージを送信して、ブロックの外部からメッセージを取得する処理を示しています。AAL には、send と asend という 2 つのメッセージ送信 API があります。これらの API では、それぞれ同期方式と非同期方式でメッセージをブロックに入力します。

2 つの API の主な違いは、send メソッドの場合、呼び出しから戻れば必ず、メッセージが送信先ブロックに送信済みになっていることです。asend メソッドの呼び出しはすぐに復帰し、同時実行ランタイムが伝達のスケジュールを設定できます。同様に、AAL には receive と try_receive という 2 つのメッセージ取得 API があります。receive メソッドはメッセージを受信するまでブロックされますが、try_receive メソッドはメッセージを取得できなければすぐに復帰します。

図 6 では、"foo" という文字列を censor ブロックに非同期に送信します。censor ブロックではこのメッセージを受け取り、検閲対象単語の辞書にメッセージ文字列が含まれているかどうか確認し、処理済の文字列をメッセージに含めて伝達します。このメッセージは format ブロックに渡され、format ブロックでは文字列を受け取って各文字を大文字にします。また、ターゲットが接続されていないため、メッセージを保持します。receive メソッドが呼び出されるときに、format ブロックからのメッセージを把握します。したがって、"foo" が辞書に含まれていないとすると、この例の出力は "FOO" になります。この例では 1 つの文字列をネットワーク経由で送信しているだけですが、入力文字列のストリームで処理の実行のパイプラインを作成する方法がわかります。

このメッセージングの例では、明らかにメッセージ自体については参照していないことがわかります。メッセージは、データフロー ネットワーク上で渡すデータをラップするためのエンベロープにすぎません。メッセージの受け渡し自体は、メッセージの提供と受け入れの処理によって行われます。メッセージ ブロックは、メッセージを受信すると、そのメッセージを任意の方法で格納できます。後でメッセージを送信する場合は、接続している各ターゲットにメッセージを提供できます。メッセージを実際に受け取るには、提供されたメッセージを受信側ブロックで受け入れて、トランザクションを完了する必要があります。ブロック間のメッセージ受け渡し処理全体は、タスクによってスケジュールが設定され、処理されます。このタスクは、同時実行ランタイムによってスケジュールが設定され、実行されます。

メッセージ ブロックの伝達

これで、メッセージ ブロックを作成して相互にリンクする方法と、各メッセージ ブロックでメッセージを送受信する方法がわかりました。では、次にブロック間でメッセージを受け渡す方法と、同時実行ランタイムが AAL の中核と如何に密接に結び付いているかを簡単に説明しましょう。

これは、メッセージ ブロックや AAL を使用する際に必須の知識ではありませんが、メッセージ受け渡しプロトコルのしくみと使用方法について理解を深めることができます。ここからは伝達子ブロックについて説明します。これは、伝達子ブロックがソースとターゲットを兼ねているためです。当然、純粋なソース ブロックやターゲット ブロックは伝達子ブロックの実装に含まれます。

各伝達子ブロックの内部には、メッセージの入力キューと、ブロック固有のメッセージ用ストレージ コンテナーがあります。この伝達子ブロックにリンクしている他のブロックからメッセージが送信され、このブロックの入力キューに格納されます。

たとえば、図 7 では、censor の transformer ブロックに、現在 str6 という文字列が格納されている入力キューがあります。transformer ブロック自体には、str4 と str5 という 2 つのメッセージが格納されています。このブロックは transformer なので、ブロック固有のストレージもキューになっています。ストレージ コンテナーは、ブロックの型によって異なることがあります。たとえば、overwrite_buffer ブロックではメッセージを 1 つだけ格納し、常に、このメッセージを上書きします。

image: Message-Passing Protocol

図 7 メッセージ受け渡しプロトコル

リンクしているいずれかのソース ブロック (または send API または asend API) からブロックにメッセージが提供されると、そのブロックではまずフィルター処理関数を確認して、メッセージを受け入れるかどうか判断します。メッセージを受け入れると決めたら、そのメッセージを入力キューに格納します。フィルター処理関数は、ターゲット ブロックや伝達子ブロックそれぞれのコンストラクターに渡せるオプションの関数で、ソース ブロックから提供されたメッセージを受け入れるかどうか決定するブール値を返します。メッセージを拒否すると、ソース ブロックから次のターゲット ブロックにメッセージが引き続き提供されます。

メッセージを入力キューに格納したら、メッセージを送信したソース ブロックではそのメッセージが保持されなくなります。ただし、受け入れ側ブロックではまだメッセージを伝達する準備が整っていません。そのため、処理を待機しているメッセージを入力キューにバッファリングできます。

メッセージ ブロックの入力キューにメッセージを格納すると、同時実行ランタイムのスケジューラ内で軽量タスク (LWT: LightWeight Task) のスケジュールが設定されます。この LWT の目的は 2 つあります。まず、メッセージを入力キューからブロックの内部ストレージに移動する必要があります (これをメッセージ処理と呼びます)。次に、任意のターゲット ブロックへのメッセージの伝達を試行することも必要です (これをメッセージ伝達と呼びます)。

たとえば、図 7 の場合、メッセージが入力キューに格納されており、このキューから LWT に対して、スケジュールを設定するよう通知しています。続いて、LWT がメッセージを処理します。この処理では、まず transformer のユーザー指定関数をメッセージに対して実行して、検閲対象文字列の辞書を確認し、次にメッセージをブロックのストレージ バッファーに移動します。

メッセージをストレージ バッファーに転送したら、LWT は伝達ステージを開始して、メッセージをターゲットの format ブロックに送信します。この例では、transformer ブロックの先頭に str4 というメッセージが格納されていたため、このメッセージを format ブロックに伝達してから次のメッセージ (str5) を伝達します。format ブロックでも、実行する手順はまったく同じです。

メッセージの処理は、メッセージ ブロックの型によって異なることがあります。たとえば、unbounded_buffer ブロックには、メッセージをストレージ バッファーに移動するだけの手順があります。transformer ブロックでは、メッセージに対してユーザー定義関数を呼び出して、メッセージを処理してからストレージ バッファーに移動します。他のブロックではさらに複雑な手順を実行することもあります。たとえば、join ブロックでは、さまざまなソースの複数のメッセージをまとめ、伝達に備えてバッファーに格納します。

パフォーマンスを向上するために、AAL では、メッセージ ブロックごとに LWT のスケジュールが一度に 1 つずつ設定されるよう適切に LWT を作成します。LWT の処理中に新しいメッセージを受信すると、AAL は受信したメッセージを続けて取得して処理します。したがって、図 7 では、str7 メッセージが入力キューに格納されたときに transformer ブロックの LWT がまだ処理中であれば、処理と伝達のタスクを新しく開始せずに、このメッセージを取得して処理します。

処理と伝達に対処する固有の LWT を各メッセージ ブロックに用意することは、設計上重要な考え方です。メッセージ受け渡しフレームワークがデータフロー方式で作業をパイプライン化できるのはこのためです。各メッセージ ブロックが独自の LWT でメッセージを処理して伝達するため、AAL ではブロックどうしを切り離すことができ、複数のブロック間で並列処理を実行できます。各 LWT はターゲット ブロックの入力キューにメッセージを伝達すればよく、各ターゲットではブロック独自の入力を処理する LWT のスケジュールを設定するだけです。処理と伝達に 1 つの LWT を使用することで、メッセージの順序がメッセージ ブロックで維持されます。

非同期エージェント

AAL の 2 つ目の主要コンポーネントは、非同期エージェントです。非同期エージェントは、大まかには、比較的大きなコンピューター処理タスクや I/O を非同期に扱うことを目的としたアプリケーション コンポーネントです。エージェントは他のエージェントと通信して、低レベルの並列処理を開始するものと考えられます。エージェントは、関連処理がそのクラス内ですべて完結するため独立しており、メッセージ受け渡しを使用することによって、他のアプリケーション コンポーネントと通信できます。エージェント自体のスケジュールは、同時実行ランタイム内でタスクとして設定されます。そのため、エージェントは、同時に実行されている他の処理と協調しながら、他の処理をブロックしたり、処理を明け渡したりすることができます。

非同期エージェントには、一連のライフサイクルがあります (図 8 参照)。このライフサイクルは、監視や待機が可能です。 開発者は、基本エージェント クラスから派生することで独自のエージェントを作成できます。

image: The Asynchronous Agent Lifecycle

図 8 非同期エージェントのライフサイクル

3 つの基本クラス関数 (start、cancel、および done) によって、エージェントはさまざまな状態に遷移します。エージェントを作成した時点では、エージェントは created 状態です。エージェントを開始することは、スレッドを開始することに似ています。エージェントの start メソッドを呼び出すまで、そのエージェントでは何も実行されません。start メソッドを呼び出すと、エージェントの実行スケジュールが設定され、runnable 状態に移行します。

同時実行ランタイムがそのエージェントを選択すると、エージェントは started 状態に移行し、ユーザーが done メソッドを呼び出して処理が完了したことを指定するまで実行を続けます。エージェントのスケジュールが設定されていてもまだ開始されていないときに cancel メソッドを呼び出すと、エージェントは canceled 状態に遷移し、それ以降は実行されなくなります。

もう一度、電子メールをフィルター処理する例について考えてみましょう。この例では、パイプライン化されたメッセージ ブロックによって、アプリケーションにデータフローが導入され、単語の並列処理機能が強化されています。しかし、電子メール自体を処理してパイプラインで処理可能な文字列のストリームに分割する際の、I/O の処理方法は示していませんでした。また、パイプラインに文字列を渡したら、検閲と書式設定が完了した状態で、新たにテキストを再度書き込めるように、その文字列を集める必要もあります。このような場合、I/O に伴う待機時間の差を調整するためにエージェントが活躍します。

たとえば、電子メールを処理するパイプラインの最後を考えてみましょう。この時点では、format ブロックから出力された文字列を、メールボックス内のファイルに書き込む必要があります。図 9 は、出力エージェントで文字列をキャプチャして、出力する電子メール メッセージを作成する方法を示しています。WriterAgent の run 関数が、format ブロックからのメッセージをループして受け取ります。

image: An Agent Capturing the Output of the Format Block

図 9 format ブロックの出力をキャプチャするエージェント

このアプリケーションで実行するほとんどの処理ではデータフローを使用していますが、WriterAgent では、このアプリケーションに制御フローを導入する方法を示しています。たとえば、ファイルの終わりを示すメッセージを受信すると、WriterAgent は受信した入力文字列に応じて異なる動作を行う必要があります。つまり、操作の終了を認識する必要があります。WriterAgent のコードを図 10 に示します。

図 10 WriterAgent

class WriterAgent : public agent {
public:
  WriterAgent(ISource<string> * src) : m_source(src) {
  }

  ~WriterAgent() {
    agent::wait(this);
  }

  virtual void run() {
    FILE *stream;
    fopen_s( &stream, ... );

    string s;
    string eof("EOF");

    while (!feof(stream) && ((s=receive(m_source)) != eof)) {
      write_string(stream, s);
    }

    fclose(stream);
    done();
  }

private:

  ISource<string> * m_source;
};

このコードには、注意すべき興味深い箇所がいくつかあります。まず、デストラクター内で、agent::wait 静的関数を呼び出しています。この関数は、任意のエージェントへのポインターを指定して呼び出すことができ、エージェントがいずれかの終了状態 (done または canceled) になるまでそのエージェントをブロックします。デストラクター内で wait 関数を呼び出す必要がないエージェントもありますが、デストラクターの実行後にはコードがまったく実行されなくなるため、ほとんどの場合はこの関数を呼び出すことをお勧めします。

次に興味深い点は、run メソッドそのものです。run メソッドでは、エージェントの主な処理を定義します。このコードでは、エージェントはソース (今回の例では format ブロック) から読み取った文字列を書き込んでいます。

最後に、run メソッドの最後の行に注目してください。ここでは、done というエージェント関数を呼び出しています。done メソッドを呼び出すと、エージェントが実行中状態から done 状態に移行します。ほとんどの場合は、run メソッドの最後でこのメソッドを呼び出す必要があります。ただし、状況によっては、アプリケーションでエージェントを使用して、run メソッドの有効期間終了後も有効状態を設定することもあります (データフロー ネットワークの場合など)。

すべてをまとめる

これで、文字列をフィルター処理して書式設定するメッセージング パイプラインと、結果の文字列を処理する出力エージェントを作成したので、出力エージェントによく似た動作の入力エージェントも追加できます。図 11 は、このアプリケーションの組み合わせ方の例を示しています。

image: Agents Used to Process E-mail Messages

図 11 電子メール メッセージの処理に使用するエージェント

エージェント処理のメリットの 1 つは、アプリケーションで非同期アクターを使用できることです。したがって、処理するデータを受け取ると、入力エージェントではパイプライン経由での文字列の送信を非同期に開始し、出力エージェントでも同様に非同期に文字列を読み取ってファイルに出力できます。これらのアクターは、完全に独立した状態で処理を開始および停止する、全面的にデータ駆動型の処理です。このような動作は多くのシナリオで役に立ちます。特に、電子メールを処理する例など、待機時間の影響を受ける非同期 I/O に効果を発揮します。

今回の例では、ReaderAgent という 2 つ目のエージェントを追加しました。ReaderAgent の機能は、電子メールを読み取って文字列をネットワークに送信するために I/O を制御する点を除いて WriterAgent と同様です。ReaderAgent のコードを 図 12 に示します。

図 12 ReaderAgent

class ReaderAgent : public agent {
public:
  ReaderAgent(ITarget<string> * target) : m_target(target) {
  }

  ~ReaderAgent() {
    agent::wait(this);
  }

  virtual void run() {
    FILE *stream;       
    fopen_s( &stream, ...);

    while (!feof(stream)) {
      asend(m_target, read_word(stream));
    }

    fclose( stream );

    asend(m_target, string("eof"));
    done();
  }

private:

  ITarget<string> * m_target;
};

プログラムの I/O を非同期に制御する ReaderAgent と WriterAgent の両方を作成したら、後は 2 つのエージェントをネットワークの transformer ブロックにリンクして、処理を開始するだけです。この処理は、2 つのブロックをリンクしたら、次のように簡単に実行できます。

censor.link_target(&format);

ReaderAgent r(&censor);
r.start();

WriterAgent w(&format);
w.start();

censor ブロックへの参照を指定して ReaderAgent を作成し、ReaderAgent から censor ブロックにメッセージを正しく送信できるようにします。同様に、format ブロックへの参照を指定して WriterAgent を作成し、format ブロックからメッセージを取得できるようにします。どちらのエージェントも、そのエージェントの start API で開始します。この API は、同時実行ランタイム内でエージェントの実行スケジュールを設定します。どちらのエージェントでもデストラクターで agent::wait(this) を呼び出しているため、両方のエージェントが done 状態になるまでデストラクターの実行が待機されます。

まとめ

今回の記事では、Visual Studio 2010 に組み込まれているアクターベース プログラミングとデータフローのパイプライン化の新しい可能性をいくつか簡単に紹介しました。ぜひご自身で試してみてください。

興味をお持ちの方のために説明すると、ここでは触れることができなかった機能が他にも多数あります (カスタム メッセージ ブロックの作成、メッセージのフィルター処理など)。この魅力的な新しいプログラミング モデルを使用してプログラムを斬新な方法で並列化する方法の詳細とチュートリアルについては、MSDN の並列コンピューティング デベロッパー センター (msdn.microsoft.com/concurrency、英語) を参照してください。

Michael Chu は、マイクロソフトの並列コンピューティング プラットフォーム グループのソフトウェア開発エンジニアであり、同時実行ランタイム チームに所属しています。

Krishnan Varadarajan は、マイクロソフトの並列コンピューティング プラットフォーム グループのソフトウェア開発エンジニアであり、同時実行ランタイム チームに所属しています。

この記事のレビューに協力してくれた同時実行ランタイム チームの技術スタッフに心より感謝いたします。