다음을 통해 공유


연습: 사용자 지정 메시지 블록 만들기

이 문서에서는 들어오는 메시지를 우선 순위별로 정리하는 사용자 지정 메시지 블록 형식을 만드는 방법에 대해 설명합니다.

기본 제공 메시지 블록 형식이 광범위한 기능을 제공하지만 사용자 고유 메시지 블록 형식을 만들고 해당 형식을 응용 프로그램의 요구 사항에 맞게 사용자 지정할 수 있습니다. 비동기 에이전트 라이브러리에서 제공하는 기본 제공 메시지 블록 형식에 대한 설명은 비동기 메시지 블록을 참조하십시오.

사전 요구 사항

이 연습을 시작하기 전에 다음 문서를 읽어 보십시오.

단원

이 연습에는 다음 단원이 포함되어 있습니다.

  • 사용자 지정 메시지 블록 디자인

  • priority_buffer 클래스 정의

  • 전체 예제

사용자 지정 메시지 블록 디자인

메시지 블록은 메시지를 보내고 받는 동작에 관여합니다. 메시지를 보내는 메시지 블록을 소스 블록이라고 하고, 메시지를 받는 메시지 블록을 대상 블록이라고 합니다. 메시지를 보내고 받는 메시지 블록은 전파자 블록이라고 합니다. 에이전트 라이브러리에서는 추상 클래스 Concurrency::ISource를 사용하여 소스 블록을 나타내고 추상 클래스 Concurrency::ITarget을 사용하여 대상 블록을 나타냅니다. 소스로 사용되는 메시지 블록 형식은 ISource에서 파생되고 대상으로 사용되는 메시지 블록 형식은 ITarget에서 파생됩니다.

ISourceITarget에서 직접 메시지 블록 형식을 파생할 수 있지만 에이전트 라이브러리에서는 모든 메시지 블록 형식에 공통된 많은 기능(예: 오류 처리 및 메시지 블록을 동시성이 보장되는 방식으로 함께 연결)을 수행하는 세 가지 기본 클래스를 정의합니다. Concurrency::source_block 클래스는 ISource에서 파생되고 메시지를 다른 블록에 보냅니다. Concurrency::target_block 클래스는 ITarget에서 파생되고 다른 블록에서 메시지를 받습니다. Concurrency::propagator_block 클래스는 ISourceITarget에서 파생되고 메시지를 다른 블록에 보내고 다른 블록에서 메시지를 받습니다. 메시지 블록의 동작에 집중할 수 있도록 이러한 세 가지 기본 클래스를 사용하여 인프라 정보를 처리하는 것이 좋습니다.

source_block, target_blockpropagator_block 클래스는 소스 블록과 대상 블록 간 링크 또는 연결을 관리하는 형식 및 메시지의 처리 방식을 관리하는 형식에 대해 매개 변수화된 템플릿입니다. 에이전트 라이브러리에서는 링크 관리를 수행하는 두 가지 형식인 Concurrency::single_link_registryConcurrency::multi_link_registry를 정의합니다. single_link_registry 클래스는 하나의 메시지 블록을 하나의 소스나 하나의 대상에 연결할 수 있도록 설정합니다. multi_link_registry 클래스는 하나의 메시지 블록을 여러 개의 소스나 여러 개의 대상에 연결할 수 있도록 설정합니다. 에이전트 라이브러리에서는 메시지 관리를 수행하는 클래스인 Concurrency::ordered_message_processor를 정의합니다. ordered_message_processor 클래스는 메시지 블록에서 메시지를 받는 순서대로 처리할 수 있도록 설정합니다.

메시지 블록이 소스 및 대상과 관련되는 방식을 더 자세히 알아보려면 다음 예제를 살펴보십시오. 이 예제에서는 Concurrency::transformer 클래스의 선언을 보여 줍니다.

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

transformer 클래스는 propagator_block에서 파생되므로 소스 블록과 함께 대상 블록 역할을 합니다. 이 클래스는 _Input 형식의 메시지를 수락하고 _Output 형식의 메시지를 보냅니다. transformer 클래스는 single_link_registry를 대상 블록의 링크 관리자로 지정하고 multi_link_registry를 소스 블록의 링크 관리자로 지정합니다. 따라서 transformer 개체에 대상은 하나, 소스는 개수에 제한 없이 포함할 수 있습니다.

