Sdílet prostřednictvím


Návod: Vytvoření vlastního bloku zpráv

Tento dokument popisuje, jak vytvořit typ bloku vlastní zprávy, který řadí příchozích zprávy podle priority.

Přestože typy bloků předdefinovaných zpráv poskytují celou řadu funkcí, můžete vytvořit vlastní typ bloku zpráv a přizpůsobit jej tak, aby splňoval požadavky aplikace.Popis typů bloků vestavěné zprávy, které jsou poskytovány asynchronní knihovnou agentů, viz Asynchronní bloky zpráv.

Požadavky

Před zahájením tohoto návodu si přečtěte následující dokumenty:

Oddíly

Tento návod obsahuje následující oddíly:

  • Návrh vlastního bloku zprávy

  • Definice třídy priority_buffer

  • Kompletní příklad

Návrh vlastního bloku zprávy

Bloky zpráv se účastní aktu odesílání a přijímání zpráv.Blok zpráv, který odesílá zprávy, se nazývá zdrojový blok.Blok zpráv, který přijímá zprávy, se nazývá cílový blok.Blok zpráv, který odesílá a přijímá zprávy, se nazývá propagátor.Knihovna agentů používá abstraktní třídu concurrency::ISource pro reprezentaci zdrojových bloků a abstraktní třídu concurrency::ITarget pro reprezentaci cílových bloků.Typy bloků zpráv, které se chovají jako zdroje, jsou odvozeny z ISource; typy bloků zpráv, které se chovají jako cíle, jsou odvozeny z ITarget.

Ačkoli lze odvodit váš typ bloku zprávy přímo z ISource a ITarget, knihovna agentů definuje tři základní třídy, které provádějí většinu funkcí, které jsou společné pro všechny typy bloků zpráv, například bloky zprávy o zpracování chyb a o připojení společně souběžným bezpečným způsobem.Třída concurrency::source_block je odvozena z ISource a odesílá zprávy jiným blokům.Třída concurrency::target_block je odvozena z ITarget a přijímá zprávy z jiných bloků.Třída concurrency::propagator_block je odvozena z ISource a ITarget a odesílá zprávy jiným blokům a přijímá zprávy od jiných bloků.Doporučujeme použít tyto tři základní třídy pro zpracování podrobností infrastrukturu tak, abyste se mohli zaměřit na chování vašeho bloku zpráv.

Třídy source_block, target_block a propagator_block jsou šablony, které jsou parametrizovány na typ, který spravuje připojení nebo propojení mezi zdrojovými a cílovými bloky, a na typ, který spravuje zpracování zpráv.Knihovna agentů definuje dva typy, které provádějí správu odkazů, concurrency::single_link_registry a concurrency::multi_link_registry.Třída single_link_registry umožňuje bloku zprávy být spojen s jedním zdrojem nebo jedním cílem.Třída multi_link_registry umožňuje bloku zprávy být spojen s více zdroji nebo více cíli.Knihovna agentů definuje jednu třídu, která se stará o správu zprávy, concurrency::ordered_message_processor.Třída ordered_message_processor umožňuje blokům zpráv zpracovávat zprávy v pořadí, ve kterém je obdrží.

Abyste lépe pochopili vztah bloků zpráv a jejich zdrojů a cílů, podívejte se na následující příklad.Tento příklad ukazuje deklaraci třídy concurrency::transformer.

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

Třída transformer je odvozena z propagator_block, a proto se chová jako zdrojový i cílový blok.Přijímá zprávy typu _Input a odesílá zprávy typu _Output.Třída transformer určuje single_link_registry jako správce odkazu pro všechny cílové bloky a multi_link_registry jako správce odkazu pro všechny zdrojové bloky.Proto objekt transformer může mít až jeden cíl a neomezený počet zdrojů.

Třída odvozená z source_block musí implementovat šest metod: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message a resume_propagation.Třída, která je odvozena z target_block musí implementovat metodu propagate_message a může volitelně implementovat metodu send_message.Odvození z propagator_block je funkce ekvivalentní odvození z source_block a target_block.

Metoda propagate_to_any_targets je volána modulem runtime pro synchronní nebo asynchronní zpracování všech příchozích zpráv a propagaci všech odchozích zpráv.Metoda accept_message je volána cílovými bloky pro příjem zpráv.Mnoho typů bloků zpráv jako unbounded_buffer odesílá zprávy pouze prvnímu cíli, který je přijme.Proto převede vlastnictví zprávy na cíl.Další typy bloků zpráv, jako například concurrency::overwrite_buffer, nabízejí zprávy každému svému cílovému bloku.Proto overwrite_buffer vytvoří kopii zprávy pro každý svůj cíl.

