チュートリアル: カスタム メッセージ ブロックの作成
ここでは、受信メッセージを優先順位に従って並べるカスタム メッセージ ブロックの型を作成する方法について説明します。
組み込みのメッセージ ブロックの型には幅広い機能が備わっていますが、独自のメッセージ ブロックの型を作成して、アプリケーションの要件を満たすようにカスタマイズすることもできます。 非同期エージェント ライブラリに用意されている組み込みのメッセージ ブロックの型については、「非同期メッセージ ブロック」を参照してください。
必須コンポーネント
このチュートリアルを開始する前に、次のドキュメントを参照してください。
セクション
このチュートリアルは、次のセクションで構成されています。
カスタム メッセージ ブロックのデザイン
priority_buffer クラスの定義
完全な例
カスタム メッセージ ブロックのデザイン
メッセージ ブロックは、メッセージの送受信処理に参加します。 メッセージを送信するメッセージ ブロックはソース ブロックと呼ばれます。 メッセージを受信するメッセージ ブロックはターゲット ブロックと呼ばれます。 メッセージを送受信するメッセージ ブロックは伝達子ブロックと呼ばれます。 エージェント ライブラリは、concurrency::ISource 抽象クラスを使用してソース ブロックを表し、concurrency::ITarget 抽象クラスを使用してターゲット ブロックを表します。 ソースとして機能するメッセージ ブロックの型は ISource から派生します。ターゲットとして機能するメッセージ ブロックの型は ITarget から派生します。
メッセージ ブロックの型は ISource および ITarget から直接派生させることもできますが、エージェント ライブラリには、メッセージ ブロックのすべての型に共通の大部分の機能を実行する 3 つの基底クラスが定義されています。これらの基底クラスによって、エラーの処理やメッセージ ブロックの接続などの操作が同時実行セーフに行われます。 concurrency::source_block クラスは ISource から派生し、メッセージを他のブロックに送信します。 concurrency::target_block クラスは ITarget から派生し、他のブロックからメッセージを受信します。 concurrency::propagator_block クラスは ISource および ITarget から派生し、他のブロックとの間でメッセージを送受信します。 メッセージ ブロックの動作に焦点を合わせることができるように、インフラストラクチャの細部の処理にはこれらの 3 つの基底クラスを使用することをお勧めします。
source_block、target_block、および propagator_block の各クラスはテンプレートであり、ソース ブロックとターゲット ブロック間の接続 (リンク) を管理する型、およびメッセージの処理方法を管理する型でパラメーター化されます。 エージェント ライブラリには、リンクの管理を行う 2 つの型 concurrency::single_link_registry および concurrency::multi_link_registry が定義されています。 single_link_registry クラスは、メッセージ ブロックを 1 つのソースまたは 1 つのターゲットにリンクできるようにします。 multi_link_registry クラスは、メッセージ ブロックを複数のソースまたは複数のターゲットにリンクできるようにします。 エージェント ライブラリには、メッセージの管理を行う 1 つのクラス concurrency::ordered_message_processor が定義されています。 ordered_message_processor クラスは、メッセージ ブロックでメッセージを受信順に処理できるようにします。
メッセージ ブロックとソースおよびターゲットとの相互関係をより深く理解できるように、次の例を考えてみてください。 この例は、concurrency::transformer クラスの宣言を示しています。
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
transformer クラスは propagator_block から派生するため、ソース ブロックとしてもターゲット ブロックとしても機能します。 _Input 型のメッセージを受け入れ、_Output 型のメッセージを送信します。 transformer クラスは、single_link_registry をターゲット ブロックのリンク マネージャーとして指定し、multi_link_registry をソース ブロックのリンク マネージャーとして指定します。 したがって、transformer オブジェクトで許容されるターゲットは 1 つだけですが、ソースの数に制限はありません。
source_block から派生するクラスは、propagate_to_any_targets、accept_message、reserve_message、consume_message、release_message、および resume_propagation の 6 つのメソッドを実装する必要があります。 target_block から派生するクラスは、propagate_message メソッドを実装する必要があり、必要に応じて send_message メソッドを実装できます。 propagator_block からの派生は、source_block および target_block からの派生と機能的には同等です。
propagate_to_any_targets メソッドは、受信メッセージを非同期的または同期的に処理し、送信メッセージを伝達するためにランタイムによって呼び出されます。 accept_message メソッドは、メッセージを受け入れるためにターゲット ブロックによって呼び出されます。 unbounded_buffer などのメッセージ ブロックの型の多くは、最初にメッセージを受信するターゲットにのみメッセージを送信します。 したがって、メッセージの所有権はそのターゲットに譲渡されます。 concurrency::overwrite_buffer などの他のメッセージ ブロックの型は、メッセージを各ターゲット ブロックに提供します。 したがって、overwrite_buffer は各ターゲット用にメッセージのコピーを作成します。
reserve_message、consume_message、release_message、および resume_propagation メソッドは、メッセージ ブロックがメッセージの予約に参加できるようにします。 ターゲット ブロックは、提供されたメッセージを予約して後で使用できるようにする場合に、reserve_message メソッドを呼び出します。 メッセージを受信したターゲット ブロックは、consume_message メソッドを呼び出してそのメッセージを処理するか、release_message メソッドを呼び出して予約を取り消すことができます。 accept_message メソッドと同様に、consume_message の実装では、メッセージの所有権を譲渡するか、メッセージのコピーを返すことができます。 ターゲット ブロックが予約済みのメッセージを処理または解放すると、ランタイムは resume_propagation メソッドを呼び出します。 通常、このメソッドは、キュー内の次のメッセージからメッセージ伝達を続行します。
ランタイムは、propagate_message メソッドを呼び出して、別のブロックから現在のブロックにメッセージを非同期的に転送します。 send_message メソッドは、非同期的にではなく同期的にメッセージをターゲット ブロックに送信する点を除いて、propagate_message と似ています。 send_message の既定の実装では、すべての受信メッセージが拒否されます。 ターゲット ブロックに関連付けられているオプションのフィルター関数をメッセージが通過しない場合、ランタイムはどちらのメソッドも呼び出しません。 メッセージ フィルターの詳細については、「非同期メッセージ ブロック」を参照してください。
[トップ]
priority_buffer クラスの定義
priority_buffer クラスは、受信メッセージを優先順位に従って並べてから、メッセージの受信順に並べるカスタム メッセージ ブロックの型です。 priority_buffer クラスは、メッセージのキューを保持するという点と、ソース メッセージ ブロックとしてもターゲット メッセージ ブロックとしても機能し、どちらの場合も複数のソースおよび複数のターゲットが許容されるという点で、concurrency::unbounded_buffer クラスと似ています。 ただし、unbounded_buffer でのメッセージの伝達順は、必ずソースからのメッセージの受信順になります。
priority_buffer クラスは、PriorityType 要素と Type 要素を含む std::tuple 型のメッセージを受信します。 PriorityType は各メッセージの優先順位を保持する型を表し、Type はメッセージのデータ部分を表します。 priority_buffer クラスは、Type 型のメッセージを送信します。 priority_buffer クラスは、受信メッセージ用の std::priority_queue オブジェクトと送信メッセージ用の std::queue オブジェクトの 2 つのメッセージ キューの管理も行います。 priority_buffer オブジェクトが複数のメッセージを同時に受信する場合、またはコンシューマーがまだメッセージを読み取っていないときに複数のメッセージを受信する場合、メッセージを優先順位に従って並べ替えると便利です。
priority_buffer クラスでは、propagator_block の派生クラスで実装する必要のある 7 つのメソッドに加えて、link_target_notification メソッドと send_message メソッドもオーバーライドします。 priority_buffer クラスでは、2 つのパブリック ヘルパー メソッド (enqueue および dequeue) と 1 つのプライベート ヘルパー メソッド (propagate_priority_order) も定義します。
次の手順では、priority_buffer クラスを実装する方法について説明します。
priority_buffer クラスを定義するには
C++ ヘッダー ファイルを作成し、priority_buffer.h という名前を付けます。 または、プロジェクトに含まれる既存のヘッダー ファイルを使用することもできます。
priority_buffer.h に次のコードを追加します。
#pragma once #include <agents.h> #include <queue>
std 名前空間で、std::less および std::greater の特殊化を定義して、concurrency::message オブジェクトで動作するようにします。
namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
priority_buffer クラスは、priority_queue オブジェクトに message オブジェクトを格納します。 このような型の特殊化によって、優先順位キューでメッセージが優先順位に従って並べ替えられるようになります。 優先順位は、tuple オブジェクトの最初の要素です。
concurrencyex 名前空間で、priority_buffer クラスを宣言します。
namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
priority_buffer クラスは propagator_block から派生したものです。 したがって、メッセージを送信することも受信することもできます。 priority_buffer クラスは、Type 型のメッセージを受信する複数のターゲットを持つことができます。 さらに、tuple<PriorityType, Type> 型のメッセージを送信する複数のソースを持つことができます。
priority_buffer クラスの private セクションに、次のメンバー変数を追加します。
// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;
priority_queue オブジェクトは受信メッセージを保持し、queue オブジェクトは送信メッセージを保持します。 priority_buffer オブジェクトは、複数のメッセージを同時に受信できます。critical_section オブジェクトは、入力メッセージのキューへのアクセスを同期します。
private セクションで、コピー コンストラクターと代入演算子を定義します。 これにより、priority_queue オブジェクトが割り当て可能になるのを防ぎます。
// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
public セクションで、多くのメッセージ ブロックの型に共通のコンストラクターを定義します。 デストラクターも定義します。
// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
public セクションで、enqueue メソッドと dequeue メソッドを定義します。 これらのヘルパー メソッドによって、priority_buffer オブジェクトとの間でメッセージを送受信する別の手段が提供されます。
// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
protected セクションで、propagate_to_any_targets メソッドを定義します。
// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
propagate_to_any_targets メソッドは、入力キューの先頭にあるメッセージを出力キューに転送し、出力キュー内のすべてのメッセージを伝達します。
protected セクションで、accept_message メソッドを定義します。
// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
ターゲット ブロックから accept_message メソッドが呼び出されたとき、priority_buffer クラスは、メッセージを最初に受け入れるターゲット ブロックにそのメッセージの所有権を譲渡します (この動作は unbounded_buffer の動作と似ています)。
protected セクションで、reserve_message メソッドを定義します。
// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
priority_buffer クラスは、指定されたメッセージ識別子とキューの先頭にあるメッセージの識別子が一致する場合に、ターゲット ブロックでメッセージを予約できるようにします。 つまり、priority_buffer オブジェクトがまだ追加のメッセージを受信しておらず、現在のメッセージも伝達していない場合、ターゲットはメッセージを予約できます。
protected セクションで、consume_message メソッドを定義します。
// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
ターゲット ブロックは、予約したメッセージの所有権を譲渡するために consume_message を呼び出します。
protected セクションで、release_message メソッドを定義します。
// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
ターゲット ブロックは、メッセージの予約を取り消すために release_message を呼び出します。
protected セクションで、resume_propagation メソッドを定義します。
// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
ターゲット ブロックが予約済みのメッセージを処理または解放すると、ランタイムは resume_propagation を呼び出します。 このメソッドは、出力キュー内のすべてのメッセージを伝達します。
protected セクションで、link_target_notification メソッドを定義します。
// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
_M_pReservedFor メンバー変数は、source_block 基底クラスによって定義されます。 このメンバー変数は、出力キューの先頭にあるメッセージの予約を保持しているターゲット ブロック (存在する場合) を指します。 新しいターゲットが priority_buffer オブジェクトにリンクされると、ランタイムは link_target_notification を呼び出します。 このメソッドは、ターゲットが予約を保持していない場合に、出力キュー内のすべてのメッセージを伝達します。
private セクションで、propagate_priority_order メソッドを定義します。
// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
このメソッドは、出力キュー内のすべてのメッセージを伝達します。 ターゲット ブロックの 1 つがメッセージを受け入れるまで、キュー内の各メッセージが各ターゲット ブロックに提供されます。 priority_buffer クラスは、送信メッセージの順序を保持しています。 そこで、このメソッドでは、出力キュー内の最初のメッセージがいずれかのターゲット ブロックによって受け入れられるまで、他のメッセージをターゲット ブロックに提供しないようにします。
protected セクションで、propagate_message メソッドを定義します。
// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
propagate_message メソッドは、priority_buffer クラスがメッセージの受信側 (つまりターゲット) として機能するようにします。 このメソッドは、指定されたソース ブロックから提供されたメッセージを受信し、そのメッセージを優先順位キューに挿入します。 その後、propagate_message メソッドは、すべての出力メッセージをターゲット ブロックに非同期的に送信します。
ランタイムは、concurrency::asend 関数が呼び出されたとき、またはメッセージ ブロックが他のメッセージ ブロックに接続されたときに、このメソッドを呼び出します。
protected セクションで、send_message メソッドを定義します。
// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
send_message メソッドは propagate_message と似ています。 ただし、このメソッドは、非同期的にではなく同期的に出力メッセージを送信します。
ランタイムは、concurrency::send 関数が呼び出された場合など、同期送信操作中にこのメソッドを呼び出します。
priority_buffer クラスには、多くのメッセージ ブロックの型に共通のコンストラクター オーバーロードが含まれています。 一部のコンストラクター オーバーロードは、concurrency::Scheduler オブジェクトまたは concurrency::ScheduleGroup オブジェクトを受け取ります。これらのオブジェクトを使用すると、特定のタスク スケジューラでメッセージ ブロックを管理できます。 フィルター関数を受け取るコンストラクター オーバーロードもあります。 フィルター関数を使用すると、メッセージ ブロックでのメッセージの受け入れまたは拒否をメッセージ ペイロードに基づいて行うことができます。 メッセージ フィルターの詳細については、「非同期メッセージ ブロック」を参照してください。 タスク スケジューラの詳細については、「タスク スケジューラ (同時実行ランタイム)」を参照してください。
priority_buffer クラスはメッセージを優先順位に従って並べてから受信順に並べるため、concurrency::asend 関数を呼び出した場合や、メッセージ ブロックが他のメッセージ ブロックに接続された場合など、メッセージを非同期的に受信する場合に非常に役立ちます。
[トップ]
完全な例
次の例では、priority_buffer クラスの完全な定義を示します。
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
次の例では、priority_buffer オブジェクトに対して多くの asend 操作と concurrency::receive 操作を同時に実行します。
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
この例では、次のサンプル出力が生成されます。
priority_buffer クラスは、メッセージを優先順位に従って並べてから、メッセージの受信順に並べます。 この例では、優先順位を示す数値が大きいメッセージほど、キューの先頭近くに挿入されています。
[トップ]
コードのコンパイル
プログラム例をコピーし、それを Visual Studio プロジェクトに貼り付けるか、priority_buffer クラスの定義を priority_buffer.h という名前のファイルに、テスト プログラムを priority_buffer.cpp という名前のファイルにそれぞれ貼り付け、Visual Studio のコマンド プロンプト ウィンドウで次のコマンドを実行します。
cl.exe /EHsc priority_buffer.cpp