source_block에서 파생되는 클래스는 6개의 메서드인 propagate_to_any_targets, accept_message, reserve_message, consume_message, release_messageresume_propagation을 구현해야 합니다. target_block에서 파생되는 클래스는 propagate_message 메서드를 구현해야 하며 필요에 따라 send_message 메서드를 구현할 수 있습니다. propagator_block에서 파생하는 것은 source_blocktarget_block에서 파생하는 것과 동일한 기능입니다.

propagate_to_any_targets 메서드는 런타임에 호출되어 동기적 또는 비동기적으로 들어오는 메시지를 처리하고 보내는 메시지를 전파합니다. accept_message 메서드는 대상 블록에서 호출되어 메시지를 수락합니다. unbounded_buffer와 같은 대부분의 메시지 블록 형식은 메시지를 받을 첫 번째 대상에만 메시지를 보냅니다. 따라서 메시지의 소유권이 대상으로 이전됩니다. Concurrency::overwrite_buffer와 같은 기타 메시지 블록 형식은 각 대상 블록에 메시지를 제공합니다. 따라서 overwrite_buffer는 각 대상을 위한 메시지의 복사본을 만듭니다.

reserve_message, consume_message, release_messageresume_propagation 메서드는 메시지 블록이 메시지 예약에 참여할 수 있도록 설정합니다. 대상 블록이 제공된 메시지를 나중에 사용하도록 예약해야 하는 경우 대상 블록에서 reserve_message 메서드를 호출합니다. 대상 블록은 메시지를 예약한 후 consume_message 메서드를 호출하여 해당 메시지를 사용하거나 release_message 메서드를 사용하여 예약을 취소할 수 있습니다. accept_message 메서드와 마찬가지로 consume_message의 구현을 통해 메시지의 소유권을 이전하거나 메시지 복사본을 반환할 수 있습니다. 대상 블록이 예약된 메시지를 사용하거나 해제한 후 런타임에서 resume_propagation 메서드를 호출합니다. 일반적으로 이 메서드는 큐의 다음 메시지부터 시작하여 계속해서 메시지를 전파합니다.

런타임에서 propagate_message 메서드를 호출하여 다른 블록의 메시지를 현재 블록으로 비동기적으로 전송합니다. send_message 메서드는 메시지를 비동기적이 아닌 동기적으로 대상 블록에 보낸다는 점을 제외하면 propagate_message와 비슷합니다. send_message의 기본 구현에서는 들어오는 모든 메시지를 거부합니다. 메시지가 대상 블록과 연결된 선택적 필터 함수를 전달하지 않으면 런타임에서 이러한 메서드를 호출하지 않습니다. 메시지 필터에 대한 자세한 내용은 비동기 메시지 블록을 참조하십시오.

[맨 위로 이동]

priority_buffer 클래스 정의

priority_buffer 클래스는 들어오는 메시지를 우선 순위별로 정리한 다음 메시지를 받는 순서대로 정리하는 사용자 지정 메시지 블록 형식입니다. priority_buffer 클래스는 메시지 큐를 포함하고 있고 소스 및 대상 메시지 블록 역할을 하며 여러 소스 및 여러 대상을 포함할 수 있으므로 Concurrency::unbounded_buffer 클래스와 비슷합니다. 그러나 unbounded_buffer는 메시지를 전파할 때 소스에서 메시지를 받는 순서로만 전파합니다.

priority_buffer 클래스는 PriorityType 및 Type 요소가 들어 있는 std::tuple 형식의 메시지를 받습니다. PriorityType은 각 메시지의 우선 순위를 포함하고 있는 형식을 말하고 Type은 메시지의 데이터 부분을 말합니다. priority_buffer 클래스는 Type 형식의 메시지를 보냅니다. 또한 priority_buffer 클래스는 들어오는 메시지에 대한 std::priority_queue 개체와 보내는 메시지에 대한 std::queue 개체라는 두 가지 메시지 큐를 관리합니다. 메시지를 우선 순위별로 정리하는 것은 priority_buffer 개체가 여러 메시지를 동시에 받거나 사용자가 메시지를 읽기 전에 여러 메시지를 받는 경우에 유용합니다.

