Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Ez a dokumentum bemutatja, hogyan hozhat létre egyéni üzenetblokktípust, amely prioritás szerint rendeli meg a bejövő üzeneteket.
Bár a beépített üzenetblokk-típusok számos funkciót biztosítanak, létrehozhatja saját üzenetblokktípusát, és testre szabhatja azt az alkalmazás követelményeinek megfelelően. Az Aszinkron ügynökkódtár által biztosított beépített üzenetblokktípusok leírását az Aszinkron üzenetblokkok című témakörben talál.
Előfeltételek
Az útmutató megkezdése előtt olvassa el a következő dokumentumokat:
Szakaszok
Ez az útmutató a következő szakaszokat tartalmazza:
Egyéni üzenetblokk tervezése
Az üzenetblokkok részt vesznek az üzenetek küldésében és fogadásában. Az üzeneteket küldő üzenetblokkokat forrásblokknak nevezzük. Az üzeneteket fogadó üzenetblokkot célblokknak nevezzük. Az üzeneteket küldő és fogadó üzenetblokkokat propagátorblokknak nevezzük. Az Agents Library az absztrakt osztályt, concurrency::ISource-t használja a forrásblokkok reprezentálására, és az absztrakt osztályt, concurrency::ITarget-et a célblokkok reprezentálására. Azok az üzenetblokktípusok, amelyek forrásként működnek, a ISource-ból származnak; azok az üzenetblokktípusok, amelyek célként működnek, a ITarget-ből származnak.
Bár az üzenetblokk típusát közvetlenül a ISource és ITarget osztályokból is származtathatja, az Agents könyvtár három olyan alaposztályt határoz meg, amelyek elvégzik az összes üzenetblokktípusra jellemző funkciók nagy részét, például a hibák kezelését, és az üzenetblokkok biztonságos, egyidejű kapcsolódását biztosítják. Az egyidejűség::source_block osztály más blokkokból származik ISource , és üzeneteket küld. Az egyidejűség::target_block osztály az ITarget osztályból származik, és fogad üzeneteket más blokkokból. Az concurrency::propagator_block osztály származik ISource-ból és ITarget-ból, üzeneteket küld más blokkoknak, és üzeneteket fogad más blokkoktól. Javasoljuk, hogy ezt a három alaposztályt használja az infrastruktúra részleteinek kezeléséhez, hogy az üzenetblokk viselkedésére összpontosíthasson.
A source_block, target_blockés propagator_block osztályok olyan sablonok, amelyek olyan típusra vannak paraméterezve, amely kezeli a forrás- és célblokkok közötti kapcsolatokat vagy hivatkozásokat, valamint egy olyan típuson, amely kezeli az üzenetek feldolgozását. Az Agents Library két olyan típust határoz meg, amelyek linkkezelést végeznek: concurrency::single_link_registry és concurrency::multi_link_registry. Az single_link_registry osztály lehetővé teszi, hogy egy üzenetblokk egy forráshoz vagy egy célhoz legyen csatolva. Az multi_link_registry osztály lehetővé teszi, hogy egy üzenetblokk több forráshoz vagy több célhoz legyen csatolva. Az Ügynökök könyvtára egy olyan osztályt definiál, amely az üzenetek kezelésével foglalkozik, egyidejűség::ordered_message_processor. Az ordered_message_processor osztály lehetővé teszi az üzenetblokkok számára, hogy az üzeneteket abban a sorrendben dolgozzák fel, amelyben megkapják őket.
Az üzenetblokkok forrásukhoz és céljaikhoz való viszonyának jobb megértéséhez tekintse meg az alábbi példát. Ez a példa a párhuzamosság::transformer osztály deklarációját mutatja be.
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
Az transformer osztály a forrásblokkból propagator_blockszármazik, ezért forrásblokkként és célblokkként is működik. Elfogadja a típusú _Input üzeneteket, és típus típusú _Outputüzeneteket küld. Az transformer osztály single_link_registry-t adja meg hivatkozáskezelőként a célblokkokhoz és multi_link_registry-t hivatkozáskezelőként a forrásblokkokhoz. Ezért egy transformer objektum legfeljebb egy célpontot és korlátlan számú forrást tartalmazhat.
A származtatott source_block osztálynak hat metódust kell implementálnia: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message és resume_propagation. Az abból target_block származó osztálynak implementálnia kell a propagate_message metódust, és opcionálisan implementálnia kell a send_message metódust. A propagator_block-ból való származtatás funkcionálisan megegyezik a source_block és target_block származtatásával.
A propagate_to_any_targets futtatókörnyezet meghívja a metódust, hogy aszinkron vagy szinkron módon dolgozza fel a bejövő üzeneteket, és propagálja a kimenő üzeneteket. A accept_message metódust a célblokkok hívják meg az üzenetek elfogadásához. Számos üzenetblokktípus, például unbounded_buffercsak az első címzettnek küld üzeneteket. Ezért átadja az üzenet tulajdonjogát a célnak. Egyéb üzenetblokktípusok, mint például egyidejűség::overwrite_buffer, üzeneteket kínálnak minden célblokkjának.
overwrite_buffer Ezért minden egyes célhoz másolatot készít az üzenetről.
A reserve_message, consume_message, release_messageés resume_propagation metódusok lehetővé teszik, hogy az üzenetblokkok részt vegyenek az üzenetfoglalásban. A célblokkok meghívják a reserve_message metódust, amikor üzenetet ajánlanak fel nekik, és az üzenetet későbbi használatra kell lefoglalniuk. Miután egy célblokk lefoglal egy üzenetet, meghívhatja a consume_message metódust az üzenet feldolgozására, vagy a release_message foglalás lemondására. A módszerhez accept_message hasonlóan a végrehajtás consume_message is átruházhatja az üzenet tulajdonjogát, vagy visszaküldheti az üzenet másolatát. Miután egy célblokk foglalt üzenetet használ vagy bocsát ki, a futtatókörnyezet meghívja a metódust resume_propagation . Ez a módszer általában folytatja az üzenetek propagálását, kezdve az üzenetsor következő üzenetével.
A futtatókörnyezet meghívja a propagate_message metódust, hogy aszinkron módon továbbítson egy üzenetet egy másik blokkból az aktuálisba. A send_message metódus hasonló a propagate_message-hez, azzal a különbséggel, hogy szinkron módon, és nem aszinkron módon küldi el az üzenetet a célblokkoknak. Az alapértelmezett implementáció elutasítja az send_message összes bejövő üzenetet. A futtatókörnyezet nem hívja meg egyik metódust sem, ha az üzenet nem adja át a célblokkhoz társított opcionális szűrőfüggvényt. További információ az üzenetszűrőkről: Aszinkron üzenetblokkok.
[Felső]
A priority_buffer osztály definiálása
Az priority_buffer osztály egy egyéni üzenetblokktípus, amely először prioritás, majd az üzenetek fogadási sorrendje szerint rendeli meg a bejövő üzeneteket. Az priority_buffer osztály hasonlít a concurrency::unbounded_buffer osztályhoz, mert üzenetsort tartalmaz, és azért is, mert forrás és cél üzenetblokként működik, és több forrása és több célja lehet. Az unbounded_buffer üzenetek propagálását azonban csak arra a sorrendre alapozza, amelyben üzeneteket fogad a forrásaitól.
Az priority_buffer osztály std::tuple típusú üzeneteket fogad, amelyek tartalmaznak PriorityType és Type elemeket.
PriorityType az egyes üzenetek prioritását tartalmazó típusra hivatkozik; Type az üzenet adatrészére hivatkozik. Az priority_buffer osztály ilyen típusú Typeüzeneteket küld. Az priority_buffer osztály két üzenetsort is kezel: egy std::p riority_queue objektumot a bejövő üzenetekhez, és egy std::queue objektumot a kimenő üzenetekhez. Az üzenetek prioritás szerinti rendezése akkor hasznos, ha egy priority_buffer objektum egyszerre több üzenetet fogad, vagy ha több üzenetet kap, mielőtt a felhasználók felolvassák az üzeneteket.
Azon túlmenően, hogy egy propagator_block-ből származó osztálynak hét metódust kell implementálnia, a priority_buffer osztály felülbírálja a link_target_notification és send_message metódusokat is. Az priority_buffer osztály két nyilvános segédmetódust enqueue és dequeueegy magánsegítőmetódust propagate_priority_orderis definiál.
Az alábbi eljárás ismerteti, hogyan kell a priority_buffer osztályt implementálni.
A priority_buffer osztály definiálása
Hozzon létre egy C++ fejlécfájlt, és nevezze el.
priority_buffer.hMásik lehetőségként használhat egy meglévő fejlécfájlt is, amely a projekt része.Adja
priority_buffer.hhozzá a következő kódot.#pragma once #include <agents.h> #include <queue>A
stdnévtérben definiálja a std::less és std::greater specializációit, amelyek a concurrency::message objektumokon működnek.namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }Az
priority_bufferosztály objektumokat tárolmessageegypriority_queueobjektumban. Ezek a típusspecicializációk lehetővé teszik, hogy a prioritási üzenetsor a prioritásuknak megfelelően rendezze az üzeneteket. Aztupleobjektum első eleme a prioritás.A névtérben
concurrencyexdeklarálja az osztálytpriority_buffer.namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }A
priority_bufferosztály apropagator_blockszármazik. Ezért egyszerre küldhet és fogadhat üzeneteket. Azpriority_bufferosztály több célpéldánysal is rendelkezhet, amelyek típusú üzeneteket fogadnakType. Több olyan forrás is lehet, amely típusútuple<PriorityType, Type>üzeneteket küld.privateosztálypriority_bufferszakaszában adja hozzá a következő tagváltozókat.// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;Az
priority_queueobjektum a bejövő üzeneteket tárolja, az objektum pedigqueuea kimenő üzeneteket. Azpriority_bufferobjektumok egyszerre több üzenetet is fogadhatnak; azcritical_sectionobjektum szinkronizálja a bemeneti üzenetek üzenetsorához való hozzáférést.A szakaszban adja meg a
privatemásolási konstruktort és a hozzárendelési operátort. Ez megakadályozzapriority_queueaz objektumok hozzárendelését.// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);A szakaszban definiálja a
publicsok üzenetblokktípusra jellemző konstruktorokat. Definiálja a destruktort is.// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }A
publicszakaszban adja meg a metódusokatenqueueésdequeue. Ezek a segédmetódusok alternatív módot kínálnak arra, hogy üzeneteket küldjön és fogadjon egypriority_bufferobjektumból.// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }A
protectedszakaszban, határozza meg apropagate_to_any_targetsmetódust.// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }A
propagate_to_any_targetsmetódus a bemeneti üzenetsor elején lévő üzenetet a kimeneti üzenetsorba továbbítja, és propagálja a kimeneti üzenetsor összes üzenetét.A
protectedszakaszban, határozza meg aaccept_messagemetódust.// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }Amikor egy célblokk meghívja a
accept_messagemetódust, azpriority_bufferosztály átadja az üzenet tulajdonjogát az azt elfogadó első célblokknak. Ez azunbounded_bufferviselkedésére hasonlít.A
protectedszakaszban, határozza meg areserve_messagemetódust.// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }Az
priority_bufferosztály lehetővé teszi, hogy a célblokk lefoglaljon egy üzenetet, ha a megadott üzenetazonosító megegyezik az üzenetsor elején található azonosítóval. Más szóval a cél lefoglalhatja az üzenetet, ha azpriority_bufferobjektum még nem kapott további üzenetet, és még nem propagálta az aktuálisat.A
protectedszakaszban, határozza meg aconsume_messagemetódust.// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }A célblokk hívásokat indít
consume_messagea fenntartott üzenet tulajdonjogának átruházására.A
protectedszakaszban, határozza meg arelease_messagemetódust.// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }A célblokk hívja a
release_message-t, hogy megszakítsa egy üzenetre vonatkozó foglalását.A
protectedszakaszban, határozza meg aresume_propagationmetódust.// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }A futtatókörnyezet meghívja
resume_propagationmiután a célblokk vagy felhasznál egy fenntartott üzenetet, vagy kibocsátja azt. Ez a metódus propagálja a kimeneti üzenetsorban lévő üzeneteket.A
protectedszakaszban, határozza meg alink_target_notificationmetódust.// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }A
_M_pReservedFortagváltozót az alaposztály határozza meg.source_blockEz a tagváltozó arra a célblokkra mutat( ha van ilyen), amely foglalást tart a kimeneti üzenetsor elején található üzenethez. A futtatókörnyezet meghívjalink_target_notification, ha egy új cél kapcsolódik azpriority_bufferobjektumhoz. Ez a metódus propagálja a kimeneti üzenetsorba tartozó üzeneteket, ha egyik cél sem tart foglalást.A
privateszakaszban, határozza meg apropagate_priority_ordermetódust.// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }Ez a metódus propagálja az összes üzenetet a kimeneti üzenetsorból. A rendszer az üzenetsor minden üzenetét felajánlja minden célblokknak, amíg a célblokkok egyike el nem fogadja az üzenetet. Az
priority_bufferosztály megőrzi a kimenő üzenetek sorrendjét. Ezért a kimeneti üzenetsor első üzenetét el kell fogadnia egy célblokknak, mielőtt ez a módszer bármilyen más üzenetet jelenít meg a célblokkok számára.A
protectedszakaszban, határozza meg apropagate_messagemetódust.// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }A
propagate_messagemetódus lehetővé teszi, hogy azpriority_bufferosztály üzenet fogadóként vagy célként működjön. Ez a metódus fogadja a megadott forrásblokk által kínált üzenetet, és beszúrja az üzenetet a prioritási üzenetsorba. Apropagate_messagemetódus ezután aszinkron módon elküldi az összes kimeneti üzenetet a célblokkok számára.A futtatókörnyezet meghívja ezt a metódust, amikor meghívja az egyidejűséget::asend függvényt, vagy ha az üzenetblokk más üzenetblokkokhoz csatlakozik.
A
protectedszakaszban, határozza meg asend_messagemetódust.// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }A
send_messagemetódus hasonlít apropagate_messagemetódusra. A kimeneti üzeneteket azonban szinkron módon küldi el aszinkron helyett.A futtatókörnyezet egy szinkron küldési művelet során hívja meg ezt a metódust, például az egyidejűség::send függvény meghívásakor.
Az priority_buffer osztály számos üzenetblokktípusra jellemző konstruktor-túlterheléseket tartalmaz. Néhány konstruktor túlterhelése tartalmazhat párhuzamosság::Ütemező vagy párhuzamosság::ScheduleGroup objektumokat, amelyek lehetővé teszik, hogy az üzenetblokkot egy adott feladatütemező kezelje. A többi konstruktor túlterhelése szűrőfüggvényt vesz igénybe. A szűrőfüggvények lehetővé teszik, hogy az üzenetblokkok az üzenet tartalma alapján elfogadják vagy elutasítsák az üzenetet. További információ az üzenetszűrőkről: Aszinkron üzenetblokkok. A feladatütemezőkkel kapcsolatos további információkért lásd: Feladatütemező.
Mivel az priority_buffer osztály prioritás, majd az üzenetek fogadási sorrendje szerint rendeli meg az üzeneteket, ez az osztály akkor hasznos, ha aszinkron módon fogadja az üzeneteket, például ha az egyidejűség::asend függvényt hívja meg, vagy ha az üzenetblokk más üzenetblokkokhoz van csatlakoztatva.
[Felső]
A teljes példa
Az alábbi példa az osztály teljes definícióját priority_buffer mutatja be.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
Az alábbi példa egy sor asend és concurrency::receive műveletet hajt végre egyszerre egy priority_buffer objektumon.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
Ez a példa a következő mintakimenetet hozza létre.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
Az priority_buffer osztály először prioritás, majd az üzenetek fogadási sorrendje szerint rendeli meg az üzeneteket. Ebben a példában a nagyobb numerikus prioritású üzeneteket a rendszer az üzenetsor elé helyezi.
[Felső]
A kód összeállítása
Másolja ki a példakódot, és illessze be egy Visual Studio-projektbe, vagy illessze be az priority_buffer osztály definícióját egy elnevezett priority_buffer.h fájlba és a tesztprogramba egy elnevezett priority_buffer.cpp fájlba, majd futtassa a következő parancsot egy Visual Studio parancssori ablakban.
cl.exe /EHsc priority_buffer.cpp
Lásd még
Egyidejűségi futtatókörnyezeti útmutatók
Aszinkron üzenetblokkok
Üzenetátadási függvények