Megosztás a következőn keresztül:


Útmutató: Egyéni üzenetblokk létrehozása

Ez a dokumentum bemutatja, hogyan hozhat létre egyéni üzenetblokktípust, amely prioritás szerint rendeli meg a bejövő üzeneteket.

Bár a beépített üzenetblokk-típusok számos funkciót biztosítanak, létrehozhatja saját üzenetblokktípusát, és testre szabhatja azt az alkalmazás követelményeinek megfelelően. Az Aszinkron ügynökkódtár által biztosított beépített üzenetblokktípusok leírását az Aszinkron üzenetblokkok című témakörben talál.

Előfeltételek

Az útmutató megkezdése előtt olvassa el a következő dokumentumokat:

Szakaszok

Ez az útmutató a következő szakaszokat tartalmazza:

Egyéni üzenetblokk tervezése

Az üzenetblokkok részt vesznek az üzenetek küldésében és fogadásában. Az üzeneteket küldő üzenetblokkokat forrásblokknak nevezzük. Az üzeneteket fogadó üzenetblokkot célblokknak nevezzük. Az üzeneteket küldő és fogadó üzenetblokkokat propagátorblokknak nevezzük. Az Agents Library az absztrakt osztályt, concurrency::ISource-t használja a forrásblokkok reprezentálására, és az absztrakt osztályt, concurrency::ITarget-et a célblokkok reprezentálására. Azok az üzenetblokktípusok, amelyek forrásként működnek, a ISource-ból származnak; azok az üzenetblokktípusok, amelyek célként működnek, a ITarget-ből származnak.

Bár az üzenetblokk típusát közvetlenül a ISource és ITarget osztályokból is származtathatja, az Agents könyvtár három olyan alaposztályt határoz meg, amelyek elvégzik az összes üzenetblokktípusra jellemző funkciók nagy részét, például a hibák kezelését, és az üzenetblokkok biztonságos, egyidejű kapcsolódását biztosítják. Az egyidejűség::source_block osztály más blokkokból származik ISource , és üzeneteket küld. Az egyidejűség::target_block osztály az ITarget osztályból származik, és fogad üzeneteket más blokkokból. Az concurrency::propagator_block osztály származik ISource-ból és ITarget-ból, üzeneteket küld más blokkoknak, és üzeneteket fogad más blokkoktól. Javasoljuk, hogy ezt a három alaposztályt használja az infrastruktúra részleteinek kezeléséhez, hogy az üzenetblokk viselkedésére összpontosíthasson.

A source_block, target_blockés propagator_block osztályok olyan sablonok, amelyek olyan típusra vannak paraméterezve, amely kezeli a forrás- és célblokkok közötti kapcsolatokat vagy hivatkozásokat, valamint egy olyan típuson, amely kezeli az üzenetek feldolgozását. Az Agents Library két olyan típust határoz meg, amelyek linkkezelést végeznek: concurrency::single_link_registry és concurrency::multi_link_registry. Az single_link_registry osztály lehetővé teszi, hogy egy üzenetblokk egy forráshoz vagy egy célhoz legyen csatolva. Az multi_link_registry osztály lehetővé teszi, hogy egy üzenetblokk több forráshoz vagy több célhoz legyen csatolva. Az Ügynökök könyvtára egy olyan osztályt definiál, amely az üzenetek kezelésével foglalkozik, egyidejűség::ordered_message_processor. Az ordered_message_processor osztály lehetővé teszi az üzenetblokkok számára, hogy az üzeneteket abban a sorrendben dolgozzák fel, amelyben megkapják őket.

Az üzenetblokkok forrásukhoz és céljaikhoz való viszonyának jobb megértéséhez tekintse meg az alábbi példát. Ez a példa a párhuzamosság::transformer osztály deklarációját mutatja be.

template<
   class _Input,
   class _Output
>
class transformer : public propagator_block<
   single_link_registry<ITarget<_Output>>, 
   multi_link_registry<ISource<_Input>>
>;

