Udostępnij za pośrednictwem


Wskazówki: tworzenie niestandardowego bloku komunikatów

W tym dokumencie opisano sposób tworzenia niestandardowego typu bloku komunikatów, który porządkuje komunikaty przychodzące według priorytetu.

Mimo że wbudowane typy bloków komunikatów zapewniają szeroką gamę funkcji, można utworzyć własny typ bloku komunikatów i dostosować go tak, aby spełniał wymagania aplikacji. Opis wbudowanych typów bloków komunikatów udostępnianych przez bibliotekę agentów asynchronicznych znajduje się w temacie Asynchroniczne bloki komunikatów.

Wymagania wstępne

Przed rozpoczęciem tego przewodnika zapoznaj się z następującymi dokumentami:

Sekcje

Ten przewodnik zawiera następujące sekcje:

Projektowanie niestandardowego bloku komunikatów

Bloki komunikatów uczestniczą w akcie wysyłania i odbierania komunikatów. Blok komunikatów, który wysyła komunikaty, jest nazywany blokiem źródłowym. Blok komunikatów odbierający komunikaty jest nazywany blokiem docelowym. Blok komunikatów, który zarówno wysyła, jak i odbiera komunikaty, jest nazywany blokiem propagacji. Biblioteka agentów używa abstrakcyjnej współbieżności klasy ::ISource do reprezentowania bloków źródłowych i abstrakcyjnej współbieżności klasy ::ITarget do reprezentowania bloków docelowych. Typy bloków komunikatów, które działają jako źródła pochodzące z ISource; typy bloków komunikatów, które działają jako obiekty docelowe pochodzące z ITarget.

Chociaż typ bloku komunikatów można uzyskać bezpośrednio z ISource elementów i ITarget, biblioteka agentów definiuje trzy klasy podstawowe, które wykonują większość funkcji, które są wspólne dla wszystkich typów bloków komunikatów, na przykład obsługi błędów i łączenia bloków komunikatów w bezpieczny sposób współbieżności. Klasa concurrency::source_block pochodzi z ISource i wysyła komunikaty do innych bloków. Klasa concurrency::target_block pochodzi z ITarget innych bloków i odbiera komunikaty z innych bloków. Klasa concurrency::p ropagator_block pochodzi z ISource i ITarget wysyła komunikaty do innych bloków i odbiera komunikaty z innych bloków. Zalecamy użycie tych trzech klas bazowych do obsługi szczegółów infrastruktury, aby można było skoncentrować się na zachowaniu bloku komunikatów.

Klasy source_block, target_blocki propagator_block to szablony, które są sparametryzowane w typie, który zarządza połączeniami lub łączami między blokami źródłowymi i docelowymi oraz typem, który zarządza sposobem przetwarzania komunikatów. Biblioteka agentów definiuje dwa typy, które wykonują zarządzanie łączami, współbieżność::single_link_registry i współbieżność::multi_link_registry. Klasa single_link_registry umożliwia połączenie bloku komunikatów z jednym źródłem lub z jednym obiektem docelowym. Klasa multi_link_registry umożliwia łączenie bloku komunikatów z wieloma źródłami lub wieloma obiektami docelowymi. Biblioteka agentów definiuje jedną klasę, która wykonuje zarządzanie komunikatami, współbieżność::ordered_message_processor. Klasa ordered_message_processor umożliwia blokowanie komunikatów przetwarzania komunikatów w kolejności ich odebrania.

Aby lepiej zrozumieć, jak bloki komunikatów odnoszą się do ich źródeł i celów, rozważ poniższy przykład. W tym przykładzie pokazano deklarację klasy concurrency::transformer .

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

Klasa transformer pochodzi z propagator_blockklasy , a zatem działa zarówno jako blok źródłowy, jak i jako blok docelowy. Akceptuje komunikaty typu _Input i wysyła komunikaty typu _Output. Klasa transformer określa single_link_registry jako menedżera linków dla dowolnych bloków docelowych i multi_link_registry jako menedżera linków dla dowolnych bloków źródłowych. transformer W związku z tym obiekt może mieć maksymalnie jeden obiekt docelowy i nieograniczoną liczbę źródeł.

