演练:创建自定义消息块

本文档介绍如何创建根据优先级对传入消息进行排序的自定义消息块类型。

尽管内置消息块类型提供广泛的功能,但你可以创建自己的消息块类型,并根据应用程序的要求自定义该类型。 有关异步代理库提供的内置消息块类型的介绍,请参阅异步消息块

先决条件

在开始操作本演练之前,请阅读以下文档:

部分

本演练包含以下各节:

设计自定义消息块

消息块参与发送和接收消息的操作。 发送消息的消息块称为源块。 接收消息的消息块称为目标块。 发送和接收消息的消息块称为传播器块。 代理库使用抽象类 concurrency::ISource 表示源快,使用抽象类 concurrency::ITarget 表示目标块。 充当源的消息块类型派生自 ISource;充当目标的消息块类型派生自 ITarget

虽然可以直接从 ISourceITarget 派生消息块类型,但代理库定义了三个基类,这些类执行所有消息块类型共有的大部分功能,例如处理错误,以及以并发安全的方式将消息块连接在一起。 concurrency::source_block 类派生自 ISource 并将消息发送给其他块。 concurrency::target_block 类派生自 ITarget 并从其他块接收消息。 concurrency::propagator_block 类派生自 ISourceITarget 并发送消息给其他块以及从其他块接收消息。 我们建议使用这三个基类来处理基础结构详细信息,以便你可以专注于消息块的行为。

source_blocktarget_blockpropagator_block 类是基于一个管理源块与目标块之间的连接或链接的类型,以及一个管理消息处理方式的类型参数化的模板。 代理库定义两种执行链接管理的类型:concurrency::single_link_registryconcurrency::multi_link_registrysingle_link_registry 类使消息块能够链接到一个源或一个目标。 multi_link_registry 类使消息块能够链接到多个源或多个目标。 代理库定义一个执行消息管理的类:concurrency::ordered_message_processorordered_message_processor 类使消息块能够按照消息的接收顺序处理消息。

为了更好地理解消息块如何与其源和目标相关,请考虑以下示例。 此示例演示 concurrency::transformer 类的声明。

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

transformer 类派生自 propagator_block,因此既充当源块又充当目标块。 它接受 _Input 类型的消息并发送 _Output 类型的消息。 transformer 类将 single_link_registry 指定为任何目标块的链接管理器,将 multi_link_registry 指定为任何源块的链接管理器。 因此,一个 transformer 对象最多可以有一个目标和无限数量的源。

派生自 source_block 的类必须实现六个方法:propagate_to_any_targetsaccept_messagereserve_messageconsume_messagerelease_messageresume_propagation。 派生自 target_block 的类必须实现 propagate_message 方法,并可以选择性地实现 send_message 方法。 从 propagator_block 派生在功能上等同于从 source_blocktarget_block 派生。

运行时调用 propagate_to_any_targets 方法来以异步或同步方式处理任何传入消息和传播任何传出消息。 目标块调用 accept_message 方法来接受消息。 许多消息块类型(例如 unbounded_buffer)仅将消息发送到接收它的第一个目标。 因此,它会将消息所有权转移给目标。 其他消息块类型(例如 concurrency::overwrite_buffer)将消息提供给其每个目标块。 因此,overwrite_buffer 为其每个目标创建消息的副本。

reserve_messageconsume_messagerelease_messageresume_propagation 方法使消息块能够参与消息保留。 目标块在收到消息时调用 reserve_message 方法,且必须保留消息供以后使用。 目标块保留消息后,可以调用 consume_message 方法来使用该消息,或调用 release_message 方法来取消保留。 与 accept_message 方法一样,consume_message 的实现可以转移消息所有权或返回消息的副本。 在目标块使用或释放保留的消息后,运行时将调用 resume_propagation 方法。 此方法通常会从队列中的下一条消息开始继续传播消息。

运行时调用 propagate_message 方法将消息从另一个块异步传输到当前块。 send_message 方法类似于 propagate_message,不同之处在于,它以同步而不是异步方式将消息发送到目标块。 send_message 的默认实现会拒绝所有传入消息。 如果消息未传递与目标块关联的可选筛选器函数,则运行时不会调用这些方法中的任何一个。 有关消息筛选器的详细信息,请参阅异步消息块

[返回页首]

定义 priority_buffer 类

priority_buffer 类是一种自定义消息块类型,它首先按优先级将传入消息排序,然后按消息的接收顺序排序。 priority_buffer 类与 concurrency::unbounded_buffer 类相似,因为它保留一列消息,另外还因为它同时充当源和目标消息块并可以同时具有多个源和多个目标。 但是,unbounded_buffer 仅根据从源接收消息的顺序来传播消息。

priority_buffer 类接收 std::tuple 类型的、包含 PriorityTypeType 元素的消息。 PriorityType 引用包含每条消息的优先级的类型;Type 引用消息的数据部分。 priority_buffer 类发送 Type 类型的消息。 priority_buffer 类还管理两个消息队列:适用于传入消息的 std::priority_queue 对象,以及适用于传出消息的 std::queue 对象。 当 priority_buffer 对象同时接收多个消息,或者当它在使用者读取任何消息之前接收多个消息时,按优先级将消息排序很有用。

除了派生自 propagator_block 的类必须实现的七个方法之外,priority_buffer 类还重写 link_target_notificationsend_message 方法。 priority_buffer 类还定义两个公共帮助器方法:enqueuedequeue,以及一个专用帮助器方法:propagate_priority_order

以下过程说明如何实现 priority_buffer 类。