Metody reserve_message, consume_message, release_message a resume_propagation umožňují blokům zpráv účastnit se rezervace zpráv.Cílové bloky volají metodu reserve_message, když je jim nabídnuta zpráva a musí ji rezervovat pro pozdější použití.Poté, co cílový blok vyhradí zprávu, může zavolat metodu consume_message ke zpracování zprávy nebo metodu release_message ke zrušení rezervace.Stejně jako metoda accept_message, implementace consume_message může převést vlastnictví zprávy nebo vrátit kopii zprávy.Poté, co cílový blok spotřebuje nebo uvolní vyhrazenou zprávu, modul runtime zavolá metodu resume_propagation.Tato metoda obvykle pokračuje v šíření zpráv, počínaje další zprávou ve frontě.

Modul runtime volá metodu propagate_message pro asynchronní přenos zprávy z jiného bloku do aktuálního.Metoda send_message se podobá metodě propagate_message s tím rozdílem, že synchronně, místo asynchronně, odesílá zprávu cílovým blokům.Výchozí implementace send_message odmítne všechny příchozí zprávy.Modul runtime nevolá ani jednu z těchto metod, pokud zpráva neprojde funkcí volitelného filtru, která je přidružena k cílovému bloku.Další informace o filtrech zpráv naleznete v tématu Asynchronní bloky zpráv.

[Nahoře]

Definice třídy priority_buffer

Třída priority_buffer je typu blok vlastní zprávy, která řadí příchozí zprávy nejprve podle priority a pak podle pořadí, ve kterém jsou zprávy přijímány.Třída priority_buffer se podobá třídě concurrency::unbounded_buffer, protože obsahuje frontu zpráv a také protože funguje jako zdrojový i cílový blok zpráv a může mít více zdrojů i více cílů.Nicméně unbounded_buffer zakládá šíření zprávy pouze na pořadí, ve kterém přijímá zprávy ze zdrojů.

Třída priority_buffer přijímá zprávy typu std::tuple obsahující prvky PriorityType a Type.PriorityType odkazuje na typ, který má prioritu každé zprávy; Type se vztahuje na datovou část zprávy.Třída priority_buffer odesílá zprávy typu Type.Třída priority_buffer také spravuje dvě fronty zpráv: objekt std::priority_queue pro příchozí zprávy a objekt std::queue pro odchozí zprávy.Řazení zpráv podle priority je užitečné, když objekt priority_buffer přijímá více zpráv najednou nebo když přijímá více zpráv předtím než jsou nějaké zprávy přečteny spotřebitelem.

Kromě sedmi metod, které třída odvozená z propagator_block musí implementovat, třída priority_buffer také přepisuje metody link_target_notification a send_message.Třída priority_buffer definuje také dvě veřejné pomocné metody, enqueue a dequeue a soukromou pomocnou metodu propagate_priority_order.

Následující postup popisuje, jak implementovat třídu priority_buffer.