Az transformer osztály a forrásblokkból propagator_blockszármazik, ezért forrásblokkként és célblokkként is működik. Elfogadja a típusú _Input üzeneteket, és típus típusú _Outputüzeneteket küld. Az transformer osztály single_link_registry-t adja meg hivatkozáskezelőként a célblokkokhoz és multi_link_registry-t hivatkozáskezelőként a forrásblokkokhoz. Ezért egy transformer objektum legfeljebb egy célpontot és korlátlan számú forrást tartalmazhat.

A származtatott source_block osztálynak hat metódust kell implementálnia: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message és resume_propagation. Az abból target_block származó osztálynak implementálnia kell a propagate_message metódust, és opcionálisan implementálnia kell a send_message metódust. A propagator_block-ból való származtatás funkcionálisan megegyezik a source_block és target_block származtatásával.

A propagate_to_any_targets futtatókörnyezet meghívja a metódust, hogy aszinkron vagy szinkron módon dolgozza fel a bejövő üzeneteket, és propagálja a kimenő üzeneteket. A accept_message metódust a célblokkok hívják meg az üzenetek elfogadásához. Számos üzenetblokktípus, például unbounded_buffercsak az első címzettnek küld üzeneteket. Ezért átadja az üzenet tulajdonjogát a célnak. Egyéb üzenetblokktípusok, mint például egyidejűség::overwrite_buffer, üzeneteket kínálnak minden célblokkjának. overwrite_buffer Ezért minden egyes célhoz másolatot készít az üzenetről.

A reserve_message, consume_message, release_messageés resume_propagation metódusok lehetővé teszik, hogy az üzenetblokkok részt vegyenek az üzenetfoglalásban. A célblokkok meghívják a reserve_message metódust, amikor üzenetet ajánlanak fel nekik, és az üzenetet későbbi használatra kell lefoglalniuk. Miután egy célblokk lefoglal egy üzenetet, meghívhatja a consume_message metódust az üzenet feldolgozására, vagy a release_message foglalás lemondására. A módszerhez accept_message hasonlóan a végrehajtás consume_message is átruházhatja az üzenet tulajdonjogát, vagy visszaküldheti az üzenet másolatát. Miután egy célblokk foglalt üzenetet használ vagy bocsát ki, a futtatókörnyezet meghívja a metódust resume_propagation . Ez a módszer általában folytatja az üzenetek propagálását, kezdve az üzenetsor következő üzenetével.

A futtatókörnyezet meghívja a propagate_message metódust, hogy aszinkron módon továbbítson egy üzenetet egy másik blokkból az aktuálisba. A send_message metódus hasonló a propagate_message-hez, azzal a különbséggel, hogy szinkron módon, és nem aszinkron módon küldi el az üzenetet a célblokkoknak. Az alapértelmezett implementáció elutasítja az send_message összes bejövő üzenetet. A futtatókörnyezet nem hívja meg egyik metódust sem, ha az üzenet nem adja át a célblokkhoz társított opcionális szűrőfüggvényt. További információ az üzenetszűrőkről: Aszinkron üzenetblokkok.

[Felső]

A priority_buffer osztály definiálása

Az priority_buffer osztály egy egyéni üzenetblokktípus, amely először prioritás, majd az üzenetek fogadási sorrendje szerint rendeli meg a bejövő üzeneteket. Az priority_buffer osztály hasonlít a concurrency::unbounded_buffer osztályhoz, mert üzenetsort tartalmaz, és azért is, mert forrás és cél üzenetblokként működik, és több forrása és több célja lehet. Az unbounded_buffer üzenetek propagálását azonban csak arra a sorrendre alapozza, amelyben üzeneteket fogad a forrásaitól.

Az priority_buffer osztály std::tuple típusú üzeneteket fogad, amelyek tartalmaznak PriorityType és Type elemeket. PriorityType az egyes üzenetek prioritását tartalmazó típusra hivatkozik; Type az üzenet adatrészére hivatkozik. Az priority_buffer osztály ilyen típusú Typeüzeneteket küld. Az priority_buffer osztály két üzenetsort is kezel: egy std::p riority_queue objektumot a bejövő üzenetekhez, és egy std::queue objektumot a kimenő üzenetekhez. Az üzenetek prioritás szerinti rendezése akkor hasznos, ha egy priority_buffer objektum egyszerre több üzenetet fogad, vagy ha több üzenetet kap, mielőtt a felhasználók felolvassák az üzeneteket.