定义 priority_buffer 类

  1. 创建一个 C++ 头文件并将其命名为 priority_buffer.h。 或者,可以使用作为项目一部分的现有头文件。

  2. priority_buffer.h 中添加以下代码。

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. std 命名空间中,定义 std::lessstd::greater(对 concurrency::message 对象执行操作)的专用化。

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    priority_buffer 类将 message 对象存储在 priority_queue 对象中。 这些类型专用化使优先级队列能够根据消息的优先级将消息排序。 priority 是 tuple 对象的第一个元素。

  4. concurrencyex 命名空间中声明 priority_buffer 类。

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    priority_buffer 类从 propagator_block 派生。 因此,它既可以发送消息,也可以接收消息。 priority_buffer 类可以有多个接收 Type 类型的消息的目标。 它还可以有多个发送 tuple<PriorityType, Type> 类型的消息的源。

  5. priority_buffer 类的 private 节中添加以下成员变量。

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    priority_queue 对象包含传入消息;queue 对象包含传出消息。 priority_buffer 对象可以同时接收多个消息;critical_section 对象同步对输入消息队列的访问。

  6. private 节中,定义复制构造函数和赋值运算符。 这可以使得 priority_queue 对象不可分配。

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. public 节中,定义许多消息块类型公用的构造函数。 另外,请定义析构函数。

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. public 节中,定义方法 enqueuedequeue。 这些帮助器方法提供替代的方式用于向 priority_buffer 对象发送消息以及从该对象接收消息。

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. protected 节中,定义 propagate_to_any_targets 方法。

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    propagate_to_any_targets 方法将输入队列前面的消息传输到输出队列,并将输出队列中的所有消息传播出去。

  10. protected 节中,定义 accept_message 方法。

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    当目标块调用 accept_message 方法时,priority_buffer 类会将消息所有权转移给第一个接受它的目标块。 (这类似于 unbounded_buffer 的行为。)

  11. protected 节中,定义 reserve_message 方法。

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    当提供的消息标识符与队列前面的消息标识符匹配时,priority_buffer 类允许目标块保留消息。 换言之,如果 priority_buffer 对象尚未收到其他消息并且尚未传播当前消息,则目标可以保留该消息。

  12. protected 节中,定义 consume_message 方法。

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    目标块调用 consume_message 来转移它保留的消息的所有权。

  13. protected 节中,定义 release_message 方法。

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    目标块调用 release_message 以取消它对消息的保留。

  14. protected 节中,定义 resume_propagation 方法。

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    在目标块使用或释放保留的消息后,运行时将调用 resume_propagation。 此方法传播输出队列中的所有消息。

  15. protected 节中,定义 link_target_notification 方法。

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    _M_pReservedFor 成员变量由基类 source_block 定义。 此成员变量指向目标块(如果有),该块包含对输出队列前面的消息的保留。 当新目标已链接到 priority_buffer 对象时,运行时将调用 link_target_notification。 如果没有目标包含保留,则此方法将传播输出队列中的所有消息。

  16. private 节中,定义 propagate_priority_order 方法。

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    此方法传播输出队列中的所有消息。 队列中的每条消息将提供给每个目标块,直到其中一个目标块接受了该消息。 priority_buffer 类保留传出消息的顺序。 因此,在此方法向目标块提供任何其他消息之前,目标块必须接受输出队列中的第一条消息。

  17. protected 节中,定义 propagate_message 方法。

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    propagate_message 方法使 priority_buffer 类能够充当消息接收方(目标)。 此方法接收由提供的源块所提供的消息,并将该消息插入优先级队列。 propagate_message 方法随后以异步方式将所有输出消息发送到目标块。

    当你调用 concurrency::asend 函数或者当消息块已连接到其他消息块时,运行时将调用此方法。

  18. protected 节中,定义 send_message 方法。

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    send_message 方法类似于 propagate_message。 但是,它以同步而不是异步方式发送输出消息。

    在执行同步发送操作期间(例如,在调用 concurrency::send 函数时),运行时将调用此方法。

priority_buffer 类包含许多消息块类型中常见的构造函数重载。 某些构造函数重载采用 concurrency::Schedulerconcurrency::ScheduleGroup 对象,使消息块可由特定的任务计划程序管理。 其他构造函数重载采用筛选器函数。 筛选器函数使消息块能够根据其有效负载接受或拒绝消息。 有关消息筛选器的详细信息,请参阅异步消息块。 有关任务计划程序的详细信息,请参阅任务计划程序

由于 priority_buffer 类先按照优先级再按照消息接收顺序将消息排序,因此该类在异步接收消息时最有用,例如,在调用 concurrency::asend 函数或者当消息块已连接到其他消息块时。

[返回页首]

完整示例

以下示例演示了 priority_buffer 类的完整定义。

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

以下示例对 priority_buffer 对象并发执行多个 asendconcurrency::receive 操作。

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

此示例生成以下示例输出。

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

priority_buffer 类首先按优先级再按接收消息的顺序将消息排序。 在此示例中,数值优先级较大的消息将插入到队列的前面。

[返回页首]

编译代码

复制代码示例并将其粘贴到 Visual Studio 项目中,或将 priority_buffer 类的定义粘贴到名为 priority_buffer.h 的文件中,并将测试程序粘贴到名为 priority_buffer.cpp 的文件中,然后在 Visual Studio 命令提示符窗口中运行以下命令。

cl.exe /EHsc priority_buffer.cpp

另请参阅

并发运行时演练
异步消息块
消息传递函数