Definice třídy priority_buffer

  1. Vytvořte soubor hlaviček C++ a pojmenujte jej priority_buffer.h.Alternativně můžete použít existující soubor hlaviček, který je součástí projektu.

  2. V priority_buffer.h přidejte následující kód.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. V oboru názvů std definujte specializace z std::less a std::greater, které působí na objekty concurrency::message.

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    Třída priority_buffer ukládá objekty message do objektu priority_queue.Tyto specializace umožňují frontě priorit řadit zprávy podle jejich priority.Priorita je první prvek objektu tuple.

  4. V oboru názvů concurrencyex deklarujte třídu priority_buffer.

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    Třída priority_buffer vychází z propagator_block.Proto může jak odesílat, tak i přijímat zprávy.Třída priority_buffer může mít více cílů, které přijímají zprávy typu Type.Můžete mít také více zdrojů, které odesílají zprávy typu tuple<PriorityType, Type>.

  5. V sekci private třídy priority_buffer přidejte následující proměnné členů.

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    Objekt priority_queue obsahuje příchozí zprávy; objekt queue obsahuje odchozí zprávy.Objekt priority_buffer může přijmout více zpráv najednou; objekt critical_section synchronizuje přístup k frontě vstupních zpráv.

  6. V oddílu private definujte konstruktor kopie a operátor přiřazení.To zabrání objektům priority_queue v přiřaditelnosti.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. V oddílu public definujte konstruktory, které jsou společné pro mnoho typů bloků zprávy.Také definujte destruktor.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. V oddílu public definujte metody enqueue a dequeue.Tyto pomocné metody poskytují alternativní způsob odesílání a přijímání zpráv od objektu priority_buffer.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. V oddílu protected definujte metodu propagate_to_any_targets.

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    Metoda propagate_to_any_targets převede zprávu, která je na začátku vstupní fronty do výstupní fronty a šíří všechny zprávy ve výstupní frontě.

  10. V oddílu protected definujte metodu accept_message.

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    Když cílový blok zavolá metodu accept_message, třída priority_buffer převede vlastnictví zprávy na první cílový blok, který ji přijímá. (To se podobá chování unbounded_buffer.)

  11. V oddílu protected definujte metodu reserve_message.

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    Třída priority_buffer umožňuje cílovému bloku vyhradit zprávu, když poskytnutý identifikátor zprávy odpovídá identifikátoru zprávy, která je na začátku fronty.Jinými slovy, cíl může rezervovat zprávu, pokud objekt priority_buffer dosud nepřijal další zprávu a nebyla dosud šířen mimo stávající.

  12. V oddílu protected definujte metodu consume_message.

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    Cílový blok volá consume_message k převodu vlastnictví zprávy, která je vyhrazena.

  13. V oddílu protected definujte metodu release_message.

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    Cíl bloku volá release_message pro zrušení rezervace zprávy.

  14. V oddílu protected definujte metodu resume_propagation.

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    Modul runtime zavolá metodu resume_propagation poté, co cílový blok spotřebuje nebo uvolní vyhrazenou zprávu.Tato metoda šíří všechny zprávy , které jsou ve výstupní frontě.

  15. V oddílu protected definujte metodu link_target_notification.

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    Členská proměnná _M_pReservedFor je definována základní třídou, source_block.Tato členská proměnná ukazuje na cílový blok, pokud existuje, který drží rezervaci na zprávu, která je na začátku výstupní fronty.Modulu runtime volá link_target_notification, když je připojen nový cíl k objektu priority_buffer.Tato metoda šíří všechny zprávy, které jsou ve výstupní frontě, pokud žádný cíl nedrží rezervaci.

  16. V oddílu private definujte metodu propagate_priority_order.

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    Tato metoda šíří všechny zprávy z výstupní fronty.Všechny zprávy ve frontě jsou nabízeny každému cílovému bloku, dokud jeden z cílových bloků zprávu nepřijme.Třída priority_buffer zachovává pořadí odchozích zpráv.Proto první zpráva ve výstupní frontě musí být přijata cílovým blokem, než tato metoda cílovým blokům nabídne jakoukoli jinou zprávu.

  17. V oddílu protected definujte metodu propagate_message.

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    Metoda propagate_message umožňuje třídě priority_buffer chovat se jako příjemce zprávy neboli cíl.Tato metoda obdrží zprávu, která je nabízena poskytnutým zdrojovým blokem, a vloží ji do prioritní fronty.Metoda propagate_message pak asynchronně odešle všechny výstupní zprávy cílovým blokům.

    Modul runtime volá tuto metodu při volání funkce concurrency::asend nebo při připojení bloku zpráv k ostatním blokům zpráv.

  18. V oddílu protected definujte metodu send_message.

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    Metoda send_message se podobá metodě propagate_message.Odešle však výstupní zprávy synchronně místo asynchronně.

    Modul runtime volá tuto metodu při odesílání synchronní operace, například při volání funkce concurrency::send.

Třída priority_buffer obsahuje přetížení konstruktoru, která jsou běžná v mnoha typech bloku zprávy.Některá přetížení konstruktoru přijímají objekty concurrency::Scheduler nebo concurrency::ScheduleGroup, které umožňují bloku zprávy být spravován určitým plánovačem úloh.Další přetížení konstruktoru přebírají funkci filtru.Funkce filtru povolují blokům zpráv přijmout nebo odmítnout zprávu na základě její datové části.Další informace o filtrech zpráv naleznete v tématu Asynchronní bloky zpráv.Další informace o plánovačích úkolů naleznete v tématu Plánovač úloh (Concurrency Runtime).

Protože třída priority_buffer seřadí zprávy podle priority a pak podle pořadí, ve kterém jsou zprávy přijímány, tato třída je nejužitečnější, když obdrží zprávy asynchronně, například při volání funkce concurrency::asend nebo když je blok zpráv připojen k jinému bloku zpráv.

[Nahoře]

Kompletní příklad

Následující příklad ukazuje úplnou definici třídy priority_buffer.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

Následující příklad provádí souběžně několik operací asend a concurrency::receive na objektu priority_buffer.

// priority_buffer.cpp 
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h" 

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations 
   // on a priority_buffer object.

   priority_buffer<int> pb;

   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

Tento příklad vytvoří následující vzorový výstup.

  

Třída priority_buffer seřadí zprávy nejprve podle priority a pak podle pořadí, ve kterém zprávy příjme.V tomto příkladě jsou zprávy s vyšší číselnou prioritou vloženy směrem k začátku fronty.

[Nahoře]

Probíhá kompilace kódu

Zkopírovat ukázkový kód a vložit jej do projektu sady Visual Studio nebo vložit definici třídy priority_buffer do souboru s názvem priority_buffer.h a testovací program do souboru s názvem priority_buffer.cpp a potom spustit následující příkaz v okně Příkazový řádek Visual Studio.

cl.exe /EHsc priority_buffer.cpp

Viz také

Koncepty

Asynchronní bloky zpráv

Funkce usnadnění

Další zdroje

Návody k Concurrency Runtime