逐步解說:建立自訂訊息區
本文件說明如何建立可依優先權排序傳入訊息的自訂訊息區塊類型。
雖然內建訊息區塊類型提供各種不同的功能,但您可以建立自己的訊息區塊類型,並予以自訂以符合應用程式需求。 如需非同步代理程式程式庫所提供之內建訊息區塊類型的說明,請參閱非同步訊息區。
必要條件
在您開始閱讀此逐步解說前,請先參閱下列文件:
章節
此逐步解說包含下列章節:
設計自訂訊息區塊
定義 priority_buffer 類別
完整的範例
設計自訂訊息區塊
訊息區塊參與傳送及接收訊息的動作。 傳送訊息的訊息區塊稱為「來源區塊」(Source Block)。 接收訊息的訊息區塊稱為「目標區塊」(Target Block)。 傳送及接收訊息的訊息區塊稱為「傳播區塊」(Propagator Block)。 代理程式程式庫使用抽象類別 concurrency::ISource 來代表來源區塊,使用抽象類別 concurrency::ITarget 來代表目標區塊。 當做來源的訊息區類型會衍生自 ISource,而當做目標的訊息區類型則衍生自 ITarget。
您可以從 ISource 和 ITarget 直接衍生訊息區塊類型,但是代理程式程式庫定義了三個基底類別,可執行所有訊息區塊類型通用的大部分功能,例如處理錯誤和以並行安全的方式將訊息區塊連接起來。 concurrency::source_block 類別衍生自 ISource,會將訊息傳送至其他區塊。 concurrency::target_block 類別衍生自 ITarget,會從其他區塊接收訊息。 concurrency::propagator_block 類別衍生自 ISource 和ITarget,會將訊息傳送至其他區塊並從其他區塊接收訊息。 建議您使用這三個基底類別來處理基礎結構詳細資料,以專注於訊息區塊的行為。
source_block、target_block 和 propagator_block 類別是範本,根據管理來源和目標區塊之間連接 (或連結) 的型別,以及管理訊息處理方式的型別而參數化。 代理程式程式庫定義了兩個可執行連結管理的型別:concurrency::single_link_registry 和 concurrency::multi_link_registry。 single_link_registry 類別可讓訊息區塊連結至一個來源或一個目標。 multi_link_registry 類別可讓訊息區塊連結至多個來源或多個目標。 代理程式程式庫定義了一個可執行訊息管理的類別:concurrency::ordered_message_processor。 ordered_message_processor 類別可讓訊息區塊按照接收順序處理訊息。
若要更加了解訊息區塊與其來源和目標之間的關聯性,請參考下列範例。 這個範例示範 concurrency::transformer 類別的宣告。
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
transformer 類別衍生自 propagator_block,因此會同時當做來源區塊和目標區塊。 它接收 _Input 型別的訊息及傳送 _Output 型別的訊息。 transformer 類別將 single_link_registry 指定為任何目標區塊的連結管理員,並將 multi_link_registry 指定為任何來源區塊的連結管理員。 因此,transformer 物件最多可以有一個目標而不限數目的來源。
衍生自 source_block 的類別必須實作六個方法:propagate_to_any_targets、accept_message、reserve_message、consume_message、release_message 和 resume_propagation。 衍生自 target_block 的類別必須實作 propagate_message 方法,並可以選擇性地實作 send_message 方法。 衍生自 propagator_block 在功能上相當於衍生自 source_block 和 target_block。
執行階段呼叫 propagate_to_any_targets 方法,以非同步方式或同步方式處理任何傳入訊息及向外傳播任何外寄訊息。 目標區塊呼叫 accept_message 方法,以接受訊息。 許多訊息區塊類型 (例如 unbounded_buffer) 只將訊息傳送至第一個會接收的目標。 因此,它會將訊息擁有權轉移給目標。 其他訊息區塊類型 (例如 concurrency::overwrite_buffer) 會將訊息提供給每個目標區塊。 因此,overwrite_buffer 會為每個目標建立訊息的複本。
reserve_message、consume_message、release_message 和 resume_propagation 方法可讓訊息區塊參與訊息保留。 當提供訊息給目標區塊,而目標區塊必須保留訊息以供稍後使用時,目標區塊會呼叫 reserve_message 方法。 在目標區塊保留訊息之後,它可以呼叫 consume_message 方法以使用該訊息,或呼叫 release_message 方法以取消保留。 如同 accept_message 方法,consume_message 的實作可以轉移訊息擁有權或傳回訊息複本。 在目標區塊使用或釋放保留的訊息之後,執行階段會呼叫 resume_propagation 方法。 一般而言,這個方法會從佇列中的下一個訊息開始繼續訊息傳播。
執行階段會呼叫 propagate_message 方法,以非同步方式將訊息從另一個區塊轉送至目前區塊。 send_message 方法類似 propagate_message,不同之處在於它是以同步方式 (而不是以非同步方式) 將訊息傳送至目標區塊。 send_message 的預設實作會拒絕所有傳入訊息。 如果訊息未通過與目標區塊相關聯的選擇性篩選函式,執行階段不會呼叫這兩個方法。 如需訊息篩選條件的詳細資訊,請參閱非同步訊息區。
[上方]
定義 priority_buffer 類別
priority_buffer 類別是先依優先權然後再依接收訊息的順序排序傳入訊息的自訂訊息區塊類型。 priority_buffer 類別類似 concurrency::unbounded_buffer 類別的原因有兩個:它保存訊息佇列,以及當做來源和目標訊息區塊,可以有多個來源和多個目標。 不過,unbounded_buffer 訊息傳播只能根據從來源接收訊息的順序。
priority_buffer 類別接收 std::tuple 型別且包含 PriorityType 和 Type 項目的訊息。 PriorityType 是指保存每則訊息之優先權的型別;Type 是指訊息的資料部分。 priority_buffer 類別傳送 Type 型別的訊息。 priority_buffer 類別也管理兩個訊息佇列:用於傳入訊息的 std::priority_queue 物件和用於外寄訊息的 std::queue 物件。 當 priority_buffer 物件同時接收多則訊息,或在消費者讀取任何訊息之前接收多則訊息時,依優先權排序訊息很實用。
除了衍生自 propagator_block 的類別所必須實作的七個方法之外,priority_buffer 類別也會覆寫 link_target_notification 和 send_message 方法。 priority_buffer 類別也定義兩個公用 Helper 方法 enqueue 和 dequeue,和一個私用 Helper 方法 propagate_priority_order。
下列程序說明如何實作 priority_buffer 類別。
若要定義 priority_buffer 類別
建立 C++ 標頭檔,並將它命名為 priority_buffer.h。 或者,您也可以使用專案中現有的標頭檔。
在 priority_buffer.h 中,加入下列程式碼。
#pragma once #include <agents.h> #include <queue>
在 std 命名空間中,定義作用於 concurrency::message 物件之 std::less 和 std::greater 的特製化。
namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }
priority_buffer 類別將 message 物件儲存在 priority_queue 物件中。 這些型別特製化可讓優先權佇列根據訊息優先權來排序訊息。 優先權是 tuple 物件的第一個項目。
在 concurrencyex 命名空間中,宣告 priority_buffer 類別。
namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }
priority_buffer 類別衍生自 propagator_block。 因此,它可以傳送及接收訊息。 priority_buffer 類別可以有多個可接收 Type 型別之訊息的目標。 它也可以有多個可傳送 tuple<PriorityType, Type> 型別之訊息的來源。
在 priority_buffer 類別的 private 區段中,加入下列成員變數。
// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;
priority_queue 物件保存傳入訊息,而 queue 物件則保存外寄訊息。 priority_buffer 物件可以同時接收多則訊息,critical_section 物件則會同步處理對輸入訊息佇列的存取。
在 private 區段中,定義複製建構函式和指派運算子。 這會防止 priority_queue 物件指派。
// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);
在 public 區段中,定義許多訊息區塊類型通用的建構函式。 也定義解構函式。
// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }
在 public 區段中,定義 enqueue 和 dequeue 方法。 這些 Helper 方法提供替代方式來對 priority_buffer 物件傳送訊息及接收訊息。
// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }
在 protected 區段中,定義 propagate_to_any_targets 方法。
// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }
propagate_to_any_targets 方法會將位於輸入佇列前端的訊息轉送至輸出佇列,並向外傳播輸出佇列中的所有訊息。
在 protected 區段中,定義 accept_message 方法。
// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }
當目標區塊呼叫 accept_message 方法時,priority_buffer 類別會將訊息擁有權轉移給第一個接收訊息的目標區塊 (這種行為類似 unbounded_buffer 的行為)。
在 protected 區段中,定義 reserve_message 方法。
// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }
當提供的訊息識別碼符合位於佇列前端之訊息的識別碼時,priority_buffer 類別允許目標區塊保留訊息。 換言之,如果 priority_buffer 物件尚未收到其他訊息,且尚未向外傳播目前訊息,目標可以保留此訊息。
在 protected 區段中,定義 consume_message 方法。
// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }
目標區塊會呼叫 consume_message,轉移它所保留之訊息的擁有權。
在 protected 區段中,定義 release_message 方法。
// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }
目標區塊會呼叫 release_message,取消訊息保留。
在 protected 區段中,定義 resume_propagation 方法。
// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }
在目標區塊使用或釋放保留的訊息之後,執行階段會呼叫 resume_propagation。 這個方法會向外傳播輸出佇列中的任何訊息。
在 protected 區段中,定義 link_target_notification 方法。
// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }
_M_pReservedFor 成員變數是由基底類別 source_block 所定義。 這個成員變數指向保留輸出佇列前端之訊息的目標區塊 (如果有的話)。 當新目標連結至 priority_buffer 物件時,執行階段會呼叫 link_target_notification。 如果沒有保留訊息的目標時,這個方法會向外傳播輸出佇列中的任何訊息。
在 private 區段中,定義 propagate_priority_order 方法。
// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }
這個方法會從輸出佇列向外傳播所有訊息。 佇列中的每則訊息都會提供給每個目標區塊,直到其中一個目標區塊接受訊息。 priority_buffer 類別會保留外寄訊息的順序。 因此,目標區塊必須先接受輸出佇列中的第一個訊息,這個方法才會將任何其他訊息提供給目標區塊。
在 protected 區段中,定義 propagate_message 方法。
// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }
propagate_message 方法可讓 priority_buffer 類別當做訊息接收者 (或目標)。 這個方法會接收提供之來源區塊提供的訊息,並將該訊息插入優先權佇列中。 propagate_message 方法接著以非同步方式將所有輸出訊息傳送至目標區塊。
當您呼叫 concurrency::asend 函式或當訊息區塊連接至其他訊息區塊時,執行階段會呼叫這個方法。
在 protected 區段中,定義 send_message 方法。
// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }
send_message 方法類似 propagate_message。 不過,它是以同步方式 (而不是以非同步方式) 傳送輸出訊息。
在同步傳送作業期間,例如當您呼叫 concurrency::send 函式時,執行階段會呼叫這個方法。
priority_buffer 類別包含許多訊息區塊類型中常見的建構函式多載。 有些建構函式多載接受 concurrency::Scheduler 或 concurrency::ScheduleGroup 物件,讓訊息區塊受特定工作排程器管理。 有些建構函式多載接受篩選函式。 篩選函式可讓訊息區塊根據訊息裝載來接受或拒絕訊息。 如需訊息篩選條件的詳細資訊,請參閱非同步訊息區。 如需工作排程器的詳細資訊,請參閱工作排程器 (並行執行階段)。
因為 priority_buffer 類別先依優先權然後再依接收訊息的順序來排序訊息,所以在以非同步方式接收訊息時,例如當您呼叫 concurrency::asend 函式或當訊息區塊連接至其他訊息區塊時,這個類別最實用。
[上方]
完整的範例
下列範例顯示 priority_buffer 類別的完整定義。
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
下列範例會在 priority_buffer 物件上並行執行一些 asend 和 concurrency::receive 作業。
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
這個範例 (Example) 產生下列範例 (Sample) 輸出。
priority_buffer 類別先依優先權然後再依接收訊息的順序來排序訊息。 在這個範例中,優先權為較大數字的訊息會插入佇列的前端。
[上方]
編譯程式碼
請複製範例程式碼,並將它貼在 Visual Studio 專案中,或將 priority_buffer 類別定義貼在 priority_buffer.h 檔案中,並將測試程式貼在 priority_buffer.cpp 檔案中,然後在 Visual Studio 的 [命令提示字元] 視窗中執行下列命令。
cl.exe /EHsc priority_buffer.cpp