Пошаговое руководство. Создание пользовательского блока сообщений

В этом документе описывается, как создать пользовательский тип блока сообщений, который упорядочивает входящие сообщения по приоритету.

Хотя встроенные типы блоков сообщений предоставляют широкий спектр функциональных возможностей, вы можете создать собственный тип блока сообщений и настроить его в соответствии с требованиями приложения. Описание встроенных типов блоков сообщений, предоставляемых библиотекой асинхронных агентов, см. в разделе "Асинхронные блоки сообщений".

Необходимые компоненты

Ознакомьтесь со следующими документами перед началом работы с этим пошаговом руководстве.

Разделы

Это пошаговое руководство содержит следующие разделы:

Проектирование настраиваемого блока сообщений

Блоки сообщений участвуют в процессе отправки и получения сообщений. Блок сообщений, который отправляет сообщения, называется исходным блоком. Блок сообщений, который получает сообщения, называется целевым блоком. Блок сообщений, который отправляет и получает сообщения, называется блоком распространения. Библиотека агентов использует параллелизм абстрактного класса ::ISource для представления исходных блоков и параллелизма абстрактного класса ::ITarget для представления целевых блоков. Типы блоков сообщений, которые действуют как источники, производные от ISource; типы блоков сообщений, которые действуют в качестве целевых объектов, производных от ITarget.

Хотя тип блока сообщений можно наследить непосредственно от ISource и ITargetиз библиотеки агентов определяет три базовых класса, которые выполняют большую часть функциональных возможностей, которые являются общими для всех типов блоков сообщений, например обработка ошибок и соединение блоков сообщений вместе в безопасном режиме параллелизма. Класс параллелизма::source_block является производным от ISource и отправляет сообщения в другие блоки. Класс параллелизма::target_block является производным от ITarget и получает сообщения из других блоков. Класс concurrency::p ropagator_block является производным от ISource и ITarget отправляет сообщения в другие блоки и получает сообщения из других блоков. Мы рекомендуем использовать эти три базовых класса для обработки сведений о инфраструктуре, чтобы сосредоточиться на поведении блока сообщений.

Классы source_blocktarget_blockи propagator_block классы — это шаблоны, параметризованные в типе, который управляет подключениями или ссылками между исходными и целевыми блоками и типом, который управляет обработкой сообщений. Библиотека агентов определяет два типа, которые выполняют управление ссылками, параллелизм::single_link_registry и параллелизм::multi_link_registry. Класс single_link_registry позволяет связать блок сообщений с одним источником или одним целевым объектом. Класс multi_link_registry позволяет связать блок сообщений с несколькими источниками или несколькими целевыми объектами. Библиотека агентов определяет один класс, который выполняет управление сообщениями, параллелизм::ordered_message_processor. Класс ordered_message_processor позволяет блокам сообщений обрабатывать сообщения в порядке их получения.

Чтобы лучше понять, как блоки сообщений связаны с их источниками и целевыми объектами, рассмотрим следующий пример. В этом примере показано объявление класса параллелизма::преобразователя .

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

Класс transformer является производным от propagator_blockи, следовательно, выступает как в качестве исходного блока, так и в качестве целевого блока. Он принимает сообщения типа _Input и отправляет сообщения типа _Output. Класс transformer указывает single_link_registry в качестве диспетчера ссылок для всех целевых блоков и multi_link_registry в качестве диспетчера ссылок для любых исходных блоков. transformer Поэтому объект может иметь до одного целевого объекта и неограниченного количества источников.

Класс, производный от source_block шести методов: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message и resume_propagation. Класс, производный от target_block этого, должен реализовать метод propagate_message и при необходимости реализовать метод send_message . Производный от propagator_block функционально эквивалентен производным от обоих source_block и target_block.

Метод propagate_to_any_targets вызывается средой выполнения для асинхронной или синхронной обработки всех входящих сообщений и распространения всех исходящих сообщений. Метод accept_message вызывается целевыми блоками для приема сообщений. Многие типы блоков сообщений, например unbounded_buffer, отправляют сообщения только в первый целевой объект, который получит его. Таким образом, он передает владение сообщением целевому объекту. Другие типы блоков сообщений, такие как параллелизм::overwrite_buffer, предлагают сообщения каждому из его целевых блоков. overwrite_buffer Поэтому создает копию сообщения для каждого из его целевых объектов.