Klasa pochodząca z source_block klasy musi implementować sześć metod: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message i resume_propagation. Klasa pochodząca z target_block klasy musi implementować metodę propagate_message i opcjonalnie może zaimplementować metodę send_message . Wyprowadzanie z propagator_block elementu jest funkcjonalnie równoważne wyprowadzeniu zarówno z , jak source_block i target_block.

Metoda propagate_to_any_targets jest wywoływana przez środowisko uruchomieniowe w celu asynchronicznego lub synchronicznego przetwarzania wszystkich przychodzących komunikatów i propagacji wszystkich komunikatów wychodzących. Metoda jest wywoływana accept_message przez bloki docelowe w celu akceptowania komunikatów. Wiele typów bloków komunikatów, takich jak unbounded_buffer, wysyła komunikaty tylko do pierwszego obiektu docelowego, który go otrzyma. W związku z tym przenosi własność komunikatu do elementu docelowego. Inne typy bloków komunikatów, takie jak współbieżność::overwrite_buffer, oferują komunikaty do każdego z bloków docelowych. overwrite_buffer W związku z tym tworzy kopię komunikatu dla każdego z jego obiektów docelowych.

Metody reserve_message, , release_messageconsume_messagei resume_propagation umożliwiają blokom komunikatów uczestnictwo w rezerwacji komunikatów. Bloki docelowe wywołają metodę reserve_message , gdy są one oferowane komunikat i muszą zarezerwować komunikat do późniejszego użycia. Po zarezerwowaniu komunikatu przez blok docelowy może wywołać consume_message metodę , aby użyć tego komunikatu lub release_message metody w celu anulowania rezerwacji. Podobnie jak w przypadku accept_message metody, implementacja consume_message polecenia może przenieść własność komunikatu lub zwrócić kopię komunikatu. Gdy blok docelowy zużywa lub zwalnia komunikat zarezerwowany, środowisko uruchomieniowe wywołuje metodę resume_propagation . Zazwyczaj ta metoda kontynuuje propagację komunikatów, zaczynając od następnego komunikatu w kolejce.

Środowisko uruchomieniowe wywołuje metodę propagate_message w celu asynchronicznego transferu komunikatu z innego bloku do bieżącego. Metoda send_message przypomina propagate_messagemetodę , z tą różnicą, że synchronicznie, a nie asynchronicznie, wysyła komunikat do bloków docelowych. Domyślna implementacja send_message odrzuca wszystkie komunikaty przychodzące. Środowisko uruchomieniowe nie wywołuje żadnej z tych metod, jeśli komunikat nie przekazuje opcjonalnej funkcji filtru skojarzonej z blokiem docelowym. Aby uzyskać więcej informacji na temat filtrów komunikatów, zobacz Asynchroniczne bloki komunikatów.

[Top]

Definiowanie klasy priority_buffer

Klasa priority_buffer jest niestandardowym typem bloku komunikatów, który porządkuje komunikaty przychodzące najpierw według priorytetu, a następnie według kolejności odbierania komunikatów. Klasa priority_buffer przypomina klasę concurrency::unbounded_buffer , ponieważ przechowuje kolejkę komunikatów, a także dlatego, że działa zarówno jako źródło, jak i docelowy blok komunikatów i może mieć zarówno wiele źródeł, jak i wiele obiektów docelowych. unbounded_buffer Jednak propagacja komunikatów opiera się tylko na kolejności, w której odbiera komunikaty ze swoich źródeł.

Klasa priority_buffer odbiera komunikaty typu std::tuple , które zawierają PriorityType elementy i Type . PriorityType odnosi się do typu, który ma priorytet każdego komunikatu; Type odnosi się do części danych komunikatu. Klasa priority_buffer wysyła komunikaty typu Type. Klasa priority_buffer zarządza również dwiema kolejkami komunikatów: obiekt std::p riority_queue dla komunikatów przychodzących i obiekt std::queue dla komunikatów wychodzących. Porządkowanie komunikatów według priorytetu jest przydatne, gdy priority_buffer obiekt odbiera wiele komunikatów jednocześnie lub gdy odbiera wiele komunikatów, zanim jakiekolwiek komunikaty będą odczytywane przez użytkowników.

