演练:创建自定义消息块

本文档介绍如何创建按优先级排序传入消息的自定义消息块类型。

尽管内置的消息块类型提供许多不同的功能,但您可以创建自己的消息块类型,并对其进行自定义以满足应用程序的要求。 有关异步代理库提供的内置消息块类型的说明,请参见异步消息块

系统必备

在开始本演练之前,请阅读下列文档:

各节内容

本演练包含以下各节:

  • 设计自定义消息块

  • 定义 priority_buffer 类

  • 完整示例

设计自定义消息块

消息块参与发送和接收消息的操作。 发送消息的消息块称为“源块”。 接收消息的消息块称为“目标块”。 既发送又接收消息的消息块称为“传播器块”。 代理库使用抽象类 concurrency::ISource 表示源快,使用抽象类 concurrency::ITarget 表示目标块。 用作源的消息块类型从 ISource 派生;用作目标的消息块类型从 ITarget 派生。

尽管您可以直接从 ISourceITarget 派生消息块类型,代理库也会定义三种基类以执行所有消息块类型通用的大部分功能,例如,处理错误以及以并发安全的方式将消息块连接在一起。 concurrency::source_block 类派生自 ISource 并将消息发送给其他块。 concurrency::target_block 类派生自 ITarget 并从其他块接收消息。 concurrency::propagator_block 类派生自 ISourceITarget 并发送消息给其他块以及从其他块接收消息。 建议您使用这三种基类处理基础结构细节,以便您可以关注消息块的行为。

source_blocktarget_blockpropagator_block 类是针对管理源和目标块之间连接(或链接)的类型和管理如何处理消息的类型参数化的模板。 代理库定义两种执行链接管理的类型:concurrency::single_link_registryconcurrency::multi_link_registrysingle_link_registry 类允许消息块链接至一个源或一个目标。 multi_link_registry 类允许消息块链接至多个源或多个目标。 代理库定义一个执行消息管理的类:concurrency::ordered_message_processorordered_message_processor 类允许消息块按照其接收消息的顺序处理消息。

为了更好地理解消息块如何与它们的源和目标关联,请考虑以下示例。 本示例显示 concurrency::transformer 类的声明。

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

transformer 类派生自 propagator_block,因此可同时充当源块和目标块。 它接受 _Input 类型的消息,发送 _Output 类型的消息。 transformer 类指定 single_link_registry 作为所有目标块的链接管理器,指定 multi_link_registry 作为所有源块的链接管理器。 因此,transformer 对象最多能有一个目标,但源的数量没有限制。

source_block 派生的类必须实现六个方法:propagate_to_any_targetsaccept_messagereserve_messageconsume_messagerelease_messageresume_propagation。 从 target_block 派生的类必须实现 propagate_message 方法,并可以选择实现 send_message 方法。 从 propagator_block 派生在功能上等效于同时从 source_blocktarget_block 派生。

propagate_to_any_targets 方法由运行时调用以异步或同步处理所有传入的消息以及传播所有传出的消息。 accept_message 方法由目标块调用以接受消息。 许多消息块类型(例如 unbounded_buffer)仅将消息发送给接收该消息的第一个目标。 因此,它会将消息的所有权转移给目标。 其他消息块类型(例如 concurrency::overwrite_buffer)将消息提供给其每个目标块。 因此,overwrite_buffer 为其每个目标创建一份消息的副本。

reserve_messageconsume_messagerelease_messageresume_propagation 方法允许消息块参与消息保留。 当目标块收到消息并必须保留该消息以备日后使用时,则会调用 reserve_message 方法。 目标块保留消息后,它可以调用 consume_message 方法以使用该消息,或调用 release_message 方法以取消保留。 与 accept_message 方法相同,consume_message 的实现可以转移消息的所有权或返回消息的副本。 当目标块使用或释放保留的消息之后,运行时会调用 resume_propagation 方法。 通常,该方法会从队列中的下一条消息开始继续进行消息传播。

运行时调用 propagate_message 方法以从另一个块将消息异步传输到当前块。 send_message 方法与 propagate_message 类似,但它同步而不是异步将消息发送给目标块。 send_message 的默认实现拒绝所有传入消息。 如果消息未传递与目标块关联的可选筛选功能,则运行时不调用其中任一方法。 有关消息筛选器的更多信息,请参见异步消息块

[Top]

定义 priority_buffer 类

priority_buffer 类是自定义的消息块类型,它先按照优先级、然后按照接收消息的顺序排序传入的消息。 priority_buffer 类与 concurrency::unbounded_buffer 类相似,因为它保留一列消息,另外还因为它同时充当源和目标消息块并可以同时具有多个源和多个目标。 但是,unbounded_buffer 仅按照其从源接收消息的顺序进行消息传播。

priority_buffer 类接收 std::tuple 类型的消息,该类型包含 PriorityType 和 Type 元素。 PriorityType 是指保留每条消息优先级的类型;Type 是指消息的数据部分。 priority_buffer 类发送 Type 类型的消息。 priority_buffer 类还管理两个消息队列:std::priority_queue 对象(用于传入消息)和 std::queue 对象(用于传出消息)。 当 priority_buffer 对象同时接收多个消息或在用户阅读任何消息之前接收多个消息时,按优先级排序消息会很有用。

除了从 propagator_block 派生的类必须实现的七个方法以外,priority_buffer 类还替换 link_target_notificationsend_message 方法。 priority_buffer 类还定义两个公共帮助程序方法(enqueuedequeue)以及一个专用帮助程序方法(propagate_priority_order)。

下面的过程介绍如何实现 priority_buffer 类。