consume_messageМетоды reserve_message, release_messageи resume_propagation методы позволяют блокам сообщений участвовать в резервировании сообщений. Целевые блоки вызывают reserve_message метод, когда они предлагают сообщение и должны зарезервировать сообщение для последующего использования. После того как целевой блок резервирует сообщение, он может вызвать consume_message метод для использования этого сообщения или release_message метода для отмены резервирования. Как и в accept_message случае с методом, реализация consume_message может передавать владение сообщением или возвращать копию сообщения. После того как целевой блок использует или освобождает зарезервированное сообщение, среда выполнения вызывает resume_propagation метод. Как правило, этот метод продолжает распространение сообщений, начиная с следующего сообщения в очереди.

Среда выполнения вызывает propagate_message метод для асинхронной передачи сообщения из другого блока в текущий. Метод send_message похож propagate_message, за исключением того, что он синхронно, а не асинхронно отправляет сообщение в целевые блоки. Реализация по умолчанию send_message отклоняет все входящие сообщения. Среда выполнения не вызывает ни один из этих методов, если сообщение не передает необязательную функцию фильтра, связанную с целевым блоком. Дополнительные сведения о фильтрах сообщений см. в разделе "Асинхронные блоки сообщений".

[В начало]

Определение класса priority_buffer

Класс priority_buffer — это пользовательский тип блока сообщений, который сначала упорядочивает входящие сообщения по приоритету, а затем по порядку получения сообщений. Класс priority_buffer напоминает класс параллелизма::unbounded_buffer , так как он содержит очередь сообщений, а также потому, что он действует как в качестве источника, так и целевого блока сообщений, и может иметь как несколько источников, так и несколько целевых объектов. Однако распространение unbounded_buffer сообщений базируется только в том порядке, в котором он получает сообщения из своих источников.

Класс priority_buffer получает сообщения типа std::кортежа , содержащие PriorityType и Type элементы. PriorityType относится к типу, который имеет приоритет каждого сообщения; Type ссылается на часть данных сообщения. Класс priority_buffer отправляет сообщения типа Type. Класс priority_buffer также управляет двумя очередями сообщений: объект std::p riority_queue для входящих сообщений и объект std::queue для исходящих сообщений. Упорядочивание сообщений по приоритету полезно, если priority_buffer объект получает несколько сообщений одновременно или при получении нескольких сообщений перед чтением сообщений потребителями.

Помимо семи методов, производных от propagator_block класса, который должен реализовываться, priority_buffer класс также переопределяет link_target_notification методы и send_message методы. Класс priority_buffer также определяет два открытых вспомогательных метода, enqueue а dequeueтакже частный вспомогательный метод propagate_priority_order.

В следующей процедуре описывается реализация priority_buffer класса.