priority_buffer 클래스는 propagator_block에서 파생되는 클래스가 구현해야 하는 7개의 메서드 외에도 link_target_notificationsend_message 메서드를 재정의합니다. 또한 priority_buffer 클래스는 enqueuedequeue라는 두 개의 공용 도우미 메서드를 정의하고 propagate_priority_order라는 개인 도우미 메서드를 정의합니다.

다음 절차에서는 priority_buffer 클래스를 구현하는 방법에 대해 설명합니다.

priority_buffer 클래스를 정의하려면

  1. C++ 헤더 파일을 만들고 이름을 priority_buffer.h로 지정합니다. 또는 프로젝트에 속한 기존 헤더 파일을 사용할 수 있습니다.

  2. priority_buffer.h에서 다음 코드를 추가합니다.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. std 네임스페이스에서 Concurrency::message 개체에 대해 작동하는 std::lessstd::greater의 특수화를 정의합니다.

    namespace std 
    {
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<Concurrency::message<tuple<PriorityType,Type>>*> 
    {  
       typedef Concurrency::message<tuple<PriorityType, Type>> MessageType;
    
       bool operator()(const MessageType* left, const MessageType* right) const
       {  
          // apply operator< to the first element (the priority) 
          // of the tuple payload.
          return (get<0>(left->payload) < get<0>(right->payload));
       }
    };
    
    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<Concurrency::message<tuple<PriorityType, Type>>*> 
    {  
       typedef Concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
       bool operator()(const MessageType* left, const MessageType* right) const
       {  
          // apply operator> to the first element (the priority) 
          // of the tuple payload.
          return (get<0>(left->payload) > get<0>(right->payload));
       }
    };
    
    }
    

    priority_buffer 클래스는 message 개체를 priority_queue 개체에 저장합니다. 이러한 형식의 특수화를 사용하면 우선 순위 큐가 메시지의 우선 순위에 따라 메시지를 정렬할 수 있습니다. 우선 순위는 tuple 개체의 첫 번째 요소입니다.

  4. Concurrency 네임스페이스에서 priority_buffer 클래스를 선언합니다.

    namespace Concurrency 
    {
    template<class Type, 
             typename PriorityType = int,
             typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : 
       public propagator_block<multi_link_registry<ITarget<Type>>,
                               multi_link_registry<ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
    protected:
    private:
    };
    }
    

    priority_buffer 클래스는 propagator_block에서 파생됩니다. 따라서 이 클래스에서는 메시지를 보내고 받을 수 있습니다. priority_buffer 클래스는 Type 형식의 메시지를 받는 여러 대상을 포함할 수 있습니다. 또한 tuple<PriorityType, Type> 형식의 메시지를 보내는 여러 소스도 포함할 수 있습니다.

  5. priority_buffer 클래스의 private 섹션에서 다음 멤버 변수를 추가합니다.

    // Stores incoming messages. 
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
       message<_Source_type>*, 
       std::vector<message<_Source_type>*>, 
       Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<message<_Target_type>*> _output_messages;
    

    priority_queue 개체는 들어오는 메시지를 포함하고 queue 개체는 보내는 메시지를 포함합니다. priority_buffer 개체는 여러 메시지를 동시에 받을 수 있고 critical_section 개체는 입력 메시지 큐에 대한 액세스를 동기화합니다.

  6. private 섹션에서 복사 생성자와 할당 연산자를 정의합니다. 그러면 priority_queue 개체를 할당할 수 없게 됩니다.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. public 섹션에서 여러 메시지 블록 형식에 공통되는 생성자를 정의합니다. 그리고 소멸자도 정의합니다.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {       
       initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
       initialize_source_and_target();
       register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(Scheduler& scheduler)
    {
       initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(Scheduler& scheduler, filter_method const& filter) 
    {
       initialize_source_and_target(&scheduler);
       register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(ScheduleGroup& schedule_group)
    {
       initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter)
    {
       initialize_source_and_target(NULL, &schedule_group);
       register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
       // Remove all links.
       remove_network_links();
    }
    
  8. public 섹션에서 enqueuedequeue 메서드를 정의합니다. 이 도우미 메서드는 priority_buffer 개체에 메시지를 보내고 해당 개체에서 메시지를 받을 수 있는 다른 방법을 제공합니다.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
      return Concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
      return receive<Type>(this);
    }
    
  9. protected 섹션에서 propagate_to_any_targets 메서드를 정의합니다.

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(message<_Target_type>*)
    {
       // Retrieve the message from the front of the input queue.
       message<_Source_type>* input_message = NULL;
       {
          critical_section::scoped_lock lock(_input_lock);
          if (_input_messages.size() > 0)
          {
             input_message = _input_messages.top();
             _input_messages.pop();
          }
       }
    
       // Move the message to the output queue.
       if (input_message != NULL)
       {
          // The payload of the output message does not contain the 
          // priority of the message.
          message<_Target_type>* output_message = 
             new message<_Target_type>(get<1>(input_message->payload));
          _output_messages.push(output_message);
    
          // Free the memory for the input message.
          delete input_message;
    
          // Do not propagate messages if the new message is not the head message.
          // In this case, the head message is reserved by another message block.
          if (_output_messages.front()->msg_id() != output_message->msg_id())
          {
             return;
          }
       }
    
       // Propagate out the output messages.
       propagate_priority_order();
    }
    

    propagate_to_any_targets 메서드는 입력 큐의 앞에 있는 메시지를 출력 큐로 전송하고 출력 큐의 모든 메시지를 전파합니다.

  10. protected 섹션에서 accept_message 메서드를 정의합니다.

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual message<_Target_type>* accept_message(runtime_object_identity msg_id)
    {        
       message<_Target_type>* message = NULL;
    
       // Transfer ownership if the provided message identifier matches
       // the identifier of the front of the output message queue.
       if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
       {
          message = _output_messages.front();            
          _output_messages.pop();
       }
    
       return message;
    }
    

    대상 블록에서 accept_message 메서드를 호출하면 priority_buffer 클래스가 메시지 소유권을 메시지를 수락하는 첫 번째 대상 블록으로 이전합니다. 이 동작은 unbounded_buffer의 동작과 비슷합니다.

  11. protected 섹션에서 reserve_message 메서드를 정의합니다.

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(runtime_object_identity msg_id)
    {
       // Allow the message to be reserved if the provided message identifier
       // is the message identifier of the front of the message queue.
       return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
    }
    

    priority_buffer 클래스는 제공된 메시지 식별자가 큐의 앞에 있는 메시지의 식별자와 일치하는 경우 대상 블록에서 메시지를 예약하도록 허용합니다. 즉, priority_buffer 개체가 아직 추가 메시지를 받지 않았고 현재 메시지를 전파하지 않은 경우 대상에서 메시지를 예약할 수 있습니다.

  12. protected 섹션에서 consume_message 메서드를 정의합니다.

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual message<Type>* consume_message(runtime_object_identity msg_id)
    {
       // Transfer ownership of the message to the caller.
       return accept_message(msg_id);
    }
    

    대상 블록에서 consume_message를 호출하여 예약한 메시지의 소유권을 이전합니다.

  13. protected 섹션에서 release_message 메서드를 정의합니다.

    // Releases a previous message reservation.
    virtual void release_message(runtime_object_identity msg_id)
    {
       // The head message must be the one that is reserved. 
       if (_output_messages.empty() || 
           _output_messages.front()->msg_id() != msg_id)
       {
          throw message_not_found();
       }
    }
    

    대상 블록에서 release_message를 호출하여 메시지에 대한 예약을 취소합니다.

  14. protected 섹션에서 resume_propagation 메서드를 정의합니다.

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
       // Propagate out any messages in the output queue.
       if (_output_messages.size() > 0)
       {
          async_send(NULL);
       }
    }
    

    대상 블록이 예약된 메시지를 사용하거나 해제한 후 런타임에서 resume_propagation 메서드를 호출합니다. 이 메서드는 출력 큐에 있는 모든 메시지를 전파합니다.

  15. protected 섹션에서 link_target_notification 메서드를 정의합니다.

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(ITarget<_Target_type>*)
    {
       // Do not propagate messages if a target block reserves
       // the message at the front of the queue.
       if (_M_pReservedFor != NULL)
       {
          return;
       }
    
       // Propagate out any messages that are in the output queue.
       propagate_priority_order();
    }
    

    _M_pReservedFor 멤버 변수는 기본 클래스인 source_block에 의해 정의됩니다. 이 멤버 변수는 출력 큐의 앞에 있는 메시지에 대한 예약을 포함하고 있는 대상 블록(있는 경우)을 가리킵니다. 새 대상을 priority_buffer 개체에 연결하면 런타임에서 link_target_notification을 호출합니다. 이 메서드는 대상에 예약이 포함되어 있지 않는 경우 출력 큐에 있는 모든 메시지를 전파합니다.

  16. private 섹션에서 propagate_priority_order 메서드를 정의합니다.

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
       // Cancel propagation if another block reserves the head message.
       if (_M_pReservedFor != NULL)
       {
          return;
       }
    
       // Propagate out all output messages. 
       // Because this block preserves message ordering, stop propagation
       // if any of the messages are not accepted by a target block.
       while (!_output_messages.empty())
       {
          // Get the next message.
          message<_Target_type> * message = _output_messages.front();
    
          message_status status = declined;
    
          // Traverse each target in the order in which they are connected.
          for (target_iterator iter = _M_connectedTargets.begin(); 
               *iter != NULL; 
               ++iter)
          {
             // Propagate the message to the target.
             ITarget<_Target_type>* target = *iter;
             status = target->propagate(message, this);
    
             // If the target accepts the message then ownership of message has 
             // changed. Do not propagate this message to any other target.
             if (status == accepted)
             {
                break;
             }
    
             // If the target only reserved this message, we must wait until the 
             // target accepts the message.
             if (_M_pReservedFor != NULL)
             {
                break;
             }
          }
    
          // If status is anything other than accepted, then the head message
          // was not propagated out. To preserve the order in which output 
          // messages are propagated, we must stop propagation until the head 
          // message is accepted.
          if (status != accepted)
          {
              break;
          }          
       }
    }
    

    이 메서드는 출력 큐에 있는 모든 메시지를 전파합니다. 대상 블록 중 하나가 메시지를 수락할 때까지 큐의 모든 메시지는 모든 대상 블록에 제공됩니다. priority_buffer 클래스는 보내는 메시지의 순서를 유지합니다. 따라서 이 메서드가 다른 메시지를 대상 블록에 제공하기 전에 대상 블록에서 출력 큐의 첫 번째 메시지를 수락해야 합니다.

  17. protected 섹션에서 propagate_message 메서드를 정의합니다.

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual message_status propagate_message(message<_Source_type>* message, 
       ISource<_Source_type>* source)
    {
       // Accept the message from the source block.
       message = source->accept(message->msg_id(), this);
    
       if (message != NULL)
       {
          // Insert the message into the input queue. The type parameter Pr
          // defines how to order messages by priority.
          {
             critical_section::scoped_lock lock(_input_lock);
             _input_messages.push(message);
          }
    
          // Asynchronously send the message to the target blocks.
          async_send(NULL);
          return accepted;
       }
       else
       {
          return missed;
       }      
    }
    

    propagate_message 메서드를 사용하면 priority_buffer 클래스를 메시지 수신자 또는 대상으로 사용할 수 있습니다. 이 메서드는 제공된 소스 블록에서 제공하는 메시지를 받고 해당 메시지를 우선 순위 큐에 삽입합니다. 그런 다음 propagate_message 메서드는 모든 출력 메시지를 대상 블록으로 비동기적으로 보냅니다.

    Concurrency::asend 함수를 호출 때 또는 메시지 블록이 다른 메시지 블록에 연결되어 있을 때 런타임에서 이 메서드를 호출합니다.

  18. protected 섹션에서 send_message 메서드를 정의합니다.

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual message_status send_message(message<_Source_type>* message,
       ISource<_Source_type>* source)
    {
       // Accept the message from the source block.
       message = source->accept(message->msg_id(), this);
    
       if (message != NULL)
       {
          // Insert the message into the input queue. The type parameter Pr
          // defines how to order messages by priority.
          {
             critical_section::scoped_lock lock(_input_lock);
             _input_messages.push(message);
          }
    
          // Synchronously send the message to the target blocks.
          sync_send(NULL);
          return accepted;
       }
       else
       {
          return missed;
       }      
    }
    

    send_message 메서드는 propagate_message와 비슷합니다. 그러나 이 메서드는 출력 메시지를 비동기적이 아닌 동기적으로 보냅니다.

    Concurrency::send 함수를 호출할 때와 같이 동기 보내기 작업 중에 런타임에서 이 메서드를 호출합니다.