定义 priority_buffer 类

  1. 创建一个 C++ 头文件并将其命名为 priority_buffer.h。 或者,您可以使用项目中的现有头文件。

  2. 在 priority_buffer.h 中,添加以下代码。

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. std 命名空间中,定义 std::lessstd::greater(对 concurrency::message 对象执行操作)的专用化。

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    priority_buffer 类将 message 对象存储在 priority_queue 对象中。 这些类型专用化允许优先级队列根据它们的优先级排序消息。 优先级是 tuple 对象的第一个元素。

  4. concurrencyex 命名空间中,声明 priority_buffer 类。

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    priority_buffer 类派生自 propagator_block。 因此,它可以同时发送和接收消息。 priority_buffer 类可以具有多个接收 Type 类型消息的目标。 它还可以具有多个发送 tuple<PriorityType, Type> 类型消息的源。

  5. priority_buffer 类的 private 部分,添加以下成员变量。

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    priority_queue 对象保存传入消息;queue 对象保存传出消息。 priority_buffer 对象可以同时接收多个消息;critical_section 对象可同步访问输入消息队列。

  6. 在 private 部分,定义复制构造函数和赋值运算符。 这会阻止 priority_queue 对象赋值。

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. 在 public 部分,定义许多消息块类型通用的构造函数。 还需定义析构函数。

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. 在 public 部分,定义 enqueuedequeue 方法。 这些帮助程序方法提供向 priority_buffer 对象发送以及从其处接收消息的替代方法。

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. 在 protected 部分,定义 propagate_to_any_targets 方法。

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    propagate_to_any_targets 方法将位于输入队列前面的消息传送至输出队列,并传播输出队列中的所有消息。

  10. 在 protected 部分,定义 accept_message 方法。

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    当目标块调用 accept_message 方法时,priority_buffer 类会将消息的所有权转移给接受该消息的第一个目标块。(这类似于 unbounded_buffer 的行为。)

  11. 在 protected 部分,定义 reserve_message 方法。

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    当提供的消息标识符与位于队列前面的消息的标识符匹配时,priority_buffer 类允许目标块保留消息。 也就是说,如果 priority_buffer 对象尚未收到其他消息并且尚未传播当前消息,则目标可以保留消息。

  12. 在 protected 部分,定义 consume_message 方法。

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    目标块调用 consume_message 以转移其保留消息的所有权。

  13. 在 protected 部分,定义 release_message 方法。

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    目标块调用 release_message 以取消它对消息的保留。

  14. 在 protected 部分,定义 resume_propagation 方法。

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    当目标块使用或释放保留的消息之后,运行时调用 resume_propagation。 该方法传播输出队列中的所有消息。

  15. 在 protected 部分,定义 link_target_notification 方法。

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    _M_pReservedFor 成员变量由 source_block 基类定义。 该成员变量所指向的目标块(如果有)始终保留位于输出队列前面的消息。 当新目标链接至 priority_buffer 对象时,运行时会调用 link_target_notification。 如果没有目标存放保留消息,则该方法会传播输出队列中的所有消息。

  16. 在 private 部分,定义 propagate_priority_order 方法。

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    该方法传播输出队列中的所有消息。 队列中的每一条消息提供给每一个目标块,直到其中一个目标块接受消息。 priority_buffer 类保留传出消息的顺序。 因此,当该方法提供任何其他消息给某目标块之前,该目标块必须接受输出队列中的第一条消息。

  17. 在 protected 部分,定义 propagate_message 方法。

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    propagate_message 方法允许 priority_buffer 类充当消息接收程序或目标。 该方法接收提供的源块提供的消息,并将该消息插入优先级队列中。 propagate_message 方法然后将所有输出消息异步发送给目标块。

    当您调用 concurrency::asend 函数或消息块连接至其他消息块时,运行时会调用该方法。

  18. 在 protected 部分,定义 send_message 方法。

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    send_message 方法与 propagate_message 类似。 但它同步而不是异步发送输出消息。

    在同步发送操作期间运行时会调用该方法,例如当您调用 concurrency::send 函数时。

priority_buffer 类包含构造函数重载,许多消息块类型都具有这些重载。 某些构造函数重载可获取 Concurrency::Schedulerconcurrency::ScheduleGroup 对象,以允许消息块由特定的任务计划程序管理。 其他构造函数重载可获取筛选功能。 筛选功能允许消息块根据消息的负载接受或拒绝消息。 有关消息筛选器的更多信息,请参见异步消息块。 有关任务计划程序的更多信息,请参见任务计划程序(并发运行时)

由于 priority_buffer 类先按照优先级再按照接收消息的顺序排序消息,因此该类在异步接收消息时最有用,例如当您调用 concurrency::asend 函数或消息块连接至其他消息块时。

[Top]

完整示例

下面的示例显示 priority_buffer 类的完整定义。

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

下面的示例对 priority_buffer 对象并发执行许多 asendconcurrency::receive 操作。

// priority_buffer.cpp 
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h" 

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations 
   // on a priority_buffer object.

   priority_buffer<int> pb;

   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

此示例产生下面的示例输出。

  

priority_buffer 类先按照优先级再按照接收消息的顺序排序消息。 在本示例中,消息的优先级数字越大,它在队列中越靠前。

[Top]

编译代码

复制示例代码并将其粘贴到Visual Studio项目中,或将 priority_buffer类的定义粘贴到名为 priority_buffer.h的文件中并将测试程序粘贴到名为priority_buffer.cpp 的文件中,然后在 Visual Studio命令提示符窗口中运行以下命令。

cl.exe /EHsc priority_buffer.cpp

请参见

概念

异步消息块

消息传递函数

其他资源

并发运行时演练