Azon túlmenően, hogy egy propagator_block-ből származó osztálynak hét metódust kell implementálnia, a priority_buffer osztály felülbírálja a link_target_notification és send_message metódusokat is. Az priority_buffer osztály két nyilvános segédmetódust enqueue és dequeueegy magánsegítőmetódust propagate_priority_orderis definiál.

Az alábbi eljárás ismerteti, hogyan kell a priority_buffer osztályt implementálni.

A priority_buffer osztály definiálása

  1. Hozzon létre egy C++ fejlécfájlt, és nevezze el.priority_buffer.h Másik lehetőségként használhat egy meglévő fejlécfájlt is, amely a projekt része.

  2. Adja priority_buffer.hhozzá a következő kódot.

    #pragma once
    #include <agents.h>
    #include <queue>
    
  3. A std névtérben definiálja a std::less és std::greater specializációit, amelyek a concurrency::message objektumokon működnek.

    namespace std 
    {
        // A specialization of less that tests whether the priority element of a 
        // message is less than the priority element of another message.
        template<class Type, class PriorityType>
        struct less<concurrency::message<tuple<PriorityType,Type>>*> 
        {
            typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator< to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) < get<0>(right->payload));
            }
        };
    
        // A specialization of less that tests whether the priority element of a 
        // message is greater than the priority element of another message.
        template<class Type, class PriorityType>
        struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
        {
            typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
    
            bool operator()(const MessageType* left, const MessageType* right) const
            {
                // apply operator> to the first element (the priority) 
                // of the tuple payload.
                return (get<0>(left->payload) > get<0>(right->payload));
            }
        };
    }
    

    Az priority_buffer osztály objektumokat tárol message egy priority_queue objektumban. Ezek a típusspecicializációk lehetővé teszik, hogy a prioritási üzenetsor a prioritásuknak megfelelően rendezze az üzeneteket. Az tuple objektum első eleme a prioritás.

  4. A névtérben concurrencyex deklarálja az osztályt priority_buffer .

    namespace concurrencyex 
    {
        template<class Type,
            typename PriorityType = int,
            typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
        class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
            concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
        {
        public:
        protected:
        private:
        };
    }
    

    A priority_buffer osztály a propagator_blockszármazik. Ezért egyszerre küldhet és fogadhat üzeneteket. Az priority_buffer osztály több célpéldánysal is rendelkezhet, amelyek típusú üzeneteket fogadnak Type. Több olyan forrás is lehet, amely típusú tuple<PriorityType, Type>üzeneteket küld.

  5. private osztály priority_buffer szakaszában adja hozzá a következő tagváltozókat.

    // Stores incoming messages.
    // The type parameter Pr specifies how to order messages by priority.
    std::priority_queue<
        concurrency::message<_Source_type>*,
        std::vector<concurrency::message<_Source_type>*>,
        Pr
    > _input_messages;
    
    // Synchronizes access to the input message queue.
    concurrency::critical_section _input_lock;
    
    // Stores outgoing messages.
    std::queue<concurrency::message<_Target_type>*> _output_messages;
    

    Az priority_queue objektum a bejövő üzeneteket tárolja, az objektum pedig queue a kimenő üzeneteket. Az priority_buffer objektumok egyszerre több üzenetet is fogadhatnak; az critical_section objektum szinkronizálja a bemeneti üzenetek üzenetsorához való hozzáférést.

  6. A szakaszban adja meg a private másolási konstruktort és a hozzárendelési operátort. Ez megakadályozza priority_queue az objektumok hozzárendelését.

    // Hide assignment operator and copy constructor.
    priority_buffer const &operator =(priority_buffer const&);
    priority_buffer(priority_buffer const &);
    
  7. A szakaszban definiálja a public sok üzenetblokktípusra jellemző konstruktorokat. Definiálja a destruktort is.

    // Constructs a priority_buffer message block.
    priority_buffer() 
    {
        initialize_source_and_target();
    }
    
    // Constructs a priority_buffer message block with the given filter function.
    priority_buffer(filter_method const& filter)
    {
        initialize_source_and_target();
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler)
    {
        initialize_source_and_target(&scheduler);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided Scheduler object to propagate messages.
    priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
    {
        initialize_source_and_target(&scheduler);
        register_filter(filter);
    }
    
    // Constructs a priority_buffer message block that uses the provided 
    // SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group)
    {
        initialize_source_and_target(NULL, &schedule_group);
    }
    
    // Constructs a priority_buffer message block with the given filter function 
    // and uses the provided SchedulerGroup object to propagate messages.
    priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
    {
        initialize_source_and_target(NULL, &schedule_group);
        register_filter(filter);
    }
    
    // Destroys the message block.
    ~priority_buffer()
    {
        // Remove all links.
        remove_network_links();
    }
    
  8. A public szakaszban adja meg a metódusokat enqueue és dequeue. Ezek a segédmetódusok alternatív módot kínálnak arra, hogy üzeneteket küldjön és fogadjon egy priority_buffer objektumból.

    // Sends an item to the message block.
    bool enqueue(Type const& item)
    {
        return concurrency::asend<Type>(this, item);
    }
    
    // Receives an item from the message block.
    Type dequeue()
    {
        return receive<Type>(this);
    }
    
  9. A protected szakaszban, határozza meg a propagate_to_any_targets metódust.

    // Transfers the message at the front of the input queue to the output queue
    // and propagates out all messages in the output queue.
    virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
    {
        // Retrieve the message from the front of the input queue.
        concurrency::message<_Source_type>* input_message = NULL;
        {
            concurrency::critical_section::scoped_lock lock(_input_lock);
            if (_input_messages.size() > 0)
            {
                input_message = _input_messages.top();
                _input_messages.pop();
            }
        }
    
        // Move the message to the output queue.
        if (input_message != NULL)
        {
            // The payload of the output message does not contain the 
            // priority of the message.
            concurrency::message<_Target_type>* output_message = 
                new concurrency::message<_Target_type>(get<1>(input_message->payload));
            _output_messages.push(output_message);
    
            // Free the memory for the input message.
            delete input_message;
    
            // Do not propagate messages if the new message is not the head message.
            // In this case, the head message is reserved by another message block.
            if (_output_messages.front()->msg_id() != output_message->msg_id())
            {
                return;
            }
        }
    
        // Propagate out the output messages.
        propagate_priority_order();
    }
    

    A propagate_to_any_targets metódus a bemeneti üzenetsor elején lévő üzenetet a kimeneti üzenetsorba továbbítja, és propagálja a kimeneti üzenetsor összes üzenetét.

  10. A protected szakaszban, határozza meg a accept_message metódust.

    // Accepts a message that was offered by this block by transferring ownership
    // to the caller.
    virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
    {
        concurrency::message<_Target_type>* message = NULL;
    
        // Transfer ownership if the provided message identifier matches
        // the identifier of the front of the output message queue.
        if (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id)
        {
            message = _output_messages.front();
            _output_messages.pop();
        }
    
        return message;
    }
    

    Amikor egy célblokk meghívja a accept_message metódust, az priority_buffer osztály átadja az üzenet tulajdonjogát az azt elfogadó első célblokknak. Ez az unbounded_buffer viselkedésére hasonlít.

  11. A protected szakaszban, határozza meg a reserve_message metódust.

    // Reserves a message that was previously offered by this block.
    virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
    {
        // Allow the message to be reserved if the provided message identifier
        // is the message identifier of the front of the message queue.
        return (!_output_messages.empty() && 
            _output_messages.front()->msg_id() == msg_id);
    }
    

    Az priority_buffer osztály lehetővé teszi, hogy a célblokk lefoglaljon egy üzenetet, ha a megadott üzenetazonosító megegyezik az üzenetsor elején található azonosítóval. Más szóval a cél lefoglalhatja az üzenetet, ha az priority_buffer objektum még nem kapott további üzenetet, és még nem propagálta az aktuálisat.

  12. A protected szakaszban, határozza meg a consume_message metódust.

    // Transfers the message that was previously offered by this block 
    // to the caller. The caller of this method is the target block that 
    // reserved the message.
    virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
    {
        // Transfer ownership of the message to the caller.
        return accept_message(msg_id);
    }
    

    A célblokk hívásokat indít consume_message a fenntartott üzenet tulajdonjogának átruházására.

  13. A protected szakaszban, határozza meg a release_message metódust.

    // Releases a previous message reservation.
    virtual void release_message(concurrency::runtime_object_identity msg_id)
    {
        // The head message must be the one that is reserved. 
        if (_output_messages.empty() || 
            _output_messages.front()->msg_id() != msg_id)
        {
            throw message_not_found();
        }
    }
    

    A célblokk hívja a release_message-t, hogy megszakítsa egy üzenetre vonatkozó foglalását.

  14. A protected szakaszban, határozza meg a resume_propagation metódust.

    // Resumes propagation after a reservation has been released.
    virtual void resume_propagation()
    {
        // Propagate out any messages in the output queue.
        if (_output_messages.size() > 0)
        {
            async_send(NULL);
        }
    }
    

    A futtatókörnyezet meghívja resume_propagation miután a célblokk vagy felhasznál egy fenntartott üzenetet, vagy kibocsátja azt. Ez a metódus propagálja a kimeneti üzenetsorban lévő üzeneteket.

  15. A protected szakaszban, határozza meg a link_target_notification metódust.

    // Notifies this block that a new target has been linked to it.
    virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
    {
        // Do not propagate messages if a target block reserves
        // the message at the front of the queue.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out any messages that are in the output queue.
        propagate_priority_order();
    }
    

    A _M_pReservedFor tagváltozót az alaposztály határozza meg. source_block Ez a tagváltozó arra a célblokkra mutat( ha van ilyen), amely foglalást tart a kimeneti üzenetsor elején található üzenethez. A futtatókörnyezet meghívja link_target_notification , ha egy új cél kapcsolódik az priority_buffer objektumhoz. Ez a metódus propagálja a kimeneti üzenetsorba tartozó üzeneteket, ha egyik cél sem tart foglalást.

  16. A private szakaszban, határozza meg a propagate_priority_order metódust.

    // Propagates messages in priority order.
    void propagate_priority_order()
    {
        // Cancel propagation if another block reserves the head message.
        if (_M_pReservedFor != NULL)
        {
            return;
        }
    
        // Propagate out all output messages. 
        // Because this block preserves message ordering, stop propagation
        // if any of the messages are not accepted by a target block.
        while (!_output_messages.empty())
        {
            // Get the next message.
            concurrency::message<_Target_type> * message = _output_messages.front();
    
            concurrency::message_status status = declined;
    
            // Traverse each target in the order in which they are connected.
            for (target_iterator iter = _M_connectedTargets.begin(); 
                *iter != NULL; 
                ++iter)
            {
                // Propagate the message to the target.
                concurrency::ITarget<_Target_type>* target = *iter;
                status = target->propagate(message, this);
    
                // If the target accepts the message then ownership of message has 
                // changed. Do not propagate this message to any other target.
                if (status == accepted)
                {
                    break;
                }
    
                // If the target only reserved this message, we must wait until the 
                // target accepts the message.
                if (_M_pReservedFor != NULL)
                {
                    break;
                }
            }
    
            // If status is anything other than accepted, then the head message
            // was not propagated out. To preserve the order in which output 
            // messages are propagated, we must stop propagation until the head 
            // message is accepted.
            if (status != accepted)
            {
                break;
            }
        }
    }
    

    Ez a metódus propagálja az összes üzenetet a kimeneti üzenetsorból. A rendszer az üzenetsor minden üzenetét felajánlja minden célblokknak, amíg a célblokkok egyike el nem fogadja az üzenetet. Az priority_buffer osztály megőrzi a kimenő üzenetek sorrendjét. Ezért a kimeneti üzenetsor első üzenetét el kell fogadnia egy célblokknak, mielőtt ez a módszer bármilyen más üzenetet jelenít meg a célblokkok számára.

  17. A protected szakaszban, határozza meg a propagate_message metódust.

    // Asynchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::propagate.
    virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Asynchronously send the message to the target blocks.
            async_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }      
    }
    

    A propagate_message metódus lehetővé teszi, hogy az priority_buffer osztály üzenet fogadóként vagy célként működjön. Ez a metódus fogadja a megadott forrásblokk által kínált üzenetet, és beszúrja az üzenetet a prioritási üzenetsorba. A propagate_message metódus ezután aszinkron módon elküldi az összes kimeneti üzenetet a célblokkok számára.

    A futtatókörnyezet meghívja ezt a metódust, amikor meghívja az egyidejűséget::asend függvényt, vagy ha az üzenetblokk más üzenetblokkokhoz csatlakozik.

  18. A protected szakaszban, határozza meg a send_message metódust.

    // Synchronously passes a message from an ISource block to this block.
    // This method is typically called by propagator_block::send.
    virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
        concurrency::ISource<_Source_type>* source)
    {
        // Accept the message from the source block.
        message = source->accept(message->msg_id(), this);
    
        if (message != NULL)
        {
            // Insert the message into the input queue. The type parameter Pr
            // defines how to order messages by priority.
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                _input_messages.push(message);
            }
    
            // Synchronously send the message to the target blocks.
            sync_send(NULL);
            return accepted;
        }
        else
        {
            return missed;
        }
    }
    

    A send_message metódus hasonlít a propagate_message metódusra. A kimeneti üzeneteket azonban szinkron módon küldi el aszinkron helyett.

    A futtatókörnyezet egy szinkron küldési művelet során hívja meg ezt a metódust, például az egyidejűség::send függvény meghívásakor.

