Sdílet prostřednictvím


Návod: Vytvoření vlastního bloku zpráv

Tento dokument popisuje, jak vytvořit vlastní typ bloku zpráv, který bude řadit příchozí zprávy podle priority.

I když integrované typy bloků zpráv poskytují širokou škálu funkcí, můžete vytvořit vlastní typ bloku zpráv a přizpůsobit ho tak, aby splňoval požadavky vaší aplikace. Popis předdefinovaných typů bloků zpráv, které poskytuje knihovna asynchronních agentů, naleznete v tématu Asynchronní bloky zpráv.

Požadavky

Před zahájením tohoto návodu si přečtěte následující dokumenty:

Oddíly

Tento názorný postup obsahuje následující části:

Návrh vlastního bloku zpráv

Bloky zpráv se účastní odesílání a přijímání zpráv. Blok zpráv, který odesílá zprávy, se označuje jako zdrojový blok. Blok zpráv, který přijímá zprávy, se označuje jako cílový blok. Blok zpráv, který odesílá i přijímá zprávy, se označuje jako blok šíření. Knihovna agentů používá abstraktní třídu concurrency::ISource k reprezentaci zdrojových bloků a abstraktní třídy concurrency::ITarget k reprezentaci cílových bloků. Typy bloku zpráv, které fungují jako zdroje, odvozeny od ISource; typy bloku zpráv, které fungují jako cíle odvozeny od ITarget.

I když můžete typ bloku zprávy odvodit přímo z ISource a ITarget, Knihovna Agents definuje tři základní třídy, které provádějí většinu funkcí, které jsou společné pro všechny typy bloků zpráv, například zpracování chyb a propojení bloků zpráv způsobem bezpečným způsobem souběžnosti. Concurrency ::source_block třída je odvozena a ISource odesílá zprávy do jiných bloků. Concurrency ::target_block třída je odvozena a ITarget přijímá zprávy z jiných bloků. Concurrency::p ropagator_block třída je odvozena a ISource ITarget odesílá zprávy do jiných bloků a přijímá zprávy z jiných bloků. Tyto tři základní třídy doporučujeme použít ke zpracování podrobností o infrastruktuře, abyste se mohli zaměřit na chování bloku zpráv.

source_block, target_blocka propagator_block třídy jsou šablony, které jsou parametrizovány u typu, který spravuje připojení nebo propojení mezi zdrojovými a cílovými bloky a typem, který spravuje způsob zpracování zpráv. Knihovna agentů definuje dva typy, které provádějí správu propojení, souběžnost::single_link_registry a souběžnost::multi_link_registry. Třída single_link_registry umožňuje propojení bloku zpráv s jedním zdrojem nebo s jedním cílem. Třída multi_link_registry umožňuje propojení bloku zpráv s více zdroji nebo několika cíli. Knihovna agentů definuje jednu třídu, která provádí správu zpráv, souběžnost::ordered_message_processor. Třída ordered_message_processor umožňuje blokům zpráv zpracovávat zprávy v pořadí, ve kterém je přijímá.

Pokud chcete lépe pochopit, jak bloky zpráv souvisejí s jejich zdroji a cíli, zvažte následující příklad. Tento příklad ukazuje deklaraci třídy concurrency::transformer .

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

Třída transformer je odvozena z propagator_block, a proto funguje jako zdrojový blok i jako cílový blok. Přijímá zprávy typu _Input a odesílá zprávy typu _Output. Třída transformer určuje single_link_registry jako správce propojení pro všechny cílové bloky a multi_link_registry jako správce propojení pro všechny zdrojové bloky. transformer Objekt proto může mít až jeden cíl a neomezený počet zdrojů.

Třída odvozená od source_block musí implementovat šest metod: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message a resume_propagation. Třída, která je odvozena od target_block musí implementovat propagate_message metoda a může volitelně implementovat send_message metoda. Odvození z propagator_block funkce je funkčně ekvivalentní odvození z obou source_block a target_block.

Metoda propagate_to_any_targets je volána modulem runtime pro asynchronní nebo synchronní zpracování všech příchozích zpráv a šíření odchozích zpráv. Metoda accept_message je volána cílovými bloky pro příjem zpráv. Mnoho typů bloků zpráv, například unbounded_buffer, odesílat zprávy pouze do prvního cíle, který by ho obdržel. Proto přenese vlastnictví zprávy do cíle. Další typy bloků zpráv, jako je souběžnost::overwrite_buffer, nabízejí zprávy každému z cílových bloků. overwrite_buffer Proto vytvoří kopii zprávy pro každý z jeho cílů.