Определение класса priority_buffer

  1. Создайте файл заголовка C++ и назовите его priority_buffer.h. Кроме того, можно использовать существующий файл заголовка, который является частью проекта.

  2. Добавьте priority_buffer.hследующий код.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. std В пространстве имен определите специализации std::less и std::great, которые действуют на объектах concurrency::message.

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    Класс priority_buffer хранит message объекты в объекте priority_queue . Эти специализации типов позволяют очереди приоритетов сортировать сообщения в соответствии с их приоритетом. Приоритетом является первый элемент tuple объекта.

  4. В пространстве имен объявите concurrencyexpriority_buffer класс.

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    Класс priority_buffer является производным от propagator_block. Поэтому он может отправлять и получать сообщения. Класс priority_buffer может иметь несколько целевых объектов, получающих сообщения типа Type. Он также может содержать несколько источников, отправляющих сообщения типа tuple<PriorityType, Type>.

  5. private В разделе priority_buffer класса добавьте следующие переменные-члены.

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    Объект priority_queue содержит входящие сообщения; queue объект содержит исходящие сообщения. priority_buffer Объект может одновременно получать несколько сообщений; critical_section объект синхронизирует доступ к очереди входных сообщений.

  6. В разделе определите private конструктор копирования и оператор назначения. Это предотвращает priority_queue назначение объектов.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. public В разделе определите конструкторы, которые являются общими для многих типов блоков сообщений. Также определите деструктор.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. В разделе определите public методы enqueue и dequeue. Эти вспомогательные методы предоставляют альтернативный способ отправки сообщений и получения сообщений от priority_buffer объекта.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. В разделе определите protectedpropagate_to_any_targets метод.

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    Метод propagate_to_any_targets передает сообщение, которое находится в передней части входной очереди в выходную очередь и распространяет все сообщения в выходной очереди.

  10. В разделе определите protectedaccept_message метод.

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    Когда целевой блок вызывает accept_message метод, priority_buffer класс передает владение сообщением первому целевому блоку, который принимает его. (Это напоминает поведение unbounded_buffer.)

  11. В разделе определите protectedreserve_message метод.

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    Класс priority_buffer позволяет целевому блоку зарезервировать сообщение, если предоставленный идентификатор сообщения соответствует идентификатору сообщения, расположенного перед очередью. Другими словами, целевой объект может зарезервировать сообщение, если priority_buffer объект еще не получил дополнительное сообщение и еще не распространил текущий.

  12. В разделе определите protectedconsume_message метод.

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    Целевой блок вызывает consume_message передачу собственности на сообщение, зарезервированное им.

  13. В разделе определите protectedrelease_message метод.

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    Целевой блок вызывает release_message отмену резервирования сообщения.

  14. В разделе определите protectedresume_propagation метод.

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    Среда выполнения вызывается resume_propagation после использования целевого блока или выпуска зарезервированного сообщения. Этот метод распространяет все сообщения, находящиеся в выходной очереди.

  15. В разделе определите protectedlink_target_notification метод.

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    Переменная-член _M_pReservedFor определяется базовым классом source_block. Эта переменная-член указывает на целевой блок, который содержит резервирование к сообщению, которое находится в передней части очереди выходных данных. Среда выполнения вызывает link_target_notification , когда новый целевой объект связан с priority_buffer объектом. Этот метод распространяет все сообщения, находящиеся в выходной очереди, если целевой объект не содержит резервирования.

  16. В разделе определите privatepropagate_priority_order метод.

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    Этот метод распространяет все сообщения из выходной очереди. Каждое сообщение в очереди предлагается каждому целевому блоку, пока один из целевых блоков не принимает сообщение. Класс priority_buffer сохраняет порядок исходящих сообщений. Поэтому первое сообщение в выходной очереди должно приниматься целевым блоком, прежде чем этот метод предлагает любое другое сообщение целевым блокам.

  17. В разделе определите protectedpropagate_message метод.

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    Метод propagate_message позволяет priority_buffer классу выступать в качестве приемника сообщений или целевого объекта. Этот метод получает сообщение, предлагаемое предоставленным блоком источника, и вставляет это сообщение в очередь приоритета. Затем propagate_message метод асинхронно отправляет все выходные сообщения в целевые блоки.

    Среда выполнения вызывает этот метод при вызове функции параллелизма::asend или при подключении блока сообщений к другим блокам сообщений.

  18. В разделе определите protectedsend_message метод.

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    Метод send_message похож propagate_messageна . Однако он отправляет выходные сообщения синхронно, а не асинхронно.

    Среда выполнения вызывает этот метод во время синхронной операции отправки, например при вызове функции параллелизма::send .

Класс priority_buffer содержит перегрузки конструктора, типичные во многих типах блоков сообщений. Некоторые перегрузки конструктора принимают параллелизм::Scheduler или параллелизм::ScheduleGroup, которые позволяют блоку сообщений управлять определенным планировщиком задач. Другие перегрузки конструктора принимают функцию фильтра. Функции фильтрации позволяют блокам сообщений принимать или отклонять сообщение на основе полезных данных. Дополнительные сведения о фильтрах сообщений см. в разделе "Асинхронные блоки сообщений". Дополнительные сведения о планировщиках задач см. в разделе Планировщик задач.

priority_buffer Так как класс упорядочивает сообщения по приоритету, а затем по порядку получения сообщений, этот класс наиболее полезен при асинхронном получении сообщений, например при вызове функции параллелизма::asend или при подключении блока сообщений к другим блокам сообщений.

[В начало]

Полный пример

В следующем примере показано полное определение priority_buffer класса.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

В следующем примере параллельно выполняется несколько операций asend параллелизма ::receive в объекте priority_buffer .

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

В этом примере создается следующий пример выходных данных.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

priority_buffer Класс упорядочивает сообщения сначала по приоритету, а затем по порядку, в котором он получает сообщения. В этом примере сообщения с более высоким числовым приоритетом вставляются в передней части очереди.

[В начало]

Компиляция кода

Скопируйте пример кода и вставьте его в проект Visual Studio или вставьте определение priority_buffer класса в файл с именем priority_buffer.h и тестовой программой в файле, который называется priority_buffer.cpp , а затем выполните следующую команду в окне командной строки Visual Studio.

cl.exe /EHsc priority_buffer.cpp

См. также

Пошаговые руководства по среде выполнения с параллелизмом
Асинхронные блоки сообщений
Функции передачи сообщений