Az priority_buffer osztály számos üzenetblokktípusra jellemző konstruktor-túlterheléseket tartalmaz. Néhány konstruktor túlterhelése tartalmazhat párhuzamosság::Ütemező vagy párhuzamosság::ScheduleGroup objektumokat, amelyek lehetővé teszik, hogy az üzenetblokkot egy adott feladatütemező kezelje. A többi konstruktor túlterhelése szűrőfüggvényt vesz igénybe. A szűrőfüggvények lehetővé teszik, hogy az üzenetblokkok az üzenet tartalma alapján elfogadják vagy elutasítsák az üzenetet. További információ az üzenetszűrőkről: Aszinkron üzenetblokkok. A feladatütemezőkkel kapcsolatos további információkért lásd: Feladatütemező.

Mivel az priority_buffer osztály prioritás, majd az üzenetek fogadási sorrendje szerint rendeli meg az üzeneteket, ez az osztály akkor hasznos, ha aszinkron módon fogadja az üzeneteket, például ha az egyidejűség::asend függvényt hívja meg, vagy ha az üzenetblokk más üzenetblokkokhoz van csatlakoztatva.

[Felső]

A teljes példa

Az alábbi példa az osztály teljes definícióját priority_buffer mutatja be.

// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>

namespace std 
{
    // A specialization of less that tests whether the priority element of a 
    // message is less than the priority element of another message.
    template<class Type, class PriorityType>
    struct less<concurrency::message<tuple<PriorityType,Type>>*> 
    {
        typedef concurrency::message<tuple<PriorityType, Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator< to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) < get<0>(right->payload));
        }
    };

    // A specialization of less that tests whether the priority element of a 
    // message is greater than the priority element of another message.
    template<class Type, class PriorityType>
    struct greater<concurrency::message<tuple<PriorityType, Type>>*> 
    {
        typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;

        bool operator()(const MessageType* left, const MessageType* right) const
        {
            // apply operator> to the first element (the priority) 
            // of the tuple payload.
            return (get<0>(left->payload) > get<0>(right->payload));
        }
    };
}