Oprócz siedmiu metod, które klasa pochodząca z propagator_block klasy musi implementować, priority_buffer klasa również zastępuje link_target_notification metody i send_message . Klasa priority_buffer definiuje również dwie publiczne metody pomocnika i enqueue , i dequeuemetodę prywatnego pomocnika . propagate_priority_order

Poniższa procedura opisuje sposób implementowania priority_buffer klasy.

Aby zdefiniować klasę priority_buffer

  1. Utwórz plik nagłówka języka C++ i nadaj mu priority_buffer.hnazwę . Alternatywnie możesz użyć istniejącego pliku nagłówka, który jest częścią projektu.

  2. W priority_buffer.hpliku dodaj następujący kod.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. W przestrzeni nazw zdefiniuj std specjalizacje std::less i std::greater , które działają na obiektach concurrency::message .

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    Klasa priority_buffer przechowuje message obiekty w priority_queue obiekcie. Specjalizacje tego typu umożliwiają kolejki priorytetów sortowanie komunikatów zgodnie z ich priorytetem. Priorytetem jest pierwszy element tuple obiektu.

  4. W przestrzeni nazw zadeklaruj concurrencyex klasę priority_buffer .

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    Klasa priority_buffer pochodzi z klasy propagator_block. W związku z tym może wysyłać i odbierać komunikaty. Klasa priority_buffer może mieć wiele obiektów docelowych, które odbierają komunikaty typu Type. Może również mieć wiele źródeł, które wysyłają komunikaty typu tuple<PriorityType, Type>.

  5. private W sekcji priority_buffer klasy dodaj następujące zmienne składowe.

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    Obiekt priority_queue przechowuje komunikaty przychodzące; queue obiekt przechowuje komunikaty wychodzące. Obiekt priority_buffer może odbierać wiele komunikatów jednocześnie; critical_section obiekt synchronizuje dostęp do kolejki komunikatów wejściowych.

  6. W sekcji zdefiniuj private konstruktor kopiujący i operator przypisania. priority_queue Zapobiega to przypisywaniu obiektów.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. W sekcji zdefiniuj public konstruktory wspólne dla wielu typów bloków komunikatów. Zdefiniuj również destruktor.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. W sekcji zdefiniuj public metody enqueue i dequeue. Te metody pomocnicze zapewniają alternatywny sposób wysyłania komunikatów do obiektu i odbierania priority_buffer ich z obiektu.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. W sekcji zdefiniuj protected metodę propagate_to_any_targets .

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    Metoda propagate_to_any_targets przenosi komunikat znajdujący się przed kolejką wejściową do kolejki wyjściowej i propaguje wszystkie komunikaty w kolejce wyjściowej.

  10. W sekcji zdefiniuj protected metodę accept_message .

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    Gdy blok docelowy wywołuje metodę accept_message , priority_buffer klasa przenosi własność komunikatu do pierwszego bloku docelowego, który go akceptuje. (Przypomina to zachowanie . unbounded_buffer)

  11. W sekcji zdefiniuj protected metodę reserve_message .

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    Klasa priority_buffer zezwala na zablokowanie docelowego bloku, aby zarezerwować komunikat, gdy podany identyfikator komunikatu jest zgodny z identyfikatorem komunikatu, który znajduje się przed kolejką. Innymi słowy, obiekt docelowy może zarezerwować komunikat, jeśli priority_buffer obiekt nie otrzymał jeszcze dodatkowego komunikatu i nie rozpropagował jeszcze bieżącego komunikatu.

  12. W sekcji zdefiniuj protected metodę consume_message .

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    Blok docelowy wywołuje metodę consume_message przeniesienia własności komunikatu, który został zarezerwowany.

  13. W sekcji zdefiniuj protected metodę release_message .

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    Docelowe wywołania release_message bloku, aby anulować rezerwację do komunikatu.

  14. W sekcji zdefiniuj protected metodę resume_propagation .

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    Środowisko uruchomieniowe wywołuje resume_propagation po bloku docelowym zużywa lub zwalnia komunikat zarezerwowany. Ta metoda propaguje wszystkie komunikaty, które znajdują się w kolejce wyjściowej.

  15. W sekcji zdefiniuj protected metodę link_target_notification .

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    Zmienna _M_pReservedFor składowa jest definiowana przez klasę bazową . source_block Ta zmienna składowa wskazuje blok docelowy, jeśli istnieje, który przechowuje rezerwację do komunikatu, który znajduje się przed kolejką wyjściową. Środowisko uruchomieniowe jest wywoływane link_target_notification , gdy nowy obiekt docelowy jest połączony z obiektem priority_buffer . Ta metoda propaguje wszystkie komunikaty, które znajdują się w kolejce wyjściowej, jeśli żaden element docelowy nie przechowuje rezerwacji.

  16. W sekcji zdefiniuj private metodę propagate_priority_order .

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    Ta metoda propaguje wszystkie komunikaty z kolejki wyjściowej. Każdy komunikat w kolejce jest oferowany do każdego bloku docelowego, dopóki jeden z bloków docelowych nie zaakceptuje komunikatu. Klasa priority_buffer zachowuje kolejność wychodzących komunikatów. W związku z tym pierwszy komunikat w kolejce wyjściowej musi zostać zaakceptowany przez blok docelowy, zanim ta metoda wyświetli dowolny inny komunikat do bloków docelowych.

  17. W sekcji zdefiniuj protected metodę propagate_message .

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    Metoda propagate_message umożliwia priority_buffer klasie działanie jako odbiornik komunikatu lub element docelowy. Ta metoda odbiera komunikat oferowany przez podany blok źródłowy i wstawia ten komunikat do kolejki priorytetu. propagate_message Metoda następnie asynchronicznie wysyła wszystkie komunikaty wyjściowe do bloków docelowych.

    Środowisko uruchomieniowe wywołuje tę metodę podczas wywoływania funkcji concurrency::asend lub gdy blok komunikatów jest połączony z innymi blokami komunikatów.

  18. W sekcji zdefiniuj protected metodę send_message .

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    Metoda send_message przypomina metodę propagate_message. Jednak wysyła komunikaty wyjściowe synchronicznie zamiast asynchronicznie.

    Środowisko uruchomieniowe wywołuje tę metodę podczas synchronicznej operacji wysyłania, na przykład podczas wywoływania funkcji concurrency::send .

Klasa priority_buffer zawiera przeciążenia konstruktora typowe dla wielu typów bloków komunikatów. Niektóre przeciążenia konstruktora przyjmują współbieżność::Scheduler lub współbieżność::ScheduleGroup obiekty, które umożliwiają zarządzanie blokiem komunikatów przez określony harmonogram zadań. Inne przeciążenia konstruktora przyjmują funkcję filtru. Funkcje filtrowania umożliwiają blokom komunikatów akceptowanie lub odrzucanie komunikatu na podstawie ładunku. Aby uzyskać więcej informacji na temat filtrów komunikatów, zobacz Asynchroniczne bloki komunikatów. Aby uzyskać więcej informacji na temat harmonogramów zadań, zobacz Harmonogram zadań.

priority_buffer Ponieważ klasy porządkują komunikaty według priorytetu, a następnie według kolejności odbierania komunikatów, ta klasa jest najbardziej przydatna, gdy odbiera komunikaty asynchronicznie, na przykład podczas wywoływania funkcji concurrency::asend lub gdy blok komunikatów jest połączony z innymi blokami komunikatów.

[Top]

Kompletny przykład

W poniższym przykładzie przedstawiono pełną definicję priority_buffer klasy.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

Poniższy przykład współbieżnie wykonuje wiele asendoperacji współbieżności::receive na priority_buffer obiekcie.

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

W tym przykładzie są generowane następujące przykładowe dane wyjściowe.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

Klasa priority_buffer porządkuje komunikaty najpierw według priorytetu, a następnie według kolejności, w której odbiera komunikaty. W tym przykładzie komunikaty z większym priorytetem liczbowym są wstawiane do przodu kolejki.

[Top]

Kompilowanie kodu

Skopiuj przykładowy kod i wklej go w projekcie programu Visual Studio lub wklej definicję priority_buffer klasy w pliku o nazwie i programie testowym w pliku o nazwie priority_buffer.hpriority_buffer.cpp , a następnie uruchom następujące polecenie w oknie wiersza polecenia programu Visual Studio.

cl.exe /EHsc priority_buffer.cpp

Zobacz też

Środowisko uruchomieniowe współbieżności — wskazówki
Bloki komunikatów asynchronicznych
Funkcje przekazywania komunikatów