非同期エージェント ライブラリ
非同期エージェント ライブラリ (単にエージェント ライブラリとも呼ばれます) は、コンカレンシー対応のアプリケーション開発の保全性を向上できるプログラミング モデルを提供します。 エージェント ライブラリは、アクターベースのプログラミング モデルと、粒度の粗いデータ フローおよびパイプライン処理タスクのためのインプロセス メッセージ パッシングを推進する C++ テンプレート ライブラリです。 エージェント ライブラリは、コンカレンシー ランタイムのスケジューリング コンポーネントとリソース管理コンポーネントに基づいています。
プログラミング モデル
エージェント ライブラリは、制御フローではなくデータ フローに基づく非同期通信モデルを使用して分離コンポーネントを接続できるようにすることで、共有状態に代わる手段を提供します。 データ フローは、必要なすべてのデータを使用できるときに計算が行われるプログラミング モデルです。制御フローは、あらかじめ決められた順序で計算が行われるプログラミング モデルです。
データ フロー プログラミング モデルは、プログラムの独立したコンポーネントがメッセージの送信によって相互に通信する、メッセージ パッシングの概念に関連しています。
エージェント ライブラリは、非同期エージェント、非同期メッセージ ブロック、およびメッセージ パッシング関数の 3 つのコンポーネントで構成されています。 エージェントは状態を保持し、メッセージ ブロックとメッセージ パッシング関数を使用して相互の通信および外部コンポーネントとの通信を行います。 メッセージ パッシング関数は、エージェントが外部コンポーネントとの間でメッセージを送受信できるようにします。 非同期メッセージ ブロックは、メッセージを保持し、エージェントが同期的に通信を行うことができるようにします。
次の図は、2 つのエージェントがメッセージ ブロックとメッセージ パッシング関数を使用して通信する方法を示しています。 この図では、agent1
は concurrency::send 関数と concurrency::unbounded_buffer オブジェクトを使用して agent2
にメッセージを送信します。 agent2
は、concurrency::receive 関数を使用してメッセージを読み取ります。 agent2
は、同じメソッドを使用して agent1
にメッセージを送信します。 破線の矢印は、エージェント間のデータの流れを表しています。 実線の矢印は、エージェントと、それが読み書きを行う対象のメッセージ ブロックを結んでいます。
この図を実装するコード例については、このトピックで後述します。
エージェント プログラミング モデルには、他のコンカレンシー機構および同期機構 (イベントなど) に比べていくつかの利点があります。 利点の 1 つは、メッセージ パッシングを使用してオブジェクト間で状態の変更を送信することで共有リソースへのアクセスを分離でき、スケーラビリティを向上できるという点です。 メッセージ パッシングの利点は、同期を外部同期オブジェクトに結び付けるのではなく、データに結び付けるという点にあります。 これにより、コンポーネント間のデータ伝送が簡略化され、アプリケーションのプログラミング エラーを防ぐことができます。
エージェント ライブラリを使用する場合
エージェント ライブラリは、相互に非同期通信を行う必要がある複数の操作がある場合に使用します。 メッセージ ブロックとメッセージ パッシング関数を使用すると、ロックなどの同期機構を必要とせずに並行アプリケーションを作成できます。 これにより、アプリケーション ロジックに集中することができます。
エージェント プログラミング モデルは、データ パイプラインまたはネットワークを作成する場合によく使用されます。 データ パイプラインは一連のコンポーネントで、その各コンポーネントは、より大きな目標を達成するための特定のタスクを実行します。 データフロー パイプラインのすべてのコンポーネントは、他のコンポーネントからメッセージを受け取ったときに処理を実行します。 その処理の結果は、パイプラインまたはネットワーク内の別のコンポーネントに渡されます。 コンポーネントは、 などの他のライブラリの、より粒度の細かいコンカレンシー機能を使用できます (例: 平行パターン ライブラリ (PPL))。
例
前述した図を実装する例を次に示します。
// basic-agents.cpp
// compile with: /EHsc
#include <agents.h>
#include <string>
#include <iostream>
#include <sstream>
using namespace concurrency;
using namespace std;
// This agent writes a string to its target and reads an integer
// from its source.
class agent1 : public agent
{
public:
explicit agent1(ISource<int>& source, ITarget<wstring>& target)
: _source(source)
, _target(target)
{
}
protected:
void run()
{
// Send the request.
wstringstream ss;
ss << L"agent1: sending request..." << endl;
wcout << ss.str();
send(_target, wstring(L"request"));
// Read the response.
int response = receive(_source);
ss = wstringstream();
ss << L"agent1: received '" << response << L"'." << endl;
wcout << ss.str();
// Move the agent to the finished state.
done();
}
private:
ISource<int>& _source;
ITarget<wstring>& _target;
};
// This agent reads a string to its source and then writes an integer
// to its target.
class agent2 : public agent
{
public:
explicit agent2(ISource<wstring>& source, ITarget<int>& target)
: _source(source)
, _target(target)
{
}
protected:
void run()
{
// Read the request.
wstring request = receive(_source);
wstringstream ss;
ss << L"agent2: received '" << request << L"'." << endl;
wcout << ss.str();
// Send the response.
ss = wstringstream();
ss << L"agent2: sending response..." << endl;
wcout << ss.str();
send(_target, 42);
// Move the agent to the finished state.
done();
}
private:
ISource<wstring>& _source;
ITarget<int>& _target;
};
int wmain()
{
// Step 1: Create two message buffers to serve as communication channels
// between the agents.
// The first agent writes messages to this buffer; the second
// agents reads messages from this buffer.
unbounded_buffer<wstring> buffer1;
// The first agent reads messages from this buffer; the second
// agents writes messages to this buffer.
overwrite_buffer<int> buffer2;
// Step 2: Create the agents.
agent1 first_agent(buffer2, buffer1);
agent2 second_agent(buffer1, buffer2);
// Step 3: Start the agents. The runtime calls the run method on
// each agent.
first_agent.start();
second_agent.start();
// Step 4: Wait for both agents to finish.
agent::wait(&first_agent);
agent::wait(&second_agent);
}
この例を実行すると、次の出力が生成されます。
agent1: sending request...
agent2: received 'request'.
agent2: sending response...
agent1: received '42'.
この例で使用されている機能の詳細については、次のトピックを参照してください。
関連項目
非同期エージェント
大規模なコンピューティング タスクの解決における非同期エージェントの役割について説明します。
非同期メッセージ ブロック
エージェント ライブラリに用意されているさまざまなメッセージ ブロックの型について説明します。
メッセージ パッシング関数
エージェント ライブラリに用意されているさまざまなメッセージ パッシング ルーチンについて説明します。
方法: さまざまなプロデューサー/コンシューマー パターンを実装する
アプリケーションにプロデューサー/コンシューマー パターンを実装する方法について説明します。
方法: call クラスおよび transformer クラスに処理関数を提供する
concurrency::call クラスおよび concurrency::transformer クラスに処理関数を提供するいくつかの方法について説明します。
方法: データ パイプラインでトランスフォーマーを使用する
データ パイプラインで concurrency::transformer クラスを使用する方法について説明します。
方法: 完了したタスクから選択する
concurrency::choice クラスおよび concurrency::join クラスを使用して、最初のタスクを選択して検索アルゴリズムを完了する方法について説明します。
方法: メッセージを定期的に送信する
concurrency::timer クラスを使用してメッセージを定期的に送信する方法について説明します。
方法: メッセージ ブロック フィルターを使用する
フィルターを使用して、非同期メッセージ ブロックでメッセージの受け入れや拒否が行われるようにする方法について説明します。
並列パターン ライブラリ (PPL)
アプリケーションで各種の並列パターン (並列アルゴリズムなど) を使用する方法について説明します。
コンカレンシー ランタイム
並列プログラミングを容易にするコンカレンシー ランタイムについて説明します。また、関連トピックへのリンクを示します。