namespace concurrencyex
{
    // A message block type that orders incoming messages first by priority, 
    // and then by the order in which messages are received. 
    template<class Type, 
        typename PriorityType = int,
        typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
    class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
        concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
    {  
    public:
        // Constructs a priority_buffer message block.
        priority_buffer() 
        {
            initialize_source_and_target();
        }

        // Constructs a priority_buffer message block with the given filter function.
        priority_buffer(filter_method const& filter)
        {
            initialize_source_and_target();
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler)
        {
            initialize_source_and_target(&scheduler);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided Scheduler object to propagate messages.
        priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) 
        {
            initialize_source_and_target(&scheduler);
            register_filter(filter);
        }

        // Constructs a priority_buffer message block that uses the provided 
        // SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group)
        {
            initialize_source_and_target(NULL, &schedule_group);
        }

        // Constructs a priority_buffer message block with the given filter function 
        // and uses the provided SchedulerGroup object to propagate messages.
        priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
        {
            initialize_source_and_target(NULL, &schedule_group);
            register_filter(filter);
        }

        // Destroys the message block.
        ~priority_buffer()
        {
            // Remove all links.
            remove_network_links();
        }

        // Sends an item to the message block.
        bool enqueue(Type const& item)
        {
            return concurrency::asend<Type>(this, item);
        }

        // Receives an item from the message block.
        Type dequeue()
        {
            return receive<Type>(this);
        }

    protected:
        // Asynchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::propagate.
        virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, 
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Asynchronously send the message to the target blocks.
                async_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }      
        }

        // Synchronously passes a message from an ISource block to this block.
        // This method is typically called by propagator_block::send.
        virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
            concurrency::ISource<_Source_type>* source)
        {
            // Accept the message from the source block.
            message = source->accept(message->msg_id(), this);

            if (message != NULL)
            {
                // Insert the message into the input queue. The type parameter Pr
                // defines how to order messages by priority.
                {
                    concurrency::critical_section::scoped_lock lock(_input_lock);
                    _input_messages.push(message);
                }

                // Synchronously send the message to the target blocks.
                sync_send(NULL);
                return accepted;
            }
            else
            {
                return missed;
            }
        }

        // Accepts a message that was offered by this block by transferring ownership
        // to the caller.
        virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
        {
            concurrency::message<_Target_type>* message = NULL;

            // Transfer ownership if the provided message identifier matches
            // the identifier of the front of the output message queue.
            if (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id)
            {
                message = _output_messages.front();
                _output_messages.pop();
            }

            return message;
        }

        // Reserves a message that was previously offered by this block.
        virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
        {
            // Allow the message to be reserved if the provided message identifier
            // is the message identifier of the front of the message queue.
            return (!_output_messages.empty() && 
                _output_messages.front()->msg_id() == msg_id);
        }

        // Transfers the message that was previously offered by this block 
        // to the caller. The caller of this method is the target block that 
        // reserved the message.
        virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
        {
            // Transfer ownership of the message to the caller.
            return accept_message(msg_id);
        }

        // Releases a previous message reservation.
        virtual void release_message(concurrency::runtime_object_identity msg_id)
        {
            // The head message must be the one that is reserved. 
            if (_output_messages.empty() || 
                _output_messages.front()->msg_id() != msg_id)
            {
                throw message_not_found();
            }
        }

        // Resumes propagation after a reservation has been released.
        virtual void resume_propagation()
        {
            // Propagate out any messages in the output queue.
            if (_output_messages.size() > 0)
            {
                async_send(NULL);
            }
        }

        // Notifies this block that a new target has been linked to it.
        virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
        {
            // Do not propagate messages if a target block reserves
            // the message at the front of the queue.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out any messages that are in the output queue.
            propagate_priority_order();
        }

        // Transfers the message at the front of the input queue to the output queue
        // and propagates out all messages in the output queue.
        virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
        {
            // Retrieve the message from the front of the input queue.
            concurrency::message<_Source_type>* input_message = NULL;
            {
                concurrency::critical_section::scoped_lock lock(_input_lock);
                if (_input_messages.size() > 0)
                {
                    input_message = _input_messages.top();
                    _input_messages.pop();
                }
            }

            // Move the message to the output queue.
            if (input_message != NULL)
            {
                // The payload of the output message does not contain the 
                // priority of the message.
                concurrency::message<_Target_type>* output_message = 
                    new concurrency::message<_Target_type>(get<1>(input_message->payload));
                _output_messages.push(output_message);

                // Free the memory for the input message.
                delete input_message;

                // Do not propagate messages if the new message is not the head message.
                // In this case, the head message is reserved by another message block.
                if (_output_messages.front()->msg_id() != output_message->msg_id())
                {
                    return;
                }
            }

            // Propagate out the output messages.
            propagate_priority_order();
        }

    private:

        // Propagates messages in priority order.
        void propagate_priority_order()
        {
            // Cancel propagation if another block reserves the head message.
            if (_M_pReservedFor != NULL)
            {
                return;
            }

            // Propagate out all output messages. 
            // Because this block preserves message ordering, stop propagation
            // if any of the messages are not accepted by a target block.
            while (!_output_messages.empty())
            {
                // Get the next message.
                concurrency::message<_Target_type> * message = _output_messages.front();

                concurrency::message_status status = declined;

                // Traverse each target in the order in which they are connected.
                for (target_iterator iter = _M_connectedTargets.begin(); 
                    *iter != NULL; 
                    ++iter)
                {
                    // Propagate the message to the target.
                    concurrency::ITarget<_Target_type>* target = *iter;
                    status = target->propagate(message, this);

                    // If the target accepts the message then ownership of message has 
                    // changed. Do not propagate this message to any other target.
                    if (status == accepted)
                    {
                        break;
                    }

                    // If the target only reserved this message, we must wait until the 
                    // target accepts the message.
                    if (_M_pReservedFor != NULL)
                    {
                        break;
                    }
                }

                // If status is anything other than accepted, then the head message
                // was not propagated out. To preserve the order in which output 
                // messages are propagated, we must stop propagation until the head 
                // message is accepted.
                if (status != accepted)
                {
                    break;
                }
            }
        }

    private:

        // Stores incoming messages.
        // The type parameter Pr specifies how to order messages by priority.
        std::priority_queue<
            concurrency::message<_Source_type>*,
            std::vector<concurrency::message<_Source_type>*>,
            Pr
        > _input_messages;

        // Synchronizes access to the input message queue.
        concurrency::critical_section _input_lock;

        // Stores outgoing messages.
        std::queue<concurrency::message<_Target_type>*> _output_messages;

    private:
        // Hide assignment operator and copy constructor.
        priority_buffer const &operator =(priority_buffer const&);
        priority_buffer(priority_buffer const &);
    };

}