Metody reserve_message, consume_messagea release_messageresume_propagation metody umožňují, aby se bloky zpráv mohly účastnit rezervace zpráv. Cílové bloky volají metodu reserve_message , když se jim zobrazí zpráva, a musí si ji rezervovat pro pozdější použití. Jakmile cílový blok rezervuje zprávu, může zavolat metodu consume_message , která tuto zprávu spotřebuje, nebo metodu release_message zrušení rezervace. Stejně jako u accept_message této metody může implementace consume_message buď převést vlastnictví zprávy, nebo vrátit kopii zprávy. Jakmile cílový blok buď spotřebuje nebo uvolní rezervovanou zprávu, modul runtime volá metodu resume_propagation . Tato metoda obvykle pokračuje v šíření zpráv, počínaje další zprávou ve frontě.

Modul runtime volá metodu propagate_message , která asynchronně přenese zprávu z jiného bloku do aktuálního bloku. Metoda send_message se podobá propagate_message, s tím rozdílem, že synchronně místo asynchronně odešle zprávu do cílových bloků. Výchozí implementace send_message odmítne všechny příchozí zprávy. Modul runtime nevolá některou z těchto metod, pokud zpráva nepředá volitelnou funkci filtru přidruženou k cílovému bloku. Další informace ofiltrch

[Nahoře]

Definování třídy priority_buffer

Třída priority_buffer je vlastní typ bloku zprávy, který se bude řadit jako první příchozí zprávy podle priority a potom podle pořadí, ve kterém se zprávy přijímají. Třída priority_buffer se podobá souběžnosti::unbounded_buffer třída, protože obsahuje frontu zpráv, a také proto, že funguje jako zdroj i cílový blok zpráv a může mít více zdrojů i více cílů. Šíření unbounded_buffer zpráv se však zakládá pouze na pořadí, ve kterém přijímá zprávy ze svých zdrojů.

Třída priority_buffer přijímá zprávy typu std::tuple , které obsahují PriorityType a Type prvky. PriorityType odkazuje na typ, který obsahuje prioritu každé zprávy; Type odkazuje na datovou část zprávy. Třída priority_buffer odesílá zprávy typu Type. Třída priority_buffer také spravuje dvě fronty zpráv: std::p riority_queue objekt pro příchozí zprávy a objekt std::queue pro odchozí zprávy. Řazení zpráv podle priority je užitečné, když priority_buffer objekt přijímá více zpráv současně nebo když přijímá více zpráv před čtením všech zpráv příjemci.

Kromě sedmi metod, které třída odvozená z propagator_block musí implementovat, priority_buffer třída také přepíše link_target_notification a send_message metody. Třída priority_buffer také definuje dvě veřejné pomocné metody enqueue , a dequeuesoukromé pomocné metody, propagate_priority_order.

Následující postup popisuje, jak implementovat priority_buffer třídu.