priority_buffer 클래스에는 여러 메시지 블록 형식에 일반적인 생성자 오버로드가 포함되어 있습니다. 일부 생성자 오버로드는 특정 작업 스케줄러에서 메시지 블록을 관리할 수 있게 하는 Concurrency::Scheduler 또는 Concurrency::ScheduleGroup 개체를 사용합니다. 다른 생성자 오버로드는 필터 함수를 사용합니다. 필터 함수를 사용하면 메시지 블록이 해당 페이로드를 기준으로 메시지를 수락하거나 거부하도록 할 수 있습니다. 메시지 필터에 대한 자세한 내용은 비동기 메시지 블록을 참조하십시오. 작업 스케줄러에 대한 자세한 내용은 작업 스케줄러(동시성 런타임)를 참조하십시오.

priority_buffer 클래스는 메시지를 우선 순위별로 정리한 다음 메시지를 받은 순서대로 정리하므로 이 클래스는 Concurrency::asend 함수를 호출할 때 또는 메시지 블록이 다른 메시지 블록에 연결되어 있을 때와 같이 메시지를 비동기적으로 받는 경우 가장 유용합니다.

[맨 위로 이동]

전체 예제

다음 예제에서는 priority_buffer 클래스의 전체 정의를 보여 줍니다.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
// A specialization of less that tests whether the priority element of a 
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<Concurrency::message<tuple<PriorityType,Type>>*> 
{  
   typedef Concurrency::message<tuple<PriorityType, Type>> MessageType;

   bool operator()(const MessageType* left, const MessageType* right) const
   {  
      // apply operator< to the first element (the priority) 
      // of the tuple payload.
      return (get<0>(left->payload) < get<0>(right->payload));
   }
};