Az alábbi példa egy sor asend és concurrency::receive műveletet hajt végre egyszerre egy priority_buffer objektumon.

// priority_buffer.cpp
// compile with: /EHsc 
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"

using namespace concurrency;
using namespace concurrencyex;
using namespace std;

int wmain()
{
   // Concurrently perform a number of asend and receive operations
   // on a priority_buffer object.

   priority_buffer<int> pb;
   
   parallel_invoke(
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
      [&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
      [&pb] { 
         for (int i = 0; i < 75; ++i) {
            wcout << receive(pb) << L' ';
            if ((i+1) % 25 == 0)
               wcout << endl;
         }
      }
   );
}

Ez a példa a következő mintakimenetet hozza létre.

36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12

Az priority_buffer osztály először prioritás, majd az üzenetek fogadási sorrendje szerint rendeli meg az üzeneteket. Ebben a példában a nagyobb numerikus prioritású üzeneteket a rendszer az üzenetsor elé helyezi.

[Felső]

A kód összeállítása

Másolja ki a példakódot, és illessze be egy Visual Studio-projektbe, vagy illessze be az priority_buffer osztály definícióját egy elnevezett priority_buffer.h fájlba és a tesztprogramba egy elnevezett priority_buffer.cpp fájlba, majd futtassa a következő parancsot egy Visual Studio parancssori ablakban.

cl.exe /EHsc priority_buffer.cpp

Lásd még

Egyidejűségi futtatókörnyezeti útmutatók
Aszinkron üzenetblokkok
Üzenetátadási függvények