Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
In dit document wordt beschreven hoe u een aangepast berichtbloktype maakt waarmee binnenkomende berichten op prioriteit worden gesorteerd.
Hoewel de ingebouwde berichtbloktypen een breed scala aan functionaliteit bieden, kunt u uw eigen type berichtblok maken en aanpassen om te voldoen aan de vereisten van uw toepassing. Zie Asynchrone berichtblokken voor een beschrijving van de ingebouwde typen berichtenblokken die worden geleverd door de Asynchrone agents-bibliotheek.
Vereiste voorwaarden
Lees de volgende documenten voordat u aan deze procedure begint:
Afdelingen
Dit stappenplan bevat de volgende onderdelen:
Een aangepast berichtblok ontwerpen
Berichtenblokken nemen deel aan het verzenden en ontvangen van berichten. Een berichtblok waarmee berichten worden verzonden, wordt een bronblok genoemd. Een berichtblok dat berichten ontvangt, wordt een doelblok genoemd. Een berichtblok dat zowel berichten verzendt als ontvangt, wordt een doorgifteblok genoemd. De Agents Library gebruikt de abstracte klasse concurrency::ISource om bronblokken te vertegenwoordigen en de abstracte klasse concurrency::ITarget om doelblokken te vertegenwoordigen. Berichtbloktypen die fungeren als bronnen die zijn afgeleid van ISource; berichtbloktypen die fungeren als doelen die zijn afgeleid van ITarget.
Hoewel u het type berichtblok rechtstreeks van ISourceITargeten kunt afleiden, definieert de agentsbibliotheek drie basisklassen die veel van de functionaliteit uitvoeren die gebruikelijk is voor alle typen berichtblokken, bijvoorbeeld het verwerken van fouten en het verbinden van berichtblokken op een gelijktijdigheidsveilige manier. De gelijktijdigheid::source_block klasse is afgeleid van ISource en verzendt berichten naar andere blokken. De gelijktijdigheid::target_block klasse is afgeleid van ITarget en ontvangt berichten van andere blokken. De concurrentie::propagator_block klasse is afgeleid van ISource en ITarget, en daarbij verzendt het berichten naar andere blokken en ontvangt het berichten van andere blokken. U wordt aangeraden deze drie basisklassen te gebruiken om infrastructuurdetails te verwerken, zodat u zich kunt richten op het gedrag van uw berichtblok.
De source_block, target_blocken propagator_block klassen zijn sjablonen die zijn geparameteriseerd voor een type waarmee de verbindingen, of koppelingen, tussen bron- en doelblokken worden beheerd en op een type dat beheert hoe berichten worden verwerkt. De Agents Bibliotheek definieert twee typen voor linkbeheer, concurrency::single_link_registry en concurrency::multi_link_registry. Met de single_link_registry klasse kan een berichtblok worden gekoppeld aan één bron of aan één doel. Met de multi_link_registry klasse kan een berichtblok worden gekoppeld aan meerdere bronnen of meerdere doelen. De agentsbibliotheek definieert één klasse die berichtbeheer, gelijktijdigheid::ordered_message_processor uitvoert. Met ordered_message_processor de klasse kunnen berichtblokken berichten verwerken in de volgorde waarin ze worden ontvangen.
Bekijk het volgende voorbeeld om beter te begrijpen hoe berichtblokken zich verhouden tot hun bronnen en doelen. In dit voorbeeld ziet u de declaratie van de gelijktijdigheidsklasse::transformer .
template<
class _Input,
class _Output
>
class transformer : public propagator_block<
single_link_registry<ITarget<_Output>>,
multi_link_registry<ISource<_Input>>
>;
De transformer klasse is afgeleid van propagator_blocken fungeert daarom als een bronblok en als doelblok. Het accepteert berichten van het type _Input en verzendt berichten van het type _Output. De transformer klasse geeft aan single_link_registry als koppelingsbeheer voor alle doelblokken en multi_link_registry als koppelingsbeheer voor alle bronblokken. Daarom kan een transformer object maximaal één doel en een onbeperkt aantal bronnen hebben.
Een klasse die is afgeleid van source_block , moet zes methoden implementeren: propagate_to_any_targets, accept_message, reserve_message, consume_message, release_message en resume_propagation. Een klasse die is afgeleid van target_block moet de propagate_message-methode implementeren en eventueel de send_message-methode implementeren. Afgeleid van propagator_block is functioneel gelijk aan het afleiden van beide source_block en target_block.
De propagate_to_any_targets methode wordt aangeroepen door de runtime om asynchroon of synchroon alle inkomende berichten te verwerken en uitgaande berichten door te geven. De accept_message methode wordt aangeroepen door doelblokken om berichten te accepteren. Veel typen berichtenblokken, zoals unbounded_buffer, verzenden alleen berichten naar het eerste doel dat het zou ontvangen. Daarom wordt het eigendom van het bericht overgedragen aan het doel. Andere typen berichtenblokken, zoals gelijktijdigheid::overwrite_buffer, bieden berichten aan elk van de doelblokken.
overwrite_buffer Maakt daarom een kopie van het bericht voor elk van de doelen.
Met de methoden reserve_message, consume_message, release_message en resume_propagation kunnen message blocks deelnemen aan de berichtreservering. Doelblokken roepen de reserve_message methode aan wanneer hun een bericht wordt aangeboden en het bericht voor later gebruik moeten reserveren. Nadat een doelblok een bericht heeft gereserveerd, kan het de consume_message methode aanroepen om dat bericht te gebruiken of de release_message methode om de reservering te annuleren. Net als bij de accept_message methode kan de implementatie van consume_message het eigendomsrecht van het bericht overdragen of een kopie van het bericht teruggeven. Nadat een doelblok een gereserveerd bericht verbruikt of loslaat, roept de runtime de resume_propagation methode aan. Normaal gesproken gaat deze methode door met het doorgeven van berichten, te beginnen met het volgende bericht in de wachtrij.
De runtime roept de propagate_message methode aan om een bericht asynchroon over te dragen van een ander blok naar het huidige blok. De send_message methode lijkt op propagate_message, behalve dat deze synchroon, in plaats van asynchroon, het bericht naar de doelblokken verzendt. De standaardimplementatie van send_message weigert alle inkomende berichten. De runtime roept geen van deze methoden aan als het bericht niet de optionele filterfunctie doorgeeft die is gekoppeld aan het doelblok. Zie Asynchrone berichtblokken voor meer informatie over berichtfilters.
[Boven]
De priority_buffer-klasse definiëren
De priority_buffer klasse is een aangepast berichtbloktype dat binnenkomende berichten eerst op prioriteit rangschikt en vervolgens op volgorde waarin berichten worden ontvangen. De priority_buffer klasse lijkt op de gelijktijdigheid::unbounded_buffer klasse omdat deze een wachtrij met berichten bevat, en ook omdat deze fungeert als een bron- en doelberichtblok en zowel meerdere bronnen als meerdere doelen kan hebben.
unbounded_buffer Baseert berichtdoorgifte echter alleen op de volgorde waarin berichten van de bronnen worden ontvangen.
De priority_buffer klasse ontvangt berichten van het type std::tuple die elementen bevatten PriorityType en Type bevatten.
PriorityType verwijst naar het type dat de prioriteit van elk bericht bevat; Type verwijst naar het gegevensgedeelte van het bericht. De priority_buffer klasse verzendt berichten van het type Type. De priority_buffer klasse beheert ook twee berichtenwachtrijen: een std::p riority_queue-object voor binnenkomende berichten en een object std::queue voor uitgaande berichten. Het rangschikken van berichten op prioriteit is handig wanneer een priority_buffer object meerdere berichten tegelijk ontvangt of wanneer het meerdere berichten ontvangt voordat berichten door consumenten worden gelezen.
Naast de zeven methoden die een klasse die is afgeleid van propagator_block moeten implementeren, overschrijft de priority_buffer klasse ook de link_target_notification en send_message methoden. De priority_buffer klasse definieert ook twee openbare helpermethoden en enqueue , en dequeueeen persoonlijke helpermethode, propagate_priority_order.
In de volgende procedure wordt beschreven hoe u de priority_buffer klasse implementeert.
De priority_buffer-klasse definiëren
Maak een C++-headerbestand en geef het een
priority_buffer.hnaam. U kunt ook een bestaand headerbestand gebruiken dat deel uitmaakt van uw project.Voeg
priority_buffer.hde volgende code toe.#pragma once #include <agents.h> #include <queue>Definieer in de
stdnaamruimte specialisaties van std::less en std::greater die werken op concurrency::message objecten.namespace std { // A specialization of less that tests whether the priority element of a // message is less than the priority element of another message. template<class Type, class PriorityType> struct less<concurrency::message<tuple<PriorityType,Type>>*> { typedef concurrency::message<tuple<PriorityType, Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator< to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) < get<0>(right->payload)); } }; // A specialization of less that tests whether the priority element of a // message is greater than the priority element of another message. template<class Type, class PriorityType> struct greater<concurrency::message<tuple<PriorityType, Type>>*> { typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType; bool operator()(const MessageType* left, const MessageType* right) const { // apply operator> to the first element (the priority) // of the tuple payload. return (get<0>(left->payload) > get<0>(right->payload)); } }; }De
priority_buffer-klasse slaatmessage-objecten op in eenpriority_queue-object. Met deze typespecialisaties kan de prioriteitswachtrij berichten sorteren op basis van hun prioriteit. De prioriteit is het eerste element van hettupleobject.Declareer de klasse in de
concurrencyexpriority_buffernaamruimte.namespace concurrencyex { template<class Type, typename PriorityType = int, typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>> class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>, concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>> { public: protected: private: }; }De
priority_bufferklasse is afgeleid vanpropagator_block. Daarom kan zowel berichten worden verzonden als ontvangen. Depriority_bufferklasse kan meerdere doelen hebben die berichten van het typeTypeontvangen. Het kan ook meerdere bronnen hebben die berichten van het typetuple<PriorityType, Type>verzenden.Voeg in de
privatesectie van depriority_bufferklasse de volgende lidvariabelen toe.// Stores incoming messages. // The type parameter Pr specifies how to order messages by priority. std::priority_queue< concurrency::message<_Source_type>*, std::vector<concurrency::message<_Source_type>*>, Pr > _input_messages; // Synchronizes access to the input message queue. concurrency::critical_section _input_lock; // Stores outgoing messages. std::queue<concurrency::message<_Target_type>*> _output_messages;Het
priority_queueobject bevat binnenkomende berichten. Hetqueueobject bevat uitgaande berichten. Eenpriority_bufferobject kan meerdere berichten tegelijk ontvangen. Hetcritical_sectionobject synchroniseert de toegang tot de wachtrij met invoerberichten.Definieer in de
privatesectie de kopieerconstructor en de toewijzingsoperator. Hiermee voorkomt u datpriority_queueobjecten kunnen worden toegewezen.// Hide assignment operator and copy constructor. priority_buffer const &operator =(priority_buffer const&); priority_buffer(priority_buffer const &);Definieer in de
publicsectie de constructors die gebruikelijk zijn voor veel berichtbloktypen. Definieer ook de destructor.// Constructs a priority_buffer message block. priority_buffer() { initialize_source_and_target(); } // Constructs a priority_buffer message block with the given filter function. priority_buffer(filter_method const& filter) { initialize_source_and_target(); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler) { initialize_source_and_target(&scheduler); } // Constructs a priority_buffer message block with the given filter function // and uses the provided Scheduler object to propagate messages. priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter) { initialize_source_and_target(&scheduler); register_filter(filter); } // Constructs a priority_buffer message block that uses the provided // SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group) { initialize_source_and_target(NULL, &schedule_group); } // Constructs a priority_buffer message block with the given filter function // and uses the provided SchedulerGroup object to propagate messages. priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter) { initialize_source_and_target(NULL, &schedule_group); register_filter(filter); } // Destroys the message block. ~priority_buffer() { // Remove all links. remove_network_links(); }Definieer in de
publicsectie de methodenenqueueendequeue. Deze helpermethoden bieden een alternatieve manier om berichten te verzenden naar en te ontvangen van eenpriority_bufferobject.// Sends an item to the message block. bool enqueue(Type const& item) { return concurrency::asend<Type>(this, item); } // Receives an item from the message block. Type dequeue() { return receive<Type>(this); }Definieer de
protectedmethode in depropagate_to_any_targetssectie.// Transfers the message at the front of the input queue to the output queue // and propagates out all messages in the output queue. virtual void propagate_to_any_targets(concurrency::message<_Target_type>*) { // Retrieve the message from the front of the input queue. concurrency::message<_Source_type>* input_message = NULL; { concurrency::critical_section::scoped_lock lock(_input_lock); if (_input_messages.size() > 0) { input_message = _input_messages.top(); _input_messages.pop(); } } // Move the message to the output queue. if (input_message != NULL) { // The payload of the output message does not contain the // priority of the message. concurrency::message<_Target_type>* output_message = new concurrency::message<_Target_type>(get<1>(input_message->payload)); _output_messages.push(output_message); // Free the memory for the input message. delete input_message; // Do not propagate messages if the new message is not the head message. // In this case, the head message is reserved by another message block. if (_output_messages.front()->msg_id() != output_message->msg_id()) { return; } } // Propagate out the output messages. propagate_priority_order(); }Met
propagate_to_any_targetsde methode wordt het bericht dat zich aan de voorzijde van de invoerwachtrij bevindt, overgedragen naar de uitvoerwachtrij en worden alle berichten in de uitvoerwachtrij doorgegeven.Definieer de
protectedmethode in deaccept_messagesectie.// Accepts a message that was offered by this block by transferring ownership // to the caller. virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id) { concurrency::message<_Target_type>* message = NULL; // Transfer ownership if the provided message identifier matches // the identifier of the front of the output message queue. if (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id) { message = _output_messages.front(); _output_messages.pop(); } return message; }Wanneer een doelblok de
accept_messagemethode aanroept, draagt depriority_bufferklasse het eigendom van het bericht over naar het eerste doelblok dat het accepteert. (Dit lijkt op het gedrag vanunbounded_buffer.)Definieer de
protectedmethode in dereserve_messagesectie.// Reserves a message that was previously offered by this block. virtual bool reserve_message(concurrency::runtime_object_identity msg_id) { // Allow the message to be reserved if the provided message identifier // is the message identifier of the front of the message queue. return (!_output_messages.empty() && _output_messages.front()->msg_id() == msg_id); }Met
priority_bufferde klasse kan een doelblok een bericht reserveren wanneer de opgegeven bericht-id overeenkomt met de id van het bericht dat zich vóór de wachtrij bevindt. Met andere woorden, een doel kan het bericht reserveren als hetpriority_bufferobject nog geen extra bericht heeft ontvangen en het huidige bericht nog niet heeft doorgegeven.Definieer de
protectedmethode in deconsume_messagesectie.// Transfers the message that was previously offered by this block // to the caller. The caller of this method is the target block that // reserved the message. virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id) { // Transfer ownership of the message to the caller. return accept_message(msg_id); }Een doelblok roept aan
consume_messageom het eigendom van het bericht dat is gereserveerd over te dragen.Definieer de
protectedmethode in derelease_messagesectie.// Releases a previous message reservation. virtual void release_message(concurrency::runtime_object_identity msg_id) { // The head message must be the one that is reserved. if (_output_messages.empty() || _output_messages.front()->msg_id() != msg_id) { throw message_not_found(); } }Een doelblok roept
release_messageaan om de reservering van een bericht te annuleren.Definieer de
protectedmethode in deresume_propagationsectie.// Resumes propagation after a reservation has been released. virtual void resume_propagation() { // Propagate out any messages in the output queue. if (_output_messages.size() > 0) { async_send(NULL); } }De runtime roept
resume_propagationaan nadat een doelblok een gereserveerd bericht verbruikt of vrijgeeft. Met deze methode worden alle berichten die zich in de uitvoerwachtrij bevinden, doorgegeven.Definieer de
protectedmethode in delink_target_notificationsectie.// Notifies this block that a new target has been linked to it. virtual void link_target_notification(concurrency::ITarget<_Target_type>*) { // Do not propagate messages if a target block reserves // the message at the front of the queue. if (_M_pReservedFor != NULL) { return; } // Propagate out any messages that are in the output queue. propagate_priority_order(); }De
_M_pReservedForlidvariabele wordt gedefinieerd door de basisklasse.source_blockDeze lidvariabele verwijst naar het doelblok, indien aanwezig, dat een reservering vasthoudt voor het bericht dat zich aan de voorkant van de uitvoerwachtrij bevindt. De runtime roept aanlink_target_notificationwanneer een nieuw doel is gekoppeld aan hetpriority_bufferobject. Met deze methode worden berichten die zich in de uitvoerwachtrij bevinden, doorgegeven, op voorwaarde dat er geen doel een reservering vasthoudt.Definieer de
privatemethode in depropagate_priority_ordersectie.// Propagates messages in priority order. void propagate_priority_order() { // Cancel propagation if another block reserves the head message. if (_M_pReservedFor != NULL) { return; } // Propagate out all output messages. // Because this block preserves message ordering, stop propagation // if any of the messages are not accepted by a target block. while (!_output_messages.empty()) { // Get the next message. concurrency::message<_Target_type> * message = _output_messages.front(); concurrency::message_status status = declined; // Traverse each target in the order in which they are connected. for (target_iterator iter = _M_connectedTargets.begin(); *iter != NULL; ++iter) { // Propagate the message to the target. concurrency::ITarget<_Target_type>* target = *iter; status = target->propagate(message, this); // If the target accepts the message then ownership of message has // changed. Do not propagate this message to any other target. if (status == accepted) { break; } // If the target only reserved this message, we must wait until the // target accepts the message. if (_M_pReservedFor != NULL) { break; } } // If status is anything other than accepted, then the head message // was not propagated out. To preserve the order in which output // messages are propagated, we must stop propagation until the head // message is accepted. if (status != accepted) { break; } } }Met deze methode worden alle berichten uit de uitvoerwachtrij doorgegeven. Elk bericht in de wachtrij wordt aan elk doelblok aangeboden totdat een van de doelblokken het bericht accepteert. De
priority_bufferklasse behoudt de volgorde van de uitgaande berichten. Daarom moet het eerste bericht in de uitvoerwachtrij worden geaccepteerd door een doelblok voordat deze methode een ander bericht aan de doelblokken biedt.Definieer de
protectedmethode in depropagate_messagesectie.// Asynchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::propagate. virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Asynchronously send the message to the target blocks. async_send(NULL); return accepted; } else { return missed; } }Met
propagate_messagede methode kan depriority_bufferklasse fungeren als een ontvanger van een bericht of als doel. Deze methode ontvangt het bericht dat wordt aangeboden door het opgegeven bronblok en voegt dat bericht in de prioriteitswachtrij in. Depropagate_messagemethode verzendt vervolgens asynchroon alle uitvoerberichten naar de doelblokken.De runtime roept deze methode aan wanneer u de gelijktijdigheidsfunctie::asend aanroept of wanneer het berichtblok is verbonden met andere berichtblokken.
Definieer de
protectedmethode in desend_messagesectie.// Synchronously passes a message from an ISource block to this block. // This method is typically called by propagator_block::send. virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message, concurrency::ISource<_Source_type>* source) { // Accept the message from the source block. message = source->accept(message->msg_id(), this); if (message != NULL) { // Insert the message into the input queue. The type parameter Pr // defines how to order messages by priority. { concurrency::critical_section::scoped_lock lock(_input_lock); _input_messages.push(message); } // Synchronously send the message to the target blocks. sync_send(NULL); return accepted; } else { return missed; } }De
send_messagemethode lijkt oppropagate_message. De uitvoerberichten worden echter synchroon verzonden in plaats van asynchroon.De runtime roept deze methode aan tijdens een synchrone verzendbewerking, bijvoorbeeld wanneer u de gelijktijdigheidsfunctie::send aanroept.
De priority_buffer klasse bevat constructoroverbelastingen die typisch zijn in veel berichtbloktypen. Sommige constructor-overloads hebben concurrency::Scheduler of concurrency::ScheduleGroup objecten, waardoor het berichtblok kan worden beheerd door een specifieke taakplanner. Andere constructoroverbelastingen hebben een filterfunctie. Met filterfuncties kunnen berichtblokken een bericht accepteren of weigeren op basis van de nettolading. Zie Asynchrone berichtblokken voor meer informatie over berichtfilters. Zie Task Scheduler voor meer informatie over taakplanners.
Omdat de priority_buffer klasse berichten rangschikt op prioriteit en vervolgens op de volgorde waarin berichten worden ontvangen, is deze klasse het handigst wanneer berichten asynchroon worden ontvangen, bijvoorbeeld wanneer u de gelijktijdigheid aanroept ::asend , of wanneer het berichtblok is verbonden met andere berichtblokken.
[Boven]
Het volledige voorbeeld
In het volgende voorbeeld ziet u de volledige definitie van de priority_buffer klasse.
// priority_buffer.h
#pragma once
#include <agents.h>
#include <queue>
namespace std
{
// A specialization of less that tests whether the priority element of a
// message is less than the priority element of another message.
template<class Type, class PriorityType>
struct less<concurrency::message<tuple<PriorityType,Type>>*>
{
typedef concurrency::message<tuple<PriorityType, Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator< to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) < get<0>(right->payload));
}
};
// A specialization of less that tests whether the priority element of a
// message is greater than the priority element of another message.
template<class Type, class PriorityType>
struct greater<concurrency::message<tuple<PriorityType, Type>>*>
{
typedef concurrency::message<std::tuple<PriorityType,Type>> MessageType;
bool operator()(const MessageType* left, const MessageType* right) const
{
// apply operator> to the first element (the priority)
// of the tuple payload.
return (get<0>(left->payload) > get<0>(right->payload));
}
};
}
namespace concurrencyex
{
// A message block type that orders incoming messages first by priority,
// and then by the order in which messages are received.
template<class Type,
typename PriorityType = int,
typename Pr = std::less<message<std::tuple<PriorityType, Type>>*>>
class priority_buffer : public concurrency::propagator_block<concurrency::multi_link_registry<concurrency::ITarget<Type>>,
concurrency::multi_link_registry<concurrency::ISource<std::tuple<PriorityType, Type>>>>
{
public:
// Constructs a priority_buffer message block.
priority_buffer()
{
initialize_source_and_target();
}
// Constructs a priority_buffer message block with the given filter function.
priority_buffer(filter_method const& filter)
{
initialize_source_and_target();
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler)
{
initialize_source_and_target(&scheduler);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided Scheduler object to propagate messages.
priority_buffer(concurrency::Scheduler& scheduler, filter_method const& filter)
{
initialize_source_and_target(&scheduler);
register_filter(filter);
}
// Constructs a priority_buffer message block that uses the provided
// SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group)
{
initialize_source_and_target(NULL, &schedule_group);
}
// Constructs a priority_buffer message block with the given filter function
// and uses the provided SchedulerGroup object to propagate messages.
priority_buffer(concurrency::ScheduleGroup& schedule_group, filter_method const& filter)
{
initialize_source_and_target(NULL, &schedule_group);
register_filter(filter);
}
// Destroys the message block.
~priority_buffer()
{
// Remove all links.
remove_network_links();
}
// Sends an item to the message block.
bool enqueue(Type const& item)
{
return concurrency::asend<Type>(this, item);
}
// Receives an item from the message block.
Type dequeue()
{
return receive<Type>(this);
}
protected:
// Asynchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::propagate.
virtual concurrency::message_status propagate_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Asynchronously send the message to the target blocks.
async_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Synchronously passes a message from an ISource block to this block.
// This method is typically called by propagator_block::send.
virtual concurrency::message_status send_message(concurrency::message<_Source_type>* message,
concurrency::ISource<_Source_type>* source)
{
// Accept the message from the source block.
message = source->accept(message->msg_id(), this);
if (message != NULL)
{
// Insert the message into the input queue. The type parameter Pr
// defines how to order messages by priority.
{
concurrency::critical_section::scoped_lock lock(_input_lock);
_input_messages.push(message);
}
// Synchronously send the message to the target blocks.
sync_send(NULL);
return accepted;
}
else
{
return missed;
}
}
// Accepts a message that was offered by this block by transferring ownership
// to the caller.
virtual concurrency::message<_Target_type>* accept_message(concurrency::runtime_object_identity msg_id)
{
concurrency::message<_Target_type>* message = NULL;
// Transfer ownership if the provided message identifier matches
// the identifier of the front of the output message queue.
if (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id)
{
message = _output_messages.front();
_output_messages.pop();
}
return message;
}
// Reserves a message that was previously offered by this block.
virtual bool reserve_message(concurrency::runtime_object_identity msg_id)
{
// Allow the message to be reserved if the provided message identifier
// is the message identifier of the front of the message queue.
return (!_output_messages.empty() &&
_output_messages.front()->msg_id() == msg_id);
}
// Transfers the message that was previously offered by this block
// to the caller. The caller of this method is the target block that
// reserved the message.
virtual concurrency::message<Type>* consume_message(concurrency::runtime_object_identity msg_id)
{
// Transfer ownership of the message to the caller.
return accept_message(msg_id);
}
// Releases a previous message reservation.
virtual void release_message(concurrency::runtime_object_identity msg_id)
{
// The head message must be the one that is reserved.
if (_output_messages.empty() ||
_output_messages.front()->msg_id() != msg_id)
{
throw message_not_found();
}
}
// Resumes propagation after a reservation has been released.
virtual void resume_propagation()
{
// Propagate out any messages in the output queue.
if (_output_messages.size() > 0)
{
async_send(NULL);
}
}
// Notifies this block that a new target has been linked to it.
virtual void link_target_notification(concurrency::ITarget<_Target_type>*)
{
// Do not propagate messages if a target block reserves
// the message at the front of the queue.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out any messages that are in the output queue.
propagate_priority_order();
}
// Transfers the message at the front of the input queue to the output queue
// and propagates out all messages in the output queue.
virtual void propagate_to_any_targets(concurrency::message<_Target_type>*)
{
// Retrieve the message from the front of the input queue.
concurrency::message<_Source_type>* input_message = NULL;
{
concurrency::critical_section::scoped_lock lock(_input_lock);
if (_input_messages.size() > 0)
{
input_message = _input_messages.top();
_input_messages.pop();
}
}
// Move the message to the output queue.
if (input_message != NULL)
{
// The payload of the output message does not contain the
// priority of the message.
concurrency::message<_Target_type>* output_message =
new concurrency::message<_Target_type>(get<1>(input_message->payload));
_output_messages.push(output_message);
// Free the memory for the input message.
delete input_message;
// Do not propagate messages if the new message is not the head message.
// In this case, the head message is reserved by another message block.
if (_output_messages.front()->msg_id() != output_message->msg_id())
{
return;
}
}
// Propagate out the output messages.
propagate_priority_order();
}
private:
// Propagates messages in priority order.
void propagate_priority_order()
{
// Cancel propagation if another block reserves the head message.
if (_M_pReservedFor != NULL)
{
return;
}
// Propagate out all output messages.
// Because this block preserves message ordering, stop propagation
// if any of the messages are not accepted by a target block.
while (!_output_messages.empty())
{
// Get the next message.
concurrency::message<_Target_type> * message = _output_messages.front();
concurrency::message_status status = declined;
// Traverse each target in the order in which they are connected.
for (target_iterator iter = _M_connectedTargets.begin();
*iter != NULL;
++iter)
{
// Propagate the message to the target.
concurrency::ITarget<_Target_type>* target = *iter;
status = target->propagate(message, this);
// If the target accepts the message then ownership of message has
// changed. Do not propagate this message to any other target.
if (status == accepted)
{
break;
}
// If the target only reserved this message, we must wait until the
// target accepts the message.
if (_M_pReservedFor != NULL)
{
break;
}
}
// If status is anything other than accepted, then the head message
// was not propagated out. To preserve the order in which output
// messages are propagated, we must stop propagation until the head
// message is accepted.
if (status != accepted)
{
break;
}
}
}
private:
// Stores incoming messages.
// The type parameter Pr specifies how to order messages by priority.
std::priority_queue<
concurrency::message<_Source_type>*,
std::vector<concurrency::message<_Source_type>*>,
Pr
> _input_messages;
// Synchronizes access to the input message queue.
concurrency::critical_section _input_lock;
// Stores outgoing messages.
std::queue<concurrency::message<_Target_type>*> _output_messages;
private:
// Hide assignment operator and copy constructor.
priority_buffer const &operator =(priority_buffer const&);
priority_buffer(priority_buffer const &);
};
}
In het volgende voorbeeld worden meerdere asend en concurrency::receive-bewerkingen gelijktijdig uitgevoerd op een priority_buffer object.
// priority_buffer.cpp
// compile with: /EHsc
#include <ppl.h>
#include <iostream>
#include "priority_buffer.h"
using namespace concurrency;
using namespace concurrencyex;
using namespace std;
int wmain()
{
// Concurrently perform a number of asend and receive operations
// on a priority_buffer object.
priority_buffer<int> pb;
parallel_invoke(
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(2, 36)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(0, 12)); },
[&pb] { for (int i = 0; i < 25; ++i) asend(pb, make_tuple(1, 24)); },
[&pb] {
for (int i = 0; i < 75; ++i) {
wcout << receive(pb) << L' ';
if ((i+1) % 25 == 0)
wcout << endl;
}
}
);
}
In dit voorbeeld wordt de volgende voorbeelduitvoer geproduceerd.
36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36 36
24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24 24
12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12 12
De priority_buffer klasse rangschikt berichten eerst op prioriteit en vervolgens op de volgorde waarin berichten worden ontvangen. In dit voorbeeld worden berichten met een hogere numerieke prioriteit ingevoegd aan de voorkant van de wachtrij.
[Boven]
De code compileren
Kopieer de voorbeeldcode en plak deze in een Visual Studio-project of plak de definitie van de priority_buffer klasse in een bestand met de naam priority_buffer.h en het testprogramma in een bestand met de naam priority_buffer.cpp en voer vervolgens de volgende opdracht uit in een Visual Studio-opdrachtpromptvenster.
cl.exe /EHsc-priority_buffer.cpp
Zie ook
Gelijktijdigheidsruntime-handleidingen
Asynchrone berichtblokken
Functies voor het doorgeven van berichten