// A specialization of less that tests whether the priority element of a 
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<Concurrency::message<tuple<PriorityType, Type>>*> 
{  
   typedef Concurrency::message<std::tuple<PriorityType,Type>> MessageType;

   bool operator()(const MessageType* left, const MessageType* right) const
   {  
      // apply operator> to the first element (the priority) 
      // of the tuple payload.
      return (get<0>(left->payload) > get<0>(right->payload));
   }
};

}

namespace Concurrency 
{
// A message block type that orders incoming messages first by priority, 
// and then by the order in which messages are received. 
template<class Type, 
         typename PriorityType = int,
         typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : 
   public propagator_block<multi_link_registry<ITarget<Type>>,
                           multi_link_registry<ISource<std::tuple<PriorityType, Type>>>>
{  
public:
   // Constructs a priority_buffer message block.
   priority_buffer() 
   {       
      initialize_source_and_target();
   }

   // Constructs a priority_buffer message block with the given filter function.
   priority_buffer(filter_method const& filter)
   {
      initialize_source_and_target();
      register_filter(filter);
   }

   // Constructs a priority_buffer message block that uses the provided 
   // Scheduler object to propagate messages.
   priority_buffer(Scheduler& scheduler)
   {
      initialize_source_and_target(&scheduler);
   }

   // Constructs a priority_buffer message block with the given filter function 
   // and uses the provided Scheduler object to propagate messages.
   priority_buffer(Scheduler& scheduler, filter_method const& filter) 
   {
      initialize_source_and_target(&scheduler);
      register_filter(filter);
   }

   // Constructs a priority_buffer message block that uses the provided 
   // SchedulerGroup object to propagate messages.
   priority_buffer(ScheduleGroup& schedule_group)
   {
      initialize_source_and_target(NULL, &schedule_group);
   }

   // Constructs a priority_buffer message block with the given filter function 
   // and uses the provided SchedulerGroup object to propagate messages.
   priority_buffer(ScheduleGroup& schedule_group, filter_method const& filter)
   {
      initialize_source_and_target(NULL, &schedule_group);
      register_filter(filter);
   }

   // Destroys the message block.
   ~priority_buffer()
   {
      // Remove all links.
      remove_network_links();
   }

   // Sends an item to the message block.
   bool enqueue(Type const& item)
   {
     return Concurrency::asend<Type>(this, item);
   }

   // Receives an item from the message block.
   Type dequeue()
   {
     return receive<Type>(this);
   }

protected:
   // Asynchronously passes a message from an ISource block to this block.
   // This method is typically called by propagator_block::propagate.
   virtual message_status propagate_message(message<_Source_type>* message, 
      ISource<_Source_type>* source)
   {
      // Accept the message from the source block.
      message = source->accept(message->msg_id(), this);

      if (message != NULL)
      {
         // Insert the message into the input queue. The type parameter Pr
         // defines how to order messages by priority.
         {
            critical_section::scoped_lock lock(_input_lock);
            _input_messages.push(message);
         }

         // Asynchronously send the message to the target blocks.
         async_send(NULL);
         return accepted;
      }
      else
      {
         return missed;
      }      
   }

   // Synchronously passes a message from an ISource block to this block.
   // This method is typically called by propagator_block::send.
   virtual message_status send_message(message<_Source_type>* message,
      ISource<_Source_type>* source)
   {
      // Accept the message from the source block.
      message = source->accept(message->msg_id(), this);

      if (message != NULL)
      {
         // Insert the message into the input queue. The type parameter Pr
         // defines how to order messages by priority.
         {
            critical_section::scoped_lock lock(_input_lock);
            _input_messages.push(message);
         }

         // Synchronously send the message to the target blocks.
         sync_send(NULL);
         return accepted;
      }
      else
      {
         return missed;
      }      
   }

   // Accepts a message that was offered by this block by transferring ownership
   // to the caller.
   virtual message<_Target_type>* accept_message(runtime_object_identity msg_id)
   {        
      message<_Target_type>* message = NULL;

      // Transfer ownership if the provided message identifier matches
      // the identifier of the front of the output message queue.
      if (!_output_messages.empty() && 
           _output_messages.front()->msg_id() == msg_id)
      {
         message = _output_messages.front();            
         _output_messages.pop();
      }

      return message;
   }

   // Reserves a message that was previously offered by this block.
   virtual bool reserve_message(runtime_object_identity msg_id)
   {
      // Allow the message to be reserved if the provided message identifier
      // is the message identifier of the front of the message queue.
      return (!_output_messages.empty() && 
               _output_messages.front()->msg_id() == msg_id);
   }

   // Transfers the message that was previously offered by this block 
   // to the caller. The caller of this method is the target block that 
   // reserved the message.
   virtual message<Type>* consume_message(runtime_object_identity msg_id)
   {
      // Transfer ownership of the message to the caller.
      return accept_message(msg_id);
   }

   // Releases a previous message reservation.
   virtual void release_message(runtime_object_identity msg_id)
   {
      // The head message must be the one that is reserved. 
      if (_output_messages.empty() || 
          _output_messages.front()->msg_id() != msg_id)
      {
         throw message_not_found();
      }
   }

   // Resumes propagation after a reservation has been released.
   virtual void resume_propagation()
   {
      // Propagate out any messages in the output queue.
      if (_output_messages.size() > 0)
      {
         async_send(NULL);
      }
   }

   // Notifies this block that a new target has been linked to it.
   virtual void link_target_notification(ITarget<_Target_type>*)
   {
      // Do not propagate messages if a target block reserves
      // the message at the front of the queue.
      if (_M_pReservedFor != NULL)
      {
         return;
      }

      // Propagate out any messages that are in the output queue.
      propagate_priority_order();
   }

   // Transfers the message at the front of the input queue to the output queue
   // and propagates out all messages in the output queue.
   virtual void propagate_to_any_targets(message<_Target_type>*)
   {
      // Retrieve the message from the front of the input queue.
      message<_Source_type>* input_message = NULL;
      {
         critical_section::scoped_lock lock(_input_lock);
         if (_input_messages.size() > 0)
         {
            input_message = _input_messages.top();
            _input_messages.pop();
         }
      }

      // Move the message to the output queue.
      if (input_message != NULL)
      {
         // The payload of the output message does not contain the 
         // priority of the message.
         message<_Target_type>* output_message = 
            new message<_Target_type>(get<1>(input_message->payload));
         _output_messages.push(output_message);

         // Free the memory for the input message.
         delete input_message;

         // Do not propagate messages if the new message is not the head message.
         // In this case, the head message is reserved by another message block.
         if (_output_messages.front()->msg_id() != output_message->msg_id())
         {
            return;
         }
      }

      // Propagate out the output messages.
      propagate_priority_order();
   }

private:

   // Propagates messages in priority order.
   void propagate_priority_order()
   {
      // Cancel propagation if another block reserves the head message.
      if (_M_pReservedFor != NULL)
      {
         return;
      }

      // Propagate out all output messages. 
      // Because this block preserves message ordering, stop propagation
      // if any of the messages are not accepted by a target block.
      while (!_output_messages.empty())
      {
         // Get the next message.
         message<_Target_type> * message = _output_messages.front();

         message_status status = declined;

         // Traverse each target in the order in which they are connected.
         for (target_iterator iter = _M_connectedTargets.begin(); 
              *iter != NULL; 
              ++iter)
         {
            // Propagate the message to the target.
            ITarget<_Target_type>* target = *iter;
            status = target->propagate(message, this);

            // If the target accepts the message then ownership of message has 
            // changed. Do not propagate this message to any other target.
            if (status == accepted)
            {
               break;
            }

            // If the target only reserved this message, we must wait until the 
            // target accepts the message.
            if (_M_pReservedFor != NULL)
            {
               break;
            }
         }

         // If status is anything other than accepted, then the head message
         // was not propagated out. To preserve the order in which output 
         // messages are propagated, we must stop propagation until the head 
         // message is accepted.
         if (status != accepted)
         {
             break;
         }          
      }
   }

private:

   // Stores incoming messages. 
   // The type parameter Pr specifies how to order messages by priority.
   std::priority_queue<
      message<_Source_type>*, 
      std::vector<message<_Source_type>*>, 
      Pr
   > _input_messages;

   // Synchronizes access to the input message queue.
   critical_section _input_lock;

   // Stores outgoing messages.
   std::queue<message<_Target_type>*> _output_messages;

private:
   // Hide assignment operator and copy constructor.
   priority_buffer const &operator =(priority_buffer const&);
   priority_buffer(priority_buffer const &);
};

}

다음 예제에서는 priority_buffer 개체 대한 여러 asendConcurrency::receive 작업을 동시에 수행합니다.

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace Concurrency;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;

   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

이 예제를 실행하면 다음과 같은 샘플 결과가 출력됩니다.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

priority_buffer 클래스는 먼저 메시지를 우선 순위별로 정리한 다음 메시지를 받는 순서대로 정리합니다. 이 예제에서는 우선 순위가 높은 메시지가 큐의 앞쪽에 삽입됩니다.

[맨 위로 이동]

코드 컴파일

예제 코드를 복사하여 Visual Studio 프로젝트에 붙여 넣거나 priority_buffer 클래스의 정의를 priority_buffer.h 파일에 붙여 넣고 테스트 프로그램을 priority_buffer.cpp 파일에 붙여 넣은 다음 Visual Studio 2010 명령 프롬프트 창에서 다음 명령을 실행합니다.

cl.exe /EHsc priority_buffer.cpp

참고 항목

개념

동시성 런타임 연습

비동기 메시지 블록

메시지 전달 함수