Definice třídy priority_buffer

  1. Vytvořte soubor hlaviček jazyka C++ a pojmenujte ho priority_buffer.h. Alternativně můžete použít existující hlavičkový soubor, který je součástí projektu.

  2. Do priority_buffer.hpole přidejte následující kód.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. std V oboru názvů definujte specializace std::less a std::greater, které se chovají na objekty concurrency::message.

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    Třída priority_buffer ukládá message objekty do objektu priority_queue . Tyto specializace typu umožňují frontě priority řadit zprávy podle jejich priority. Priorita je první prvek objektu tuple .

  4. concurrencyex V oboru názvů deklarujte priority_buffer třídu.

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    Třída priority_buffer je odvozena z propagator_block. Proto může odesílat i přijímat zprávy. Třída priority_buffer může mít více cílů, které přijímají zprávy typu Type. Může mít také více zdrojů, které odesílají zprávy typu tuple<PriorityType, Type>.

  5. private V části priority_buffer třídy přidejte následující členské proměnné.

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    Objekt uchovává příchozí zprávy. queue Objekt priority_queue uchovává odchozí zprávy. priority_buffer Objekt může přijímat více zpráv současně; critical_section objekt synchronizuje přístup do fronty vstupních zpráv.

  6. private V části definujte konstruktor kopírování a operátor přiřazení. Tím zabráníte priority_queue přiřazení objektů.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. public V části definujte konstruktory, které jsou společné pro mnoho typů bloků zpráv. Destruktor destruktor také destruktor destruktor definuje.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. public V části definujte metody enqueue a dequeue. Tyto pomocné metody poskytují alternativní způsob, jak odesílat zprávy a přijímat zprávy z objektu priority_buffer .

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. protected V části definujte metodupropagate_to_any_targets.

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    Metoda propagate_to_any_targets přenese zprávu, která je před vstupní frontou, do výstupní fronty a rozšíří všechny zprávy ve výstupní frontě.

  10. protected V části definujte metoduaccept_message.

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    Když cílový blok volá metodu accept_message , priority_buffer třída přenese vlastnictví zprávy do prvního cílového bloku, který ji přijme. (Podobá se chování unbounded_buffer.)

  11. protected V části definujte metodureserve_message.

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    Třída priority_buffer umožňuje cílovému bloku rezervovat zprávu, pokud zadaný identifikátor zprávy odpovídá identifikátoru zprávy, která je na přední straně fronty. Jinými slovy, cíl si může zprávu rezervovat, pokud priority_buffer objekt ještě nepřijal další zprávu a ještě nebylo rozšířeno o aktuální zprávu.

  12. protected V části definujte metoduconsume_message.

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    Cílové blokové volání consume_message pro přenos vlastnictví zprávy, kterou si rezervovala.

  13. protected V části definujte metodurelease_message.

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    Cílová bloková volání release_message , která zruší rezervaci zprávy.

  14. protected V části definujte metoduresume_propagation.

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    Volání modulu runtime resume_propagation po cílovém bloku buď spotřebuje, nebo uvolní rezervovanou zprávu. Tato metoda rozšíří všechny zprávy, které jsou ve výstupní frontě.

  15. protected V části definujte metodulink_target_notification.

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    Proměnná _M_pReservedFor člena je definována základní třídou , source_block. Tato proměnná člena odkazuje na cílový blok, pokud existuje, který má rezervaci na zprávu, která je na přední straně výstupní fronty. Modul runtime volá link_target_notification , když je nový cíl propojený s objektem priority_buffer . Tato metoda rozšíří všechny zprávy, které jsou ve výstupní frontě, pokud žádný cíl neudržuje rezervaci.

  16. private V části definujte metodupropagate_priority_order.

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    Tato metoda rozšíří všechny zprávy z výstupní fronty. Každá zpráva ve frontě se nabízí každému cílovému bloku, dokud jeden z cílových bloků zprávu nepřijímá. Třída priority_buffer zachovává pořadí odchozích zpráv. Proto musí být první zpráva ve výstupní frontě přijata cílovým blokem předtím, než tato metoda nabídne jakékoli další zprávy cílovým blokům.

  17. protected V části definujte metodupropagate_message.

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    Metoda propagate_message umožňuje priority_buffer , aby třída fungovala jako příjemce zprávy nebo cíl. Tato metoda obdrží zprávu, která je nabízena zadaným zdrojovým blokem, a vloží tuto zprávu do fronty priority. Metoda propagate_message pak asynchronně odešle všechny výstupní zprávy do cílových bloků.

    Modul runtime volá tuto metodu při volání funkce concurrency::asend nebo při připojení bloku zprávy k jiným blokům zpráv.

  18. protected V části definujte metodusend_message.

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    Metoda send_message se podobá propagate_message. Ale odesílá výstupní zprávy synchronně místo asynchronně.

    Modul runtime volá tuto metodu během synchronní operace odesílání, například při volání funkce concurrency::send .

Třída priority_buffer obsahuje přetížení konstruktoru, které jsou typické v mnoha typech bloků zpráv. Některá přetížení konstruktoru mají souběžnost::Scheduler nebo concurrency::ScheduleGroup objekty, které umožňují správu bloku zpráv konkrétním plánovačem úloh. Jiné přetížení konstruktoru přebírají funkci filtru. Funkce filtrování umožňují blokům zpráv přijmout nebo odmítnout zprávu na základě datové části. Další informace ofiltrch Další informace o plánovači úloh naleznete v tématu Plánovač úloh.

Vzhledem k tomu, že priority_buffer třída objednává zprávy podle priority a pořadí, ve kterém jsou zprávy přijaty, je tato třída nejužitečnější, když přijímá zprávy asynchronně, například při volání souběžnosti::asend nebo když je blok zpráv připojený k jiným blokům zpráv.

[Nahoře]

Kompletní příklad

Následující příklad ukazuje úplnou definici priority_buffer třídy.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

Následující příklad souběžně provádí řadu asend operací a souběžnost::receive operace u objektu priority_buffer .

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

Tento příklad vytvoří následující ukázkový výstup.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

Třída priority_buffer objednává zprávy jako první podle priority a potom podle pořadí, ve kterém přijímá zprávy. V tomto příkladu se zprávy s větší číselnou prioritou vloží do přední části fronty.

[Nahoře]

Probíhá kompilace kódu

Zkopírujte ukázkový kód a vložte ho do projektu sady Visual Studio nebo vložte definici priority_buffer třídy do souboru s názvem priority_buffer.h a testovacím programem v souboru s názvem priority_buffer.cpp a potom v okně příkazového řádku sady Visual Studio spusťte následující příkaz.

cl.exe /EHsc priority_buffer.cpp

Viz také

Návody pro Concurrency Runtime
Asynchronní bloky zpráv
Funkce pro